#!/usr/bin/env python3
# blocktools.py - utilities for manipulating blocks and transactions
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

from decimal import Decimal
import time

from .. import cashaddr
from .nodemessages import (
    COIN,
    CBlock,
    TxOut,
    CTransaction,
    CTxIn,
    COutPoint,
    CTxOut,
    decode_base58,
)
from ..script import (
    CScript,
    OP_CHECKSIG,
    OP_DROP,
    OP_DUP,
    OP_HASH160,
    OP_EQUALVERIFY,
    OP_RETURN,
    OP_NOP,
)
from ..util import (
    rpc_hex_to_uint256,
    uint256_to_rpc_hex,
    assert_equal,
    unhexlify,
    hexlify,
)
from ..serialize import uint256_from_bigendian

COINBASE_REWARD = Decimal("10000000.00")


def get_anc_hash(height, node):
    """Return the ancestor block hash for a block at `height`.

    The ancestor's height is computed with `ancestor_height`, its header is
    fetched with `node.getblockheader`, and the header's "hash" field is
    converted with `rpc_hex_to_uint256`.
    """
    return rpc_hex_to_uint256(node.getblockheader(ancestor_height(height))["hash"])


def ancestor_height(height):
    """Return the height of the ancestor referenced by a block at `height`.

    Even heights map to `height` with its lowest set bit cleared; odd heights
    map to 5040 blocks back, clamped at 0.

    Raises:
        AssertionError: if `height` is 0.
    """
    assert height != 0  # ancestor height of GB is undefined (hash is all 0s)
    if height & 1 == 0:
        return height & (height - 1)
    return max(0, height - 5040)


# Create a block (with regtest difficulty)
# pylint: disable=too-many-arguments
def create_block(
    hashprev,
    height,
    chainwork,
    coinbase,
    hash_ancestor,
    n_time=None,
    txns=None,
    ctor=True,
) -> CBlock:
    """Build a `CBlock` with regtest difficulty and return it.

    Args:
        hashprev: hash of the previous block; a `str` is converted with
            `uint256_from_bigendian`, anything else is stored as given.
        height: stored as `block.height`.
        chainwork: stored as `block.chain_work`.
        coinbase: coinbase transaction placed first in `block.vtx`; skipped
            when falsy.
        hash_ancestor: stored as `block.hash_ancestor`.
        n_time: block time as an `int`; defaults to the current time plus
            600 seconds.
        txns: optional transactions appended after the coinbase. The
            caller's sequence is not modified.
        ctor: when true, `txns` are ordered by the RPC-hex form of their id
            before being appended.

    Raises:
        ValueError: if `n_time` is given but is not an `int`.
    """
    block = CBlock()
    if n_time is None:
        n_time = int(time.time() + 600)
    elif not isinstance(n_time, int):
        raise ValueError(f"n_time should be int, got {type(n_time)}")
    block.n_time = n_time

    if isinstance(hashprev, str):
        hashprev = uint256_from_bigendian(hashprev)

    block.chain_work = chainwork
    block.height = height
    block.hash_prev_block = hashprev
    block.hash_ancestor = hash_ancestor
    # Will break after a difficulty adjustment... which never happens in
    # regtest
    block.n_bits = 0x207FFFFF

    if coinbase:
        block.vtx.append(coinbase)
    if txns:
        if ctor:
            # sorted() rather than list.sort(): leave the caller's list in
            # its original order.
            txns = sorted(txns, key=lambda x: uint256_to_rpc_hex(x.get_id()))
        block.vtx += txns

    block.tx_count = len(block.vtx)
    block.utxo_commitment = b""
    block.miner_data = b""
    block.nonce = bytearray(3)
    block.update_fields()
    return block


# Create large OP_RETURN txouts that can be appended to a transaction
# to make it large (helper for constructing large transactions).
def gen_return_txouts():
    """Return a list of 63 `TxOut(0, 0, script)` objects sharing one large script.

    These are inserted into transactions to make them big, so that only a
    few of them fit into each block.
    """
    # OP_RETURN OP_PUSH2 512 bytes, followed by the 0x01 filler bytes.
    # NOTE(review): the push header declares 512 bytes but 1024 filler bytes
    # are appended (unchanged from the previous loop) - confirm this is
    # intended.
    script_pubkey = "6a4d0200" + "01" * 1024
    constraint = bytes.fromhex(script_pubkey)  # CScript(script_pubkey)

    # 63 txouts of the above script, to be inserted before the txout for
    # change.
    return [TxOut(0, 0, constraint) for _ in range(63)]


