#!/usr/bin/env python3
# Copyright (c) 2022 The Bitcoin Unlimited developers
"""Functional tests for the electrum `blockchain.scripthash.listunspent` call."""
import asyncio

from test_framework.electrumutil import (
    ElectrumTestFramework,
    script_to_scripthash,
)
from test_framework.util import assert_equal
from test_framework.environment import on_bch, on_nex
from test_framework.script import (
    CScript,
    OP_DROP,
    OP_FALSE,
    OP_TRUE,
    OP_NOP,
)
from test_framework.serialize import to_hex
from test_framework.utiltx import pad_tx
from test_framework.electrumconnection import ElectrumConnection

if on_bch():
    from test_framework.bch.blocktools import create_transaction
elif on_nex():
    from test_framework.nex.blocktools import create_transaction
else:
    raise NotImplementedError()

# Distinct output scripts, so that every test step can query its own
# scripthash without seeing outputs created by another step.
SCRIPT1 = CScript([OP_FALSE, OP_DROP])
SCRIPT1_HASH = script_to_scripthash(SCRIPT1)

SCRIPT2 = CScript([OP_FALSE, OP_FALSE, OP_DROP, OP_DROP])
SCRIPT2_HASH = script_to_scripthash(SCRIPT2)

SCRIPT3 = CScript([OP_NOP, OP_NOP])
SCRIPT3_HASH = script_to_scripthash(SCRIPT3)

SCRIPT4 = CScript([OP_NOP, OP_FALSE, OP_DROP])
SCRIPT4_HASH = script_to_scripthash(SCRIPT4)


def new_tx(prevtx, out_script, amount):
    """Build a padded transaction spending output 0 of *prevtx*.

    The single output pays *amount* to *out_script*; the input is signed with
    a bare ``OP_TRUE`` script.
    """
    tx = create_transaction(
        prevtx=prevtx,
        value=amount,
        n=0,
        sig=CScript([OP_TRUE]),
        out=out_script,
    )
    pad_tx(tx)
    return tx


async def _listunspent(cli, scripthash):
    """Return the `blockchain.scripthash.listunspent` reply for *scripthash*."""
    return await cli.call("blockchain.scripthash.listunspent", scripthash)


def _assert_single_utxo(utxos, tx, value, height):
    """Assert *utxos* holds exactly one token-less entry for output 0 of *tx*.

    Returns the entry so callers can run further checks on it.
    """
    assert_equal(1, len(utxos))
    utxo = utxos[0]
    assert_equal(height, utxo["height"])
    assert_equal(value, utxo["value"])
    assert_equal(0, utxo["tx_pos"])
    assert_equal(utxo["tx_hash"], tx.get_rpc_hex_id())
    assert_equal(utxo["has_token"], False)
    return utxo


class ScripthashListunspentTests(ElectrumTestFramework):
    """Exercise `blockchain.scripthash.listunspent` against a single node."""

    async def run_test(self):
        """Connect an electrum client, mine spendable coinbases, run the tests."""
        n = self.nodes[0]
        await self.bootstrap_p2p()
        cli = ElectrumConnection()
        await cli.connect()
        try:
            # Mining happens inside the try block so the connection is also
            # closed when block generation itself fails.
            coinbases = await self.mine_blocks(cli, n, 104)
            await self.test_basic(n, cli, coinbases)
            await self.test_filters_spent_in_mempool(n, cli, coinbases)
        finally:
            cli.disconnect()

    async def test_basic(self, n, cli, coinbases):
        """
        Test that we see an unspent balance and that the outpoint hash exists.
        """
        tx1 = new_tx(coinbases.pop(0), SCRIPT1, 10000)
        n.sendrawtransaction(to_hex(tx1), True)
        await self.sync_mempool_count(cli)

        # While the transaction is unconfirmed the expected height is 0.
        r = await _listunspent(cli, SCRIPT1_HASH)
        u = _assert_single_utxo(r, tx1, 10000, 0)

        if on_nex():
            # Check that we can query the outpointhash
            utxo = await cli.call("blockchain.utxo.get", u["outpoint_hash"])
            assert_equal(u["tx_hash"], utxo["tx_hash"])

        # Check that height is updated
        await self.mine_blocks(cli, n, 1, [tx1])
        r = await _listunspent(cli, SCRIPT1_HASH)
        _assert_single_utxo(r, tx1, 10000, n.getblockcount())

    async def test_filters_spent_in_mempool(self, n, cli, coinbases):
        """
        Test that outputs spent by a mempool transaction are not listed,
        whether the spent output is itself unconfirmed or already confirmed.
        """
        # Test that utxos created in mempool are filtered out if also spent
        # in mempool
        tx1 = new_tx(coinbases.pop(0), SCRIPT2, 30000)
        n.sendrawtransaction(to_hex(tx1), True)
        await self.sync_mempool_count(cli)
        r = await _listunspent(cli, SCRIPT2_HASH)
        assert_equal(1, len(r))

        tx2 = new_tx(tx1, SCRIPT3, 20000)
        n.sendrawtransaction(to_hex(tx2), True)
        await self.sync_mempool_count(cli)
        r = await _listunspent(cli, SCRIPT2_HASH)
        assert_equal(0, len(r))
        r = await _listunspent(cli, SCRIPT3_HASH)
        assert_equal(1, len(r))

        # Test that confirmed utxos that are spent in mempool are filtered out
        await self.mine_blocks(cli, n, 1, [tx1, tx2])
        tx3 = new_tx(tx2, SCRIPT4, 10000)
        n.sendrawtransaction(to_hex(tx3), True)
        await self.sync_mempool_count(cli)
        r = await _listunspent(cli, SCRIPT3_HASH)
        assert_equal(0, len(r))
        r = await _listunspent(cli, SCRIPT4_HASH)
        assert_equal(1, len(r))


if __name__ == "__main__":
    asyncio.run(ScripthashListunspentTests().main())