historical/moneyrocket.git/test/functional/feature_segwit.py

683 lines
40 KiB
Python
Raw Normal View History

2024-01-16 17:20:27 +00:00
#!/usr/bin/env python3
# Copyright (c) 2016-2022 The Moneyrocket Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the SegWit changeover logic."""
from decimal import Decimal
from test_framework.address import (
key_to_p2pkh,
program_to_witness,
script_to_p2sh,
script_to_p2sh_p2wsh,
script_to_p2wsh,
)
from test_framework.blocktools import (
send_to_witness,
witness_script,
)
from test_framework.descriptors import descsum_create
from test_framework.messages import (
COIN,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
tx_from_hex,
)
from test_framework.script import (
CScript,
OP_0,
OP_1,
OP_DROP,
OP_TRUE,
)
from test_framework.script_util import (
key_to_p2pk_script,
key_to_p2pkh_script,
key_to_p2wpkh_script,
keys_to_multisig_script,
script_to_p2sh_script,
script_to_p2wsh_script,
)
from test_framework.test_framework import MoneyrocketTestFramework
from test_framework.util import (
assert_equal,
assert_greater_than_or_equal,
assert_is_hex_string,
assert_raises_rpc_error,
try_rpc,
)
from test_framework.wallet_util import (
get_generate_key,
)
NODE_0 = 0
NODE_2 = 2
P2WPKH = 0
P2WSH = 1
def getutxo(txid):
utxo = {}
utxo["vout"] = 0
utxo["txid"] = txid
return utxo
def find_spendable_utxo(node, min_value):
for utxo in node.listunspent(query_options={'minimumAmount': min_value}):
if utxo['spendable']:
return utxo
raise AssertionError(f"Unspent output equal or higher than {min_value} not found")
txs_mined = {} # txindex from txid to blockhash
class SegWitTest(MoneyrocketTestFramework):
def add_options(self, parser):
self.add_wallet_options(parser)
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 3
# This test tests SegWit both pre and post-activation, so use the normal BIP9 activation.
self.extra_args = [
[
"-acceptnonstdtxn=1",
"-rpcserialversion=0",
"-testactivationheight=segwit@165",
"-addresstype=legacy",
],
[
"-acceptnonstdtxn=1",
"-rpcserialversion=1",
"-testactivationheight=segwit@165",
"-addresstype=legacy",
],
[
"-acceptnonstdtxn=1",
"-testactivationheight=segwit@165",
"-addresstype=legacy",
],
]
self.rpc_timeout = 120
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def setup_network(self):
super().setup_network()
self.connect_nodes(0, 2)
self.sync_all()
def success_mine(self, node, txid, sign, redeem_script=""):
send_to_witness(1, node, getutxo(txid), self.pubkey[0], False, Decimal("49.998"), sign, redeem_script)
block = self.generate(node, 1)
assert_equal(len(node.getblock(block[0])["tx"]), 2)
self.sync_blocks()
def fail_accept(self, node, error_msg, txid, sign, redeem_script=""):
assert_raises_rpc_error(-26, error_msg, send_to_witness, use_p2wsh=1, node=node, utxo=getutxo(txid), pubkey=self.pubkey[0], encode_p2sh=False, amount=Decimal("49.998"), sign=sign, insert_redeem_script=redeem_script)
def run_test(self):
self.generate(self.nodes[0], 161) # block 161
self.log.info("Verify sigops are counted in GBT with pre-BIP141 rules before the fork")
txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1)
tmpl = self.nodes[0].getblocktemplate({'rules': ['segwit']})
assert_equal(tmpl['sizelimit'], 1000000)
assert 'weightlimit' not in tmpl
assert_equal(tmpl['sigoplimit'], 20000)
assert_equal(tmpl['transactions'][0]['hash'], txid)
assert_equal(tmpl['transactions'][0]['sigops'], 2)
assert '!segwit' not in tmpl['rules']
self.generate(self.nodes[0], 1) # block 162
balance_presetup = self.nodes[0].getbalance()
self.pubkey = []
p2sh_ids = [] # p2sh_ids[NODE][TYPE] is an array of txids that spend to P2WPKH (TYPE=0) or P2WSH (TYPE=1) scripts to an address for NODE embedded in p2sh
wit_ids = [] # wit_ids[NODE][TYPE] is an array of txids that spend to P2WPKH (TYPE=0) or P2WSH (TYPE=1) scripts to an address for NODE via bare witness
for i in range(3):
key = get_generate_key()
self.pubkey.append(key.pubkey)
multiscript = keys_to_multisig_script([self.pubkey[-1]])
p2sh_ms_addr = self.nodes[i].createmultisig(1, [self.pubkey[-1]], 'p2sh-segwit')['address']
bip173_ms_addr = self.nodes[i].createmultisig(1, [self.pubkey[-1]], 'bech32')['address']
assert_equal(p2sh_ms_addr, script_to_p2sh_p2wsh(multiscript))
assert_equal(bip173_ms_addr, script_to_p2wsh(multiscript))
p2sh_ms_desc = descsum_create(f"sh(wsh(multi(1,{key.privkey})))")
bip173_ms_desc = descsum_create(f"wsh(multi(1,{key.privkey}))")
assert_equal(self.nodes[i].deriveaddresses(p2sh_ms_desc)[0], p2sh_ms_addr)
assert_equal(self.nodes[i].deriveaddresses(bip173_ms_desc)[0], bip173_ms_addr)
sh_wpkh_desc = descsum_create(f"sh(wpkh({key.privkey}))")
wpkh_desc = descsum_create(f"wpkh({key.privkey})")
assert_equal(self.nodes[i].deriveaddresses(sh_wpkh_desc)[0], key.p2sh_p2wpkh_addr)
assert_equal(self.nodes[i].deriveaddresses(wpkh_desc)[0], key.p2wpkh_addr)
if self.options.descriptors:
res = self.nodes[i].importdescriptors([
{"desc": p2sh_ms_desc, "timestamp": "now"},
{"desc": bip173_ms_desc, "timestamp": "now"},
{"desc": sh_wpkh_desc, "timestamp": "now"},
{"desc": wpkh_desc, "timestamp": "now"},
])
else:
# The nature of the legacy wallet is that this import results in also adding all of the necessary scripts
res = self.nodes[i].importmulti([
{"desc": p2sh_ms_desc, "timestamp": "now"},
])
assert all([r["success"] for r in res])
p2sh_ids.append([])
wit_ids.append([])
for _ in range(2):
p2sh_ids[i].append([])
wit_ids[i].append([])
for _ in range(5):
for n in range(3):
for v in range(2):
wit_ids[n][v].append(send_to_witness(v, self.nodes[0], find_spendable_utxo(self.nodes[0], 50), self.pubkey[n], False, Decimal("49.999")))
p2sh_ids[n][v].append(send_to_witness(v, self.nodes[0], find_spendable_utxo(self.nodes[0], 50), self.pubkey[n], True, Decimal("49.999")))
self.generate(self.nodes[0], 1) # block 163
# Make sure all nodes recognize the transactions as theirs
assert_equal(self.nodes[0].getbalance(), balance_presetup - 60 * 50 + 20 * Decimal("49.999") + 50)
assert_equal(self.nodes[1].getbalance(), 20 * Decimal("49.999"))
assert_equal(self.nodes[2].getbalance(), 20 * Decimal("49.999"))
self.log.info("Verify unsigned p2sh witness txs without a redeem script are invalid")
self.fail_accept(self.nodes[2], "mandatory-script-verify-flag-failed (Operation not valid with the current stack size)", p2sh_ids[NODE_2][P2WPKH][1], sign=False)
self.fail_accept(self.nodes[2], "mandatory-script-verify-flag-failed (Operation not valid with the current stack size)", p2sh_ids[NODE_2][P2WSH][1], sign=False)
self.generate(self.nodes[0], 1) # block 164
self.log.info("Verify witness txs are mined as soon as segwit activates")
send_to_witness(1, self.nodes[2], getutxo(wit_ids[NODE_2][P2WPKH][0]), self.pubkey[0], encode_p2sh=False, amount=Decimal("49.998"), sign=True)
send_to_witness(1, self.nodes[2], getutxo(wit_ids[NODE_2][P2WSH][0]), self.pubkey[0], encode_p2sh=False, amount=Decimal("49.998"), sign=True)
send_to_witness(1, self.nodes[2], getutxo(p2sh_ids[NODE_2][P2WPKH][0]), self.pubkey[0], encode_p2sh=False, amount=Decimal("49.998"), sign=True)
send_to_witness(1, self.nodes[2], getutxo(p2sh_ids[NODE_2][P2WSH][0]), self.pubkey[0], encode_p2sh=False, amount=Decimal("49.998"), sign=True)
assert_equal(len(self.nodes[2].getrawmempool()), 4)
blockhash = self.generate(self.nodes[2], 1)[0] # block 165 (first block with new rules)
assert_equal(len(self.nodes[2].getrawmempool()), 0)
segwit_tx_list = self.nodes[2].getblock(blockhash)["tx"]
assert_equal(len(segwit_tx_list), 5)
self.log.info("Verify default node can't accept txs with missing witness")
# unsigned, no scriptsig
self.fail_accept(self.nodes[0], "non-mandatory-script-verify-flag (Witness program hash mismatch)", wit_ids[NODE_0][P2WPKH][0], sign=False)
self.fail_accept(self.nodes[0], "non-mandatory-script-verify-flag (Witness program was passed an empty witness)", wit_ids[NODE_0][P2WSH][0], sign=False)
self.fail_accept(self.nodes[0], "mandatory-script-verify-flag-failed (Operation not valid with the current stack size)", p2sh_ids[NODE_0][P2WPKH][0], sign=False)
self.fail_accept(self.nodes[0], "mandatory-script-verify-flag-failed (Operation not valid with the current stack size)", p2sh_ids[NODE_0][P2WSH][0], sign=False)
# unsigned with redeem script
self.fail_accept(self.nodes[0], "non-mandatory-script-verify-flag (Witness program hash mismatch)", p2sh_ids[NODE_0][P2WPKH][0], sign=False, redeem_script=witness_script(False, self.pubkey[0]))
self.fail_accept(self.nodes[0], "non-mandatory-script-verify-flag (Witness program was passed an empty witness)", p2sh_ids[NODE_0][P2WSH][0], sign=False, redeem_script=witness_script(True, self.pubkey[0]))
self.log.info("Verify block and transaction serialization rpcs return differing serializations depending on rpc serialization flag")
assert self.nodes[2].getblock(blockhash, False) != self.nodes[0].getblock(blockhash, False)
assert self.nodes[1].getblock(blockhash, False) == self.nodes[2].getblock(blockhash, False)
for tx_id in segwit_tx_list:
tx = tx_from_hex(self.nodes[2].gettransaction(tx_id)["hex"])
assert self.nodes[2].getrawtransaction(tx_id, False, blockhash) != self.nodes[0].getrawtransaction(tx_id, False, blockhash)
assert self.nodes[1].getrawtransaction(tx_id, False, blockhash) == self.nodes[2].getrawtransaction(tx_id, False, blockhash)
assert self.nodes[0].getrawtransaction(tx_id, False, blockhash) != self.nodes[2].gettransaction(tx_id)["hex"]
assert self.nodes[1].getrawtransaction(tx_id, False, blockhash) == self.nodes[2].gettransaction(tx_id)["hex"]
assert self.nodes[0].getrawtransaction(tx_id, False, blockhash) == tx.serialize_without_witness().hex()
# Coinbase contains the witness commitment nonce, check that RPC shows us
coinbase_txid = self.nodes[2].getblock(blockhash)['tx'][0]
coinbase_tx = self.nodes[2].gettransaction(txid=coinbase_txid, verbose=True)
witnesses = coinbase_tx["decoded"]["vin"][0]["txinwitness"]
assert_equal(len(witnesses), 1)
assert_is_hex_string(witnesses[0])
assert_equal(witnesses[0], '00' * 32)
self.log.info("Verify witness txs without witness data are invalid after the fork")
self.fail_accept(self.nodes[2], 'non-mandatory-script-verify-flag (Witness program hash mismatch)', wit_ids[NODE_2][P2WPKH][2], sign=False)
self.fail_accept(self.nodes[2], 'non-mandatory-script-verify-flag (Witness program was passed an empty witness)', wit_ids[NODE_2][P2WSH][2], sign=False)
self.fail_accept(self.nodes[2], 'non-mandatory-script-verify-flag (Witness program hash mismatch)', p2sh_ids[NODE_2][P2WPKH][2], sign=False, redeem_script=witness_script(False, self.pubkey[2]))
self.fail_accept(self.nodes[2], 'non-mandatory-script-verify-flag (Witness program was passed an empty witness)', p2sh_ids[NODE_2][P2WSH][2], sign=False, redeem_script=witness_script(True, self.pubkey[2]))
self.log.info("Verify default node can now use witness txs")
self.success_mine(self.nodes[0], wit_ids[NODE_0][P2WPKH][0], True)
self.success_mine(self.nodes[0], wit_ids[NODE_0][P2WSH][0], True)
self.success_mine(self.nodes[0], p2sh_ids[NODE_0][P2WPKH][0], True)
self.success_mine(self.nodes[0], p2sh_ids[NODE_0][P2WSH][0], True)
self.log.info("Verify sigops are counted in GBT with BIP141 rules after the fork")
txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1)
raw_tx = self.nodes[0].getrawtransaction(txid, True)
tmpl = self.nodes[0].getblocktemplate({'rules': ['segwit']})
assert_greater_than_or_equal(tmpl['sizelimit'], 3999577) # actual maximum size is lower due to minimum mandatory non-witness data
assert_equal(tmpl['weightlimit'], 4000000)
assert_equal(tmpl['sigoplimit'], 80000)
assert_equal(tmpl['transactions'][0]['txid'], txid)
expected_sigops = 9 if 'txinwitness' in raw_tx["vin"][0] else 8
assert_equal(tmpl['transactions'][0]['sigops'], expected_sigops)
assert '!segwit' in tmpl['rules']
self.generate(self.nodes[0], 1) # Mine a block to clear the gbt cache
self.log.info("Non-segwit miners are able to use GBT response after activation.")
# Create a 3-tx chain: tx1 (non-segwit input, paying to a segwit output) ->
# tx2 (segwit input, paying to a non-segwit output) ->
# tx3 (non-segwit input, paying to a non-segwit output).
# tx1 is allowed to appear in the block, but no others.
txid1 = send_to_witness(1, self.nodes[0], find_spendable_utxo(self.nodes[0], 50), self.pubkey[0], False, Decimal("49.996"))
hex_tx = self.nodes[0].gettransaction(txid)['hex']
tx = tx_from_hex(hex_tx)
assert tx.wit.is_null() # This should not be a segwit input
assert txid1 in self.nodes[0].getrawmempool()
tx1_hex = self.nodes[0].gettransaction(txid1)['hex']
tx1 = tx_from_hex(tx1_hex)
# Check that wtxid is properly reported in mempool entry (txid1)
assert_equal(int(self.nodes[0].getmempoolentry(txid1)["wtxid"], 16), tx1.calc_sha256(True))
# Check that weight and vsize are properly reported in mempool entry (txid1)
assert_equal(self.nodes[0].getmempoolentry(txid1)["vsize"], tx1.get_vsize())
assert_equal(self.nodes[0].getmempoolentry(txid1)["weight"], tx1.get_weight())
# Now create tx2, which will spend from txid1.
tx = CTransaction()
tx.vin.append(CTxIn(COutPoint(int(txid1, 16), 0), b''))
tx.vout.append(CTxOut(int(49.99 * COIN), CScript([OP_TRUE, OP_DROP] * 15 + [OP_TRUE])))
tx2_hex = self.nodes[0].signrawtransactionwithwallet(tx.serialize().hex())['hex']
txid2 = self.nodes[0].sendrawtransaction(tx2_hex)
tx = tx_from_hex(tx2_hex)
assert not tx.wit.is_null()
# Check that wtxid is properly reported in mempool entry (txid2)
assert_equal(int(self.nodes[0].getmempoolentry(txid2)["wtxid"], 16), tx.calc_sha256(True))
# Check that weight and vsize are properly reported in mempool entry (txid2)
assert_equal(self.nodes[0].getmempoolentry(txid2)["vsize"], tx.get_vsize())
assert_equal(self.nodes[0].getmempoolentry(txid2)["weight"], tx.get_weight())
# Now create tx3, which will spend from txid2
tx = CTransaction()
tx.vin.append(CTxIn(COutPoint(int(txid2, 16), 0), b""))
tx.vout.append(CTxOut(int(49.95 * COIN), CScript([OP_TRUE, OP_DROP] * 15 + [OP_TRUE]))) # Huge fee
tx.calc_sha256()
txid3 = self.nodes[0].sendrawtransaction(hexstring=tx.serialize().hex(), maxfeerate=0)
assert tx.wit.is_null()
assert txid3 in self.nodes[0].getrawmempool()
# Check that getblocktemplate includes all transactions.
template = self.nodes[0].getblocktemplate({"rules": ["segwit"]})
template_txids = [t['txid'] for t in template['transactions']]
assert txid1 in template_txids
assert txid2 in template_txids
assert txid3 in template_txids
# Check that wtxid is properly reported in mempool entry (txid3)
assert_equal(int(self.nodes[0].getmempoolentry(txid3)["wtxid"], 16), tx.calc_sha256(True))
# Check that weight and vsize are properly reported in mempool entry (txid3)
assert_equal(self.nodes[0].getmempoolentry(txid3)["vsize"], tx.get_vsize())
assert_equal(self.nodes[0].getmempoolentry(txid3)["weight"], tx.get_weight())
# Mine a block to clear the gbt cache again.
self.generate(self.nodes[0], 1)
if not self.options.descriptors:
self.log.info("Verify behaviour of importaddress and listunspent")
# Some public keys to be used later
pubkeys = [
"0363D44AABD0F1699138239DF2F042C3282C0671CC7A76826A55C8203D90E39242", # cPiM8Ub4heR9NBYmgVzJQiUH1if44GSBGiqaeJySuL2BKxubvgwb
"02D3E626B3E616FC8662B489C123349FECBFC611E778E5BE739B257EAE4721E5BF", # cPpAdHaD6VoYbW78kveN2bsvb45Q7G5PhaPApVUGwvF8VQ9brD97
"04A47F2CBCEFFA7B9BCDA184E7D5668D3DA6F9079AD41E422FA5FD7B2D458F2538A62F5BD8EC85C2477F39650BD391EA6250207065B2A81DA8B009FC891E898F0E", # 91zqCU5B9sdWxzMt1ca3VzbtVm2YM6Hi5Rxn4UDtxEaN9C9nzXV
"02A47F2CBCEFFA7B9BCDA184E7D5668D3DA6F9079AD41E422FA5FD7B2D458F2538", # cPQFjcVRpAUBG8BA9hzr2yEzHwKoMgLkJZBBtK9vJnvGJgMjzTbd
"036722F784214129FEB9E8129D626324F3F6716555B603FFE8300BBCB882151228", # cQGtcm34xiLjB1v7bkRa4V3aAc9tS2UTuBZ1UnZGeSeNy627fN66
"0266A8396EE936BF6D99D17920DB21C6C7B1AB14C639D5CD72B300297E416FD2EC", # cTW5mR5M45vHxXkeChZdtSPozrFwFgmEvTNnanCW6wrqwaCZ1X7K
"0450A38BD7F0AC212FEBA77354A9B036A32E0F7C81FC4E0C5ADCA7C549C4505D2522458C2D9AE3CEFD684E039194B72C8A10F9CB9D4764AB26FCC2718D421D3B84", # 92h2XPssjBpsJN5CqSP7v9a7cf2kgDunBC6PDFwJHMACM1rrVBJ
]
# Import a compressed key and an uncompressed key, generate some multisig addresses
self.nodes[0].importprivkey("92e6XLo5jVAVwrQKPNTs93oQco8f8sDNBcpv73Dsrs397fQtFQn")
uncompressed_spendable_address = ["mvozP4UwyGD2mGZU4D2eMvMLPB9WkMmMQu"]
self.nodes[0].importprivkey("cNC8eQ5dg3mFAVePDX4ddmPYpPbw41r9bm2jd1nLJT77e6RrzTRR")
compressed_spendable_address = ["mmWQubrDomqpgSYekvsU7HWEVjLFHAakLe"]
assert not self.nodes[0].getaddressinfo(uncompressed_spendable_address[0])['iscompressed']
assert self.nodes[0].getaddressinfo(compressed_spendable_address[0])['iscompressed']
self.nodes[0].importpubkey(pubkeys[0])
compressed_solvable_address = [key_to_p2pkh(pubkeys[0])]
self.nodes[0].importpubkey(pubkeys[1])
compressed_solvable_address.append(key_to_p2pkh(pubkeys[1]))
self.nodes[0].importpubkey(pubkeys[2])
uncompressed_solvable_address = [key_to_p2pkh(pubkeys[2])]
spendable_anytime = [] # These outputs should be seen anytime after importprivkey and addmultisigaddress
spendable_after_importaddress = [] # These outputs should be seen after importaddress
solvable_after_importaddress = [] # These outputs should be seen after importaddress but not spendable
unsolvable_after_importaddress = [] # These outputs should be unsolvable after importaddress
solvable_anytime = [] # These outputs should be solvable after importpubkey
unseen_anytime = [] # These outputs should never be seen
uncompressed_spendable_address.append(self.nodes[0].addmultisigaddress(2, [uncompressed_spendable_address[0], compressed_spendable_address[0]])['address'])
uncompressed_spendable_address.append(self.nodes[0].addmultisigaddress(2, [uncompressed_spendable_address[0], uncompressed_spendable_address[0]])['address'])
compressed_spendable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_spendable_address[0], compressed_spendable_address[0]])['address'])
uncompressed_solvable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_spendable_address[0], uncompressed_solvable_address[0]])['address'])
compressed_solvable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_spendable_address[0], compressed_solvable_address[0]])['address'])
compressed_solvable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_solvable_address[0], compressed_solvable_address[1]])['address'])
# Test multisig_without_privkey
# We have 2 public keys without private keys, use addmultisigaddress to add to wallet.
# Money sent to P2SH of multisig of this should only be seen after importaddress with the BASE58 P2SH address.
multisig_without_privkey_address = self.nodes[0].addmultisigaddress(2, [pubkeys[3], pubkeys[4]])['address']
script = keys_to_multisig_script([pubkeys[3], pubkeys[4]])
solvable_after_importaddress.append(script_to_p2sh_script(script))
for i in compressed_spendable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
# p2sh multisig with compressed keys should always be spendable
spendable_anytime.extend([p2sh])
# bare multisig can be watched and signed, but is not treated as ours
solvable_after_importaddress.extend([bare])
# P2WSH and P2SH(P2WSH) multisig with compressed keys are spendable after direct importaddress
spendable_after_importaddress.extend([p2wsh, p2sh_p2wsh])
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# normal P2PKH and P2PK with compressed keys should always be spendable
spendable_anytime.extend([p2pkh, p2pk])
# P2SH_P2PK, P2SH_P2PKH with compressed keys are spendable after direct importaddress
spendable_after_importaddress.extend([p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh])
# P2WPKH and P2SH_P2WPKH with compressed keys should always be spendable
spendable_anytime.extend([p2wpkh, p2sh_p2wpkh])
for i in uncompressed_spendable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
# p2sh multisig with uncompressed keys should always be spendable
spendable_anytime.extend([p2sh])
# bare multisig can be watched and signed, but is not treated as ours
solvable_after_importaddress.extend([bare])
# P2WSH and P2SH(P2WSH) multisig with uncompressed keys are never seen
unseen_anytime.extend([p2wsh, p2sh_p2wsh])
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# normal P2PKH and P2PK with uncompressed keys should always be spendable
spendable_anytime.extend([p2pkh, p2pk])
# P2SH_P2PK and P2SH_P2PKH are spendable after direct importaddress
spendable_after_importaddress.extend([p2sh_p2pk, p2sh_p2pkh])
# Witness output types with uncompressed keys are never seen
unseen_anytime.extend([p2wpkh, p2sh_p2wpkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh])
for i in compressed_solvable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
# Multisig without private is not seen after addmultisigaddress, but seen after importaddress
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
solvable_after_importaddress.extend([bare, p2sh, p2wsh, p2sh_p2wsh])
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# normal P2PKH, P2PK, P2WPKH and P2SH_P2WPKH with compressed keys should always be seen
solvable_anytime.extend([p2pkh, p2pk, p2wpkh, p2sh_p2wpkh])
# P2SH_P2PK, P2SH_P2PKH with compressed keys are seen after direct importaddress
solvable_after_importaddress.extend([p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh])
for i in uncompressed_solvable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
# Base uncompressed multisig without private is not seen after addmultisigaddress, but seen after importaddress
solvable_after_importaddress.extend([bare, p2sh])
# P2WSH and P2SH(P2WSH) multisig with uncompressed keys are never seen
unseen_anytime.extend([p2wsh, p2sh_p2wsh])
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# normal P2PKH and P2PK with uncompressed keys should always be seen
solvable_anytime.extend([p2pkh, p2pk])
# P2SH_P2PK, P2SH_P2PKH with uncompressed keys are seen after direct importaddress
solvable_after_importaddress.extend([p2sh_p2pk, p2sh_p2pkh])
# Witness output types with uncompressed keys are never seen
unseen_anytime.extend([p2wpkh, p2sh_p2wpkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh])
op1 = CScript([OP_1])
op0 = CScript([OP_0])
# 2N7MGY19ti4KDMSzRfPAssP6Pxyuxoi6jLe is the P2SH(P2PKH) version of mjoE3sSrb8ByYEvgnC3Aox86u1CHnfJA4V
unsolvable_address_key = bytes.fromhex("02341AEC7587A51CDE5279E0630A531AEA2615A9F80B17E8D9376327BAEAA59E3D")
unsolvablep2pkh = key_to_p2pkh_script(unsolvable_address_key)
unsolvablep2wshp2pkh = script_to_p2wsh_script(unsolvablep2pkh)
p2shop0 = script_to_p2sh_script(op0)
p2wshop1 = script_to_p2wsh_script(op1)
unsolvable_after_importaddress.append(unsolvablep2pkh)
unsolvable_after_importaddress.append(unsolvablep2wshp2pkh)
unsolvable_after_importaddress.append(op1) # OP_1 will be imported as script
unsolvable_after_importaddress.append(p2wshop1)
unseen_anytime.append(op0) # OP_0 will be imported as P2SH address with no script provided
unsolvable_after_importaddress.append(p2shop0)
spendable_txid = []
solvable_txid = []
spendable_txid.append(self.mine_and_test_listunspent(spendable_anytime, 2))
solvable_txid.append(self.mine_and_test_listunspent(solvable_anytime, 1))
self.mine_and_test_listunspent(spendable_after_importaddress + solvable_after_importaddress + unseen_anytime + unsolvable_after_importaddress, 0)
importlist = []
for i in compressed_spendable_address + uncompressed_spendable_address + compressed_solvable_address + uncompressed_solvable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
bare = bytes.fromhex(v['hex'])
importlist.append(bare.hex())
importlist.append(script_to_p2wsh_script(bare).hex())
else:
pubkey = bytes.fromhex(v['pubkey'])
p2pk = key_to_p2pk_script(pubkey)
p2pkh = key_to_p2pkh_script(pubkey)
importlist.append(p2pk.hex())
importlist.append(p2pkh.hex())
importlist.append(key_to_p2wpkh_script(pubkey).hex())
importlist.append(script_to_p2wsh_script(p2pk).hex())
importlist.append(script_to_p2wsh_script(p2pkh).hex())
importlist.append(unsolvablep2pkh.hex())
importlist.append(unsolvablep2wshp2pkh.hex())
importlist.append(op1.hex())
importlist.append(p2wshop1.hex())
for i in importlist:
# import all generated addresses. The wallet already has the private keys for some of these, so catch JSON RPC
# exceptions and continue.
try_rpc(-4, "The wallet already contains the private key for this address or script", self.nodes[0].importaddress, i, "", False, True)
self.nodes[0].importaddress(script_to_p2sh(op0)) # import OP_0 as address only
self.nodes[0].importaddress(multisig_without_privkey_address) # Test multisig_without_privkey
spendable_txid.append(self.mine_and_test_listunspent(spendable_anytime + spendable_after_importaddress, 2))
solvable_txid.append(self.mine_and_test_listunspent(solvable_anytime + solvable_after_importaddress, 1))
self.mine_and_test_listunspent(unsolvable_after_importaddress, 1)
self.mine_and_test_listunspent(unseen_anytime, 0)
spendable_txid.append(self.mine_and_test_listunspent(spendable_anytime + spendable_after_importaddress, 2))
solvable_txid.append(self.mine_and_test_listunspent(solvable_anytime + solvable_after_importaddress, 1))
self.mine_and_test_listunspent(unsolvable_after_importaddress, 1)
self.mine_and_test_listunspent(unseen_anytime, 0)
# Repeat some tests. This time we don't add witness scripts with importaddress
# Import a compressed key and an uncompressed key, generate some multisig addresses
self.nodes[0].importprivkey("927pw6RW8ZekycnXqBQ2JS5nPyo1yRfGNN8oq74HeddWSpafDJH")
uncompressed_spendable_address = ["mguN2vNSCEUh6rJaXoAVwY3YZwZvEmf5xi"]
self.nodes[0].importprivkey("cMcrXaaUC48ZKpcyydfFo8PxHAjpsYLhdsp6nmtB3E2ER9UUHWnw")
compressed_spendable_address = ["n1UNmpmbVUJ9ytXYXiurmGPQ3TRrXqPWKL"]
self.nodes[0].importpubkey(pubkeys[5])
compressed_solvable_address = [key_to_p2pkh(pubkeys[5])]
self.nodes[0].importpubkey(pubkeys[6])
uncompressed_solvable_address = [key_to_p2pkh(pubkeys[6])]
unseen_anytime = [] # These outputs should never be seen
solvable_anytime = [] # These outputs should be solvable after importpubkey
unseen_anytime = [] # These outputs should never be seen
uncompressed_spendable_address.append(self.nodes[0].addmultisigaddress(2, [uncompressed_spendable_address[0], compressed_spendable_address[0]])['address'])
uncompressed_spendable_address.append(self.nodes[0].addmultisigaddress(2, [uncompressed_spendable_address[0], uncompressed_spendable_address[0]])['address'])
compressed_spendable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_spendable_address[0], compressed_spendable_address[0]])['address'])
uncompressed_solvable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_solvable_address[0], uncompressed_solvable_address[0]])['address'])
compressed_solvable_address.append(self.nodes[0].addmultisigaddress(2, [compressed_spendable_address[0], compressed_solvable_address[0]])['address'])
premature_witaddress = []
for i in compressed_spendable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
premature_witaddress.append(script_to_p2sh(p2wsh))
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# P2WPKH, P2SH_P2WPKH are always spendable
spendable_anytime.extend([p2wpkh, p2sh_p2wpkh])
for i in uncompressed_spendable_address + uncompressed_solvable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
# P2WSH and P2SH(P2WSH) multisig with uncompressed keys are never seen
unseen_anytime.extend([p2wsh, p2sh_p2wsh])
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# P2WPKH, P2SH_P2WPKH with uncompressed keys are never seen
unseen_anytime.extend([p2wpkh, p2sh_p2wpkh])
for i in compressed_solvable_address:
v = self.nodes[0].getaddressinfo(i)
if v['isscript']:
[bare, p2sh, p2wsh, p2sh_p2wsh] = self.p2sh_address_to_script(v)
premature_witaddress.append(script_to_p2sh(p2wsh))
else:
[p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh] = self.p2pkh_address_to_script(v)
# P2SH_P2PK, P2SH_P2PKH with compressed keys are always solvable
solvable_anytime.extend([p2wpkh, p2sh_p2wpkh])
self.mine_and_test_listunspent(spendable_anytime, 2)
self.mine_and_test_listunspent(solvable_anytime, 1)
self.mine_and_test_listunspent(unseen_anytime, 0)
# Check that createrawtransaction/decoderawtransaction with non-v0 Bech32 works
v1_addr = program_to_witness(1, [3, 5])
v1_tx = self.nodes[0].createrawtransaction([getutxo(spendable_txid[0])], {v1_addr: 1})
v1_decoded = self.nodes[1].decoderawtransaction(v1_tx)
assert_equal(v1_decoded['vout'][0]['scriptPubKey']['address'], v1_addr)
assert_equal(v1_decoded['vout'][0]['scriptPubKey']['hex'], "51020305")
# Check that spendable outputs are really spendable
self.create_and_mine_tx_from_txids(spendable_txid)
# import all the private keys so solvable addresses become spendable
self.nodes[0].importprivkey("cPiM8Ub4heR9NBYmgVzJQiUH1if44GSBGiqaeJySuL2BKxubvgwb")
self.nodes[0].importprivkey("cPpAdHaD6VoYbW78kveN2bsvb45Q7G5PhaPApVUGwvF8VQ9brD97")
self.nodes[0].importprivkey("91zqCU5B9sdWxzMt1ca3VzbtVm2YM6Hi5Rxn4UDtxEaN9C9nzXV")
self.nodes[0].importprivkey("cPQFjcVRpAUBG8BA9hzr2yEzHwKoMgLkJZBBtK9vJnvGJgMjzTbd")
self.nodes[0].importprivkey("cQGtcm34xiLjB1v7bkRa4V3aAc9tS2UTuBZ1UnZGeSeNy627fN66")
self.nodes[0].importprivkey("cTW5mR5M45vHxXkeChZdtSPozrFwFgmEvTNnanCW6wrqwaCZ1X7K")
self.create_and_mine_tx_from_txids(solvable_txid)
# Test that importing native P2WPKH/P2WSH scripts works
for use_p2wsh in [False, True]:
if use_p2wsh:
scriptPubKey = "00203a59f3f56b713fdcf5d1a57357f02c44342cbf306ffe0c4741046837bf90561a"
transaction = "01000000000100e1f505000000002200203a59f3f56b713fdcf5d1a57357f02c44342cbf306ffe0c4741046837bf90561a00000000"
else:
scriptPubKey = "a9142f8c469c2f0084c48e11f998ffbe7efa7549f26d87"
transaction = "01000000000100e1f5050000000017a9142f8c469c2f0084c48e11f998ffbe7efa7549f26d8700000000"
self.nodes[1].importaddress(scriptPubKey, "", False)
rawtxfund = self.nodes[1].fundrawtransaction(transaction)['hex']
rawtxfund = self.nodes[1].signrawtransactionwithwallet(rawtxfund)["hex"]
txid = self.nodes[1].sendrawtransaction(rawtxfund)
assert_equal(self.nodes[1].gettransaction(txid, True)["txid"], txid)
assert_equal(self.nodes[1].listtransactions("*", 1, 0, True)[0]["txid"], txid)
# Assert it is properly saved
self.restart_node(1)
assert_equal(self.nodes[1].gettransaction(txid, True)["txid"], txid)
assert_equal(self.nodes[1].listtransactions("*", 1, 0, True)[0]["txid"], txid)
self.log.info('Test negative and unknown rpcserialversion throw an init error')
self.stop_node(0)
self.nodes[0].assert_start_raises_init_error(["-rpcserialversion=-1"], "Error: rpcserialversion must be non-negative.")
self.nodes[0].assert_start_raises_init_error(["-rpcserialversion=100"], "Error: Unknown rpcserialversion requested.")
def mine_and_test_listunspent(self, script_list, ismine):
utxo = find_spendable_utxo(self.nodes[0], 50)
tx = CTransaction()
tx.vin.append(CTxIn(COutPoint(int('0x' + utxo['txid'], 0), utxo['vout'])))
for i in script_list:
tx.vout.append(CTxOut(10000000, i))
tx.rehash()
signresults = self.nodes[0].signrawtransactionwithwallet(tx.serialize_without_witness().hex())['hex']
txid = self.nodes[0].sendrawtransaction(hexstring=signresults, maxfeerate=0)
txs_mined[txid] = self.generate(self.nodes[0], 1)[0]
watchcount = 0
spendcount = 0
for i in self.nodes[0].listunspent():
if i['txid'] == txid:
watchcount += 1
if i['spendable']:
spendcount += 1
if ismine == 2:
assert_equal(spendcount, len(script_list))
elif ismine == 1:
assert_equal(watchcount, len(script_list))
assert_equal(spendcount, 0)
else:
assert_equal(watchcount, 0)
return txid
def p2sh_address_to_script(self, v):
bare = CScript(bytes.fromhex(v['hex']))
p2sh = CScript(bytes.fromhex(v['scriptPubKey']))
p2wsh = script_to_p2wsh_script(bare)
p2sh_p2wsh = script_to_p2sh_script(p2wsh)
return [bare, p2sh, p2wsh, p2sh_p2wsh]
def p2pkh_address_to_script(self, v):
pubkey = bytes.fromhex(v['pubkey'])
p2wpkh = key_to_p2wpkh_script(pubkey)
p2sh_p2wpkh = script_to_p2sh_script(p2wpkh)
p2pk = key_to_p2pk_script(pubkey)
p2pkh = CScript(bytes.fromhex(v['scriptPubKey']))
p2sh_p2pk = script_to_p2sh_script(p2pk)
p2sh_p2pkh = script_to_p2sh_script(p2pkh)
p2wsh_p2pk = script_to_p2wsh_script(p2pk)
p2wsh_p2pkh = script_to_p2wsh_script(p2pkh)
p2sh_p2wsh_p2pk = script_to_p2sh_script(p2wsh_p2pk)
p2sh_p2wsh_p2pkh = script_to_p2sh_script(p2wsh_p2pkh)
return [p2wpkh, p2sh_p2wpkh, p2pk, p2pkh, p2sh_p2pk, p2sh_p2pkh, p2wsh_p2pk, p2wsh_p2pkh, p2sh_p2wsh_p2pk, p2sh_p2wsh_p2pkh]
def create_and_mine_tx_from_txids(self, txids, success=True):
tx = CTransaction()
for i in txids:
txraw = self.nodes[0].getrawtransaction(i, 0, txs_mined[i])
txtmp = tx_from_hex(txraw)
for j in range(len(txtmp.vout)):
tx.vin.append(CTxIn(COutPoint(int('0x' + i, 0), j)))
tx.vout.append(CTxOut(0, CScript()))
tx.rehash()
signresults = self.nodes[0].signrawtransactionwithwallet(tx.serialize_without_witness().hex())['hex']
self.nodes[0].sendrawtransaction(hexstring=signresults, maxfeerate=0)
self.generate(self.nodes[0], 1)
if __name__ == '__main__':
SegWitTest().main()