def make_conform_to_ctor(block):
    """Rehash every transaction in `block.vtx` and reorder them in place.

    The first transaction stays first; the remaining ones are sorted by the
    RPC-hex form of their id. An empty `block.vtx` is left empty.
    """
    for tx in block.vtx:
        tx.rehash()
    # Slicing (rather than indexing vtx[0]) keeps this safe for an empty block.
    # get_id() matches the accessor create_block() uses on the same objects.
    block.vtx = block.vtx[:1] + sorted(
        block.vtx[1:], key=lambda tx: uint256_to_rpc_hex(tx.get_id())
    )


# Create a coinbase transaction, assuming no miner fees.
# If pubkey is passed in, the coinbase output will be a P2PK output;
# otherwise an anyone-can-spend output.
def create_coinbase(height, pubkey=None, script_pub_key=None) -> CTransaction:
    """Create a coinbase transaction for `height`, assuming no miner fees.

    The first output carries `COINBASE_REWARD` (in `COIN` units) shifted
    right once per 150 blocks of height. The second output is an
    `OP_RETURN <height>` script, padded so the serialized transaction is at
    least 65 bytes.

    Args:
        height: block height the coinbase is created for.
        pubkey: if given, the reward output is `<pubkey> OP_CHECKSIG`.
        script_pub_key: if given (and `pubkey` is not), used as the reward
            output script; otherwise the script is a single `OP_NOP`.

    Raises:
        AssertionError: if both `pubkey` and `script_pub_key` are supplied.
    """
    assert not (
        pubkey and script_pub_key
    ), "cannot both have pubkey and custom script_pub_key"
    coinbase = CTransaction()
    coinbaseoutput = CTxOut()
    coinbaseoutput.n_value = int(COINBASE_REWARD) * COIN
    halvings = int(height / 150)  # regtest
    coinbaseoutput.n_value >>= halvings
    if pubkey is not None:
        coinbaseoutput.script_pub_key = CScript([pubkey, OP_CHECKSIG])
    else:
        if script_pub_key is None:
            script_pub_key = CScript([OP_NOP])
        coinbaseoutput.script_pub_key = CScript(script_pub_key)
    uniquifier = TxOut(0, 0, CScript([OP_RETURN, height]))
    coinbase.vout = [coinbaseoutput, uniquifier]

    # Make sure the coinbase is at least 65 bytes
    coinbase_size = len(coinbase.serialize())
    if coinbase_size < 65:
        coinbase.vout[1].script_pub_key += b"x" * (65 - coinbase_size)

    coinbase.calc_id()
    return coinbase


# Create a transaction with an anyone-can-spend output, that spends the
# nth output of prevtx.  pass a single integer value to make one output,
# or a list to create multiple outputs
PADDED_ANY_SPEND = (
    b"\x61" * 50
)  # add a bunch of OP_NOPs to make sure this tx is long enough


def create_transaction(
    prevtx: CTransaction, n, sig, value, out=PADDED_ANY_SPEND
) -> CTransaction:
    """Create a transaction spending output `n` of `prevtx`.

    Args:
        prevtx: transaction whose `n`-th output is spent.
        n: index into `prevtx.vout`.
        sig: passed to `CTxIn` as the input's script.
        value: a single output value, or a list of values to create one
            output per entry.
        out: script used for every output (default `PADDED_ANY_SPEND`).

    Raises:
        AssertionError: if `n` is not a valid index into `prevtx.vout`.
    """
    idem = prevtx.calc_idem()
    if not isinstance(value, list):
        value = [value]
    tx = CTransaction()
    assert n < len(prevtx.vout)
    outpt = COutPoint().from_idem_and_idx(idem, n)
    tx.vin.append(CTxIn(outpt, prevtx.vout[n].n_value, sig, 0xFFFFFFFF))
    for v in value:
        tx.vout.append(CTxOut(v, out))
    tx.rehash()
    return tx


def address2bin(btc_address):
    """convert a bitcoin address to binary data capable of being put in a CScript"""
    try:
        return cashaddr.decode(btc_address)[2]
    except Exception:
        # Not decodable as a cashaddr: fall through to base58. Only
        # Exception is caught so KeyboardInterrupt/SystemExit still
        # propagate.
        pass
    # Try bitcoin address: chop the version and checksum out of the bytes of
    # the address
    return decode_base58(btc_address)[1:-4]


B58_DIGITS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def create_wasteful_output(btc_address):
    """Warning: Creates outputs that can't be spent by bitcoind"""
    data = b"""this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data. this is junk data."""
    # Junk push that is immediately dropped, followed by the same opcode
    # sequence p2pkh() builds.
    ret = CScript(
        [
            data,
            OP_DROP,
            OP_DUP,
            OP_HASH160,
            address2bin(btc_address),
            OP_EQUALVERIFY,
            OP_CHECKSIG,
        ]
    )
    return ret


