#!/usr/bin/env python3 # Copyright (c) 2024 The Bitcoin Unlimited developers """ Test the `token.transaction.get_history` RPC call """ import asyncio from test_framework.bch.utiltoken import ( createrawtransaction, signrawtransaction, token_utxos, ) from test_framework.util import assert_equal from test_framework.electrumutil import ( ElectrumTestFramework, get_txid_from_idem, ) from test_framework.electrumconnection import ElectrumConnection from test_framework.environment import on_nex, on_bch def has_tx(res, txhash): for tx in res: if tx["tx_hash"] == txhash: return True return False class ElectrumTokenTransactionGetHistory(ElectrumTestFramework): async def run_test(self): # This test users nexad wallet to create and send tokens. # Mine and mature some coins. n = self.nodes[0] n.generate(120) cli = ElectrumConnection() await cli.connect() try: await self.sync_height(cli) await self.test_history(n, cli) await self.test_nft_history(n, cli) await self.test_melt_in_history(n, cli) await self.test_opreturn_burn(n, cli) finally: cli.disconnect() async def test_history(self, n, cli): # This creates two transaction, creating tx + mint tx token_id = self.create_token(to_addr=n.getnewaddress(), mint_amount=42) # 8 more transactions for sending send_txids = [] for _ in range(0, 8): txidem = self.send_token(token_id, n.getnewaddress(), 1) send_txids.append(await get_txid_from_idem(n, txidem)) await self.sync_mempool_count(cli) res = await cli.call("token.transaction.get_history", token_id) assert_equal(None, res["cursor"]) assert_equal(10, len(res["history"])) for txid in send_txids: assert txid in map(lambda x: x["tx_hash"], res["history"]) for item in res["history"]: assert_equal(0, item["height"]) n.generate(1) await self.sync_all(cli) res = await cli.call("token.transaction.get_history", token_id) assert_equal(10, len(res["history"])) for item in res["history"]: assert_equal(n.getblockcount(), item["height"]) # Create a tx confirmed in a later block, + 1 in mempool self.send_token(token_id, n.getnewaddress(), 1) n.generate(1) self.send_token(token_id, n.getnewaddress(), 1) await self.sync_all(cli) # The order returned should be: # [mempool, blockheight, blockheight - 1] h = (await cli.call("token.transaction.get_history", token_id))["history"] assert_equal(12, len(h)) assert_equal(0, h[0]["height"]) assert_equal(n.getblockcount(), h[1]["height"]) assert_equal(n.getblockcount() - 1, h[2]["height"]) # clean mempool n.generate(1) await self.wait_for_mempool_count(cli, count=0) async def test_nft_history(self, n, cli): """ Check that nft transaction history not returns parent history """ # On nexa, this creates two transaction, creating tx + mint tx # On bch, it creates and mints in same transaction assert_equal(0, (await cli.call("mempool.count"))["count"]) token_id = self.create_token( to_addr=n.getnewaddress(), mint_amount=42, bch_can_mint_nft=True ) self.create_nft(token_id, n.getnewaddress(), 255) self.create_nft(token_id, n.getnewaddress(), 24) async def check_parent_history(at_height: int): if on_nex(): # Creation, mint, create nft, create nft if at_height == 0: await self.wait_for_mempool_count(cli, count=4) res = await cli.call("token.transaction.get_history", token_id) assert_equal(4, len(res["history"])) if on_bch(): # Creation+mint, create nft, create nft if at_height == 0: await self.wait_for_mempool_count(cli, count=3) res = await cli.call("token.transaction.get_history", token_id) assert_equal(3, len(res["history"])) assert_equal(None, res["cursor"]) for item in res["history"]: assert_equal(at_height, item["height"]) await check_parent_history(0) if on_bch(): # Edge case: This framework creates an NFT in the genesis # If commitment is filtered, then it won't appear in the result. commitment = "" res = await cli.call( "token.transaction.get_history", token_id, None, commitment ) assert_equal(0, len(res["history"])) # Confirm in a block and check if confirmed yields the same. n.generate(1) await self.sync_all(cli) await check_parent_history(n.getblockcount()) # Check history for a specific NFT nft = self.create_nft(token_id, n.getnewaddress(), 3) await self.sync_mempool_count(cli) async def check_nft_history(at_height: int): if on_nex(): # NFT has unique token ID res = await cli.call("token.transaction.get_history", nft) elif on_bch(): # NFT has it's own "commitment", need to pass both res = await cli.call( "token.transaction.get_history", token_id, None, nft ) else: raise NotImplementedError() assert_equal(1, len(res["history"])) assert_equal(at_height, res["history"][0]["height"]) await check_nft_history(0) n.generate(1) await self.sync_all(cli) await check_nft_history(n.getblockcount()) async def test_melt_in_history(self, n, cli): """ Test that a melt transaction is part of token history. (issue #217) """ if on_bch(): self.info("skipping test_meld_in_history on BCH") return addr = n.getnewaddress() token = n.token("new") group_id = token["groupIdentifier"] mint_txid = await get_txid_from_idem(n, n.token("mint", group_id, addr, 42)) melt_txid = await get_txid_from_idem(n, n.token("melt", group_id, 42)) await self.sync_mempool_count(cli, n) token_history = await cli.call("token.transaction.get_history", group_id) assert_equal(3, len(token_history["history"])) assert has_tx(token_history["history"], mint_txid) assert has_tx(token_history["history"], melt_txid) # Check that melt is also part of confirmed history n.generate(1) await self.sync_all(cli, n) token_history = await cli.call("token.transaction.get_history", group_id) assert token_history["history"][0]["height"] > 0 assert_equal(3, len(token_history["history"])) assert has_tx(token_history["history"], melt_txid) async def test_opreturn_burn(self, n, cli): """ Test for issue #217; where transactions with tokens as inputs, but not outputs were not being listed in history. """ if on_nex(): # NYI return mint_to_addr = n.getnewaddress() token_id = self.create_token(to_addr=mint_to_addr, mint_amount=100) addr = n.getnewaddress() await cli.call("blockchain.address.get_scripthash", addr) utxos = token_utxos(n, token_id) outputs = [{"data": "badf000d"}] txhex = createrawtransaction( n, [{"txid": u["txid"], "vout": u["vout"]} for u in utxos], outputs, [] ) res = signrawtransaction(n, txhex, "ALL|FORKID|UTXOS") txhex = res["hex"] burn_txid = n.sendrawtransaction(txhex) await self.sync_mempool_count(cli, n) token_history = await cli.call("token.transaction.get_history", token_id) assert_equal(3, len(token_history["history"])) assert has_tx(token_history["history"], burn_txid) # also in confirmed history n.generate(1) await self.sync_all(cli, n) token_history = await cli.call("token.transaction.get_history", token_id) assert_equal(3, len(token_history["history"])) assert has_tx(token_history["history"], burn_txid) if __name__ == "__main__": asyncio.run(ElectrumTokenTransactionGetHistory().main())