#!/usr/bin/env python3 # Copyright (c) 2020 The Bitcoin Unlimited developers """ Tests various aspects of chained transactions, primarily `get_history` and `get_mempool`. `get_history` and `get_mempool` are basically the same RPC call, except that get_mempool ignores confirmed transactions. """ import asyncio from test_framework.electrumutil import ( ElectrumTestFramework, script_to_scripthash, ) from test_framework.script import CScript, OP_TRUE, OP_DROP, OP_NOP, OP_FALSE from test_framework.util import assert_equal from test_framework.utiltx import pad_tx from test_framework.serialize import to_hex from test_framework.electrumconnection import ElectrumConnection GET_HISTORY = "blockchain.scripthash.get_history" GET_MEMPOOL = "blockchain.scripthash.get_mempool" class ElectrumMempoolChain(ElectrumTestFramework): async def run_test(self): n = self.nodes[0] await self.bootstrap_p2p() cli = ElectrumConnection() try: await cli.connect() coinbases = await self.mine_blocks(cli, n, 100) await self.test_blockheight_unconfirmed(n, cli, coinbases.pop(0)) await self.test_chain_to_from_one_scripthash(n, cli, coinbases.pop(0)) finally: cli.disconnect() async def test_blockheight_unconfirmed(self, n, cli, unspent): """ Check that unconfirmed transactions are correctly provided. Note that height is different for unconfirmed with confirmed parents and unconfirmed with unconfirmed parents. """ # Another unique anyone-can-spend scriptpubkey scriptpubkey = CScript([OP_FALSE, OP_DROP, OP_NOP]) scripthash = script_to_scripthash(scriptpubkey) # There should exist any history for scripthash assert_equal(0, len(await cli.call(GET_HISTORY, scripthash))) assert_equal(0, len(await cli.call(GET_MEMPOOL, scripthash))) # Send a chain of three txs. We expect that: # tx1: height 0, signaling unconfirmed with confirmed parents # tx2: height -1, signaling unconfirmed with unconfirmed parents # tx3: height -1, signaling unconfirmed with unconfirmed parents fee = 200 tx1 = self.create_transaction( unspent, n=0, value=unspent.vout[0].n_value - fee, sig=CScript([OP_TRUE]), out=scriptpubkey, ) pad_tx(tx1) tx2 = self.create_transaction( tx1, n=0, value=tx1.vout[0].n_value - fee, sig=CScript([OP_TRUE]), out=scriptpubkey, ) pad_tx(tx2) tx3 = self.create_transaction( tx2, n=0, value=tx2.vout[0].n_value - fee, sig=CScript([OP_TRUE]), out=scriptpubkey, ) pad_tx(tx3) for tx in [tx1, tx2, tx3]: await cli.call("blockchain.transaction.broadcast", to_hex(tx)) await self.wait_for_mempool_count(cli, n, count=3) res = await cli.call(GET_HISTORY, scripthash) assert_equal(3, len(res)) assert_equal(res, await cli.call(GET_MEMPOOL, scripthash)) def get_tx(txhash): for tx in res: if tx["tx_hash"] == txhash: return tx raise AssertionError("tx not in result") assert_equal(0, get_tx(tx1.get_rpc_hex_id())["height"]) assert_equal(-1, get_tx(tx2.get_rpc_hex_id())["height"]) assert_equal(-1, get_tx(tx3.get_rpc_hex_id())["height"]) # Confirm tx1, see that # tx1: gets tipheight # tx2: gets height 0, tx3 keeps height -1 await self.mine_blocks(cli, n, 1, [tx1]) for call in [GET_HISTORY, GET_MEMPOOL]: res = await cli.call(call, scripthash) if call == GET_HISTORY: assert_equal(n.getblockcount(), get_tx(tx1.get_rpc_hex_id())["height"]) else: assert (tx["tx_hash"] != tx1.get_rpc_hex_id() for tx in res) assert_equal(0, get_tx(tx2.get_rpc_hex_id())["height"]) assert_equal(-1, get_tx(tx3.get_rpc_hex_id())["height"]) # cleanup mempool for next test await self.mine_blocks(cli, n, 1, [tx2, tx3]) assert len(n.getrawtxpool()) == 0 async def test_chain_to_from_one_scripthash(self, n, cli, unspent): """ Creates a tx chain where the same scripthash is both funder and spender """ scriptpubkey = CScript([OP_TRUE, OP_TRUE, OP_DROP, OP_DROP]) scripthash = script_to_scripthash(scriptpubkey) assert_equal(0, len(await cli.call(GET_HISTORY, scripthash))) assert_equal(0, len(await cli.call(GET_MEMPOOL, scripthash))) def has_tx(res, txhash): for tx in res: if tx["tx_hash"] == txhash: return True return False chain_length = 25 tx_chain = self._create_tx_chain(unspent, scriptpubkey, chain_length) # Check mempool assert len(n.getrawtxpool()) == 0 await self.p2p.send_txs_and_test(tx_chain, n) await self.wait_for_mempool_count(cli, n, count=len(tx_chain)) res = await cli.call(GET_HISTORY, scripthash) assert_equal(len(tx_chain), len(res)) assert all(has_tx(res, tx.get_rpc_hex_id()) for tx in tx_chain) res_mempool = await cli.call(GET_MEMPOOL, scripthash) assert_equal(res, res_mempool) # Check when confirmed in a block await self.mine_blocks(cli, n, 1, tx_chain) res = await cli.call(GET_HISTORY, scripthash) assert_equal(len(tx_chain), len(res)) assert all(has_tx(res, tx.get_rpc_hex_id()) for tx in tx_chain) assert_equal(0, len(await cli.call(GET_MEMPOOL, scripthash))) def _create_tx_chain(self, unspent, scriptpubkey, chain_len): tx_chain = [unspent] fee = 300 for _ in range(chain_len): prev_tx = tx_chain[-1] amount = prev_tx.vout[0].n_value - fee tx = self.create_transaction( prev_tx, n=0, value=amount, sig=CScript([OP_TRUE]), out=scriptpubkey, ) pad_tx(tx) tx_chain.append(tx) tx_chain.pop(0) # the initial unspent is not part of the chain return tx_chain if __name__ == "__main__": asyncio.run(ElectrumMempoolChain().main())