def p2pkh(btc_address):
    """create a pay-to-public-key-hash script"""
    ret = CScript(
        [OP_DUP, OP_HASH160, address2bin(btc_address), OP_EQUALVERIFY, OP_CHECKSIG]
    )
    return ret


def spend_coinbase_tx(node, coinbase, to_address, amount, in_amount=None):
    """Create and sign, via `node`, a transaction spending output 0 of a coinbase.

    Args:
        node: RPC proxy providing `createrawtransaction` and
            `signrawtransaction`.
        coinbase: value passed to `COutPoint().from_idem_and_idx` together
            with index 0 to identify the output being spent.
        to_address: destination address for the single output.
        amount: amount sent to `to_address`.
        in_amount: amount of the spent output; defaults to
            `COINBASE_REWARD`.

    Returns:
        The "hex" field of the signing result. `assert_equal` checks that
        signing reported "complete" as True.
    """
    if in_amount is None:
        in_amount = COINBASE_REWARD
    inputs = [
        {
            "outpoint": COutPoint().from_idem_and_idx(coinbase, 0).rpc_hex(),
            "amount": in_amount,
        }
    ]
    outputs = {to_address: amount}
    rawtx = node.createrawtransaction(inputs, outputs)
    signresult = node.signrawtransaction(rawtx)
    assert_equal(signresult["complete"], True)
    return signresult["hex"]


def _amount_to_base_units(amount):
    """Convert a coin-denominated amount into an integer count of `COIN` units."""
    if isinstance(amount, float):
        # A binary float times COIN can land just below the intended integer
        # (1.15 * 100 == 114.99999999999999) and int() would then truncate
        # it. repr() yields the shortest decimal string that round-trips.
        amount = Decimal(repr(amount))
    return int(amount * COIN)


def createrawtransaction(inputs, outputs, out_script_generator=p2pkh):
    """
    Create a transaction with the exact input and output syntax as the bitcoin-cli "createrawtransaction" command.
    If you use the default out_script_generator, this function will return a hex string that exactly matches the
    output of bitcoin-cli createrawtransaction.

    But this function is extended beyond bitcoin-cli in the following ways:

    inputs can have a "sig" field which is a binary hex string of the signature script
    outputs can be a list of tuples rather than a dictionary.  In that format, they can pass complex objects to
    the out_script_generator (use a tuple or an object), be a list (that is passed to CScript()), or a callable

    An output whose address is the string "data" becomes a zero-value
    OP_RETURN output carrying the unhexlified amount field.

    Returns the serialized transaction as a hex string.
    """
    if not isinstance(inputs, list):
        inputs = [inputs]

    tx = CTransaction()
    for i in inputs:
        sig_script = i.get("sig", b"")
        tx.vin.append(
            CTxIn(
                COutPoint(i["outpoint"]),
                _amount_to_base_units(i["amount"]),
                sig_script,
                0xFFFFFFFF,
            )
        )

    pairs = list(outputs.items()) if isinstance(outputs, dict) else outputs

    for addr, amount in pairs:
        if callable(addr):
            tx.vout.append(CTxOut(_amount_to_base_units(amount), addr()))
        elif isinstance(addr, list):
            tx.vout.append(CTxOut(_amount_to_base_units(amount), CScript(addr)))
        elif addr == "data":
            tx.vout.append(CTxOut(0, CScript([OP_RETURN, unhexlify(amount)])))
        else:
            tx.vout.append(
                CTxOut(_amount_to_base_units(amount), out_script_generator(addr))
            )
    tx.rehash()
    return hexlify(tx.serialize()).decode("utf-8")


def create_tx_with_script(
    prevtx, n, script_sig=b"", amount=1, script_pub_key=CScript()
):
    """Return one-input, one-output transaction object
    spending the prevtx's n-th output with the given amount.

    Can optionally pass script_pub_key and script_sig, default is
    anyone-can-spend output.

    Raises:
        AssertionError: if `n` is not a valid index into `prevtx.vout`.
    """
    tx = CTransaction()
    assert n < len(prevtx.vout)
    # Build the outpoint and read the value the same way create_transaction()
    # does (from_idem_and_idx / n_value) instead of the older camelCase
    # OutpointAt / nValue names.
    outpt = COutPoint().from_idem_and_idx(prevtx.calc_idem(), n)
    tx.vin.append(CTxIn(outpt, prevtx.vout[n].n_value, script_sig, 0xFFFFFFFF))
    tx.vout.append(CTxOut(amount, script_pub_key))
    tx.rehash()
    return tx