# Copyright (c) 2019-2023 The Bitcoin Unlimited developers import asyncio import time import logging import hashlib from test_framework.electrumconnection import ElectrumConnection from test_framework.util import rpc_hex_to_uint256, wait_for from test_framework.test_framework import BitcoinTestFramework from test_framework.connectrum.exc import ElectrumErrorResponse from test_framework.mininode import ( P2PDataStore, NodeConn, ) from test_framework.util import assert_equal, p2p_port from .environment import network, Network, node, Node, on_bch from .test_node import TestNode from . import bch from . import nex ERROR_CODE_INVALID_REQUEST = -32600 ERROR_CODE_METHOD_NOT_FOUND = -32601 ERROR_CODE_INVALID_PARAMS = -32602 ERROR_CODE_INTERNAL_ERROR = -32603 ERROR_CODE_NOT_FOUND = -32004 ERROR_CODE_TIMEOUT = -32005 class ElectrumTestFramework(BitcoinTestFramework): def __init__(self): super().__init__() logging.basicConfig(level=logging.INFO) self.setup_clean_chain = True self.num_nodes = 1 # Cached to speed up mining self.hash_at_height = {} self.p2p = None self.connection = None async def run_test(self): raise NotImplementedError("Test needs to override this method") def info(self, *args): logging.info(*args) async def bootstrap_p2p(self): """Add a P2P connection to the node.""" self.p2p = P2PDataStore() self.connection = await NodeConn.create( "127.0.0.1", await p2p_port(0), self.nodes[0], self.p2p ) self.p2p.add_connection(self.connection) await self.p2p.wait_for_verack() assert self.p2p.connection.state == "connected" # pylint: disable=too-many-locals def _mine_nexa_blocks(self, n, num_blocks, txns): genesis_block_hash = rpc_hex_to_uint256(n.getblockheader(0)["hash"]) prev = n.getblockheader(n.getbestblockhash()) prev_height = prev["height"] prev_hash = prev["hash"] prev_time = max(prev["time"] + 1, int(time.time())) prev_chainwork = rpc_hex_to_uint256(prev["chainwork"]) blocks = [] for _ in range(num_blocks): height = prev_height + 1 anc_height = nex.blocktools.ancestor_height(height) if anc_height == 0: ancestor_hash = genesis_block_hash else: ancestor_hash = self.hash_at_height.get(anc_height, None) if ancestor_hash is None: ancestor_hash = nex.blocktools.get_anc_hash(anc_height, n) coinbase = nex.blocktools.create_coinbase(height) b = nex.blocktools.create_block( hashprev=prev_hash, chainwork=prev_chainwork + 2, height=height, coinbase=coinbase, hash_ancestor=ancestor_hash, txns=txns, n_time=prev_time + 1, ) txns = None b.solve() blocks.append(b) prev_time = b.n_time prev_height += 1 prev_hash = b.gethash() prev_chainwork = b.chain_work self.hash_at_height[height] = b.gethash() return blocks def _mine_bch_blocks(self, n, num_blocks, txns): prev = n.getblockheader(n.getbestblockhash()) prev_height = prev["height"] prev_hash = prev["hash"] prev_time = max(prev["time"] + 1, int(time.time())) blocks = [] for _ in range(num_blocks): coinbase = bch.blocktools.create_coinbase(prev_height + 1) b = bch.blocktools.create_block( hashprev=prev_hash, coinbase=coinbase, txns=txns, n_time=prev_time + 1 ) txns = None b.solve() blocks.append(b) prev_time = b.n_time prev_height += 1 prev_hash = b.hash return blocks # pylint: disable=too-many-locals async def mine_blocks(self, cli, n, num_blocks, txns=None): """ Mine a block without using the node RPC """ if network() == Network.BCH: blocks = self._mine_bch_blocks(n, num_blocks, txns) elif network() == Network.NEX: blocks = self._mine_nexa_blocks(n, num_blocks, txns) else: raise NotImplementedError() await self.p2p.send_blocks_and_test(blocks, n) assert_equal(blocks[-1].hash, n.getbestblockhash()) await self.sync_all(cli, n) # Return coinbases for spending later return [b.vtx[0] for b in blocks] async def sync_height(self, cli: ElectrumConnection, n=None, timeout: int = 60): if n is None: n = self.nodes[0] async def match_height(): electrum_tip = (await cli.call("blockchain.headers.tip"))["height"] node_tip = n.getblockcount() return electrum_tip == node_tip try: await wait_for(timeout, match_height, sleep_amt=0.5) except: logging.error( "rostrum height: %s node: %s", (await cli.call("blockchain.headers.tip"))["height"], n.getblockcount(), ) raise async def sync_mempool_count( self, cli: ElectrumConnection, n: TestNode | None = None, timeout: int = 60 ): if n is None: n = self.nodes[0] async def match_mempool_count(): node_count = len(n.getrawtxpool()) electrum_count = (await cli.call("mempool.count"))["count"] return node_count == electrum_count try: await wait_for(timeout, match_mempool_count, sleep_amt=0.2) except: logging.error( "rostrum mempool count: %s != node mempool count %s", (await cli.call("mempool.count"))["count"], len(n.getrawtxpool()), ) raise async def wait_for_mempool_count( self, cli, n: TestNode | None = None, *, count: int, timeout: int = 60 ): if n is None: n = self.nodes[0] async def match_count(): electrum_count = (await cli.call("mempool.count"))["count"] return electrum_count == count try: await wait_for(timeout, match_count, sleep_amt=0.2) except: logging.error( "rostrum mempool count: %s != expected %s", (await cli.call("mempool.count"))["count"], count, ) raise # Sync both mempool count and block height async def sync_all(self, cli, n=None): await asyncio.gather(self.sync_mempool_count(cli, n), self.sync_height(cli, n)) # pylint: disable=too-many-arguments def create_token( self, *args, n=None, to_addr, mint_amount, bch_can_mint_nft=False, decimal_places=None, return_txid=False, ): if n is None: n = self.nodes[0] if decimal_places is not None and on_bch(): raise Exception("decimal places not supported on BCH") if node() == Node.NEXA: token = n.token("new", *args) group_id = token["groupIdentifier"] txidem = n.token("mint", group_id, to_addr, mint_amount) if return_txid: return group_id, txidem return group_id if node() == Node.BCHN or node() == Node.BCHUNLIMITED: nft = bytearray("minting", "utf8") if bch_can_mint_nft else None token_id, txid = bch.utiltoken.create_token_genesis_tx( n, to_addr, 1, mint_amount, nft=nft, return_txid=True ) # The NEXA way above does 1 creation tx + 1 mint, while the BCH way # does creation+mint. # # To make the tests more similar, produce a second transaction that just # moves the coins to the same address. if not bch_can_mint_nft: self.send_token(token_id, to_addr, mint_amount) if return_txid: return token_id, txid return token_id raise NotImplementedError(f"create_token for node {node()} NYI") def send_token(self, token_id, to_addr, amount): if node() == Node.NEXA: return self.nodes[0].token("send", token_id, to_addr, amount) if node() == Node.BCHN or node() == Node.BCHUNLIMITED: return bch.utiltoken.send_token(self.nodes[0], token_id, to_addr, amount) raise NotImplementedError(f"send_token for node {node()} NYI") def create_nft(self, parent_token_id, to_addr, blob): if node() == Node.NEXA: nft_id = self.nodes[0].token("subgroup", parent_token_id, blob) self.nodes[0].token("mint", nft_id, to_addr, 1) return nft_id if node() == Node.BCHN or node() == Node.BCHUNLIMITED: return bch.utiltoken.create_nft( self.nodes[0], parent_token_id, bytearray(str(blob), "utf8"), to_addr ) raise NotImplementedError() async def decode_groupid(self, electrum_cli, group_id): if on_bch(): # BCH tokens have no encoding. return group_id decoded = await electrum_cli.call("blockchain.address.decode", group_id) assert_equal("group", decoded["type"]) return decoded["payload"] # pylint: disable=too-many-arguments def create_transaction( self, prevtx, n, sig, value, out=nex.blocktools.PADDED_ANY_SPEND ): if network() == Network.BCH: return bch.blocktools.create_transaction(prevtx, n, sig, value, out) if network() == Network.NEX: return nex.blocktools.create_transaction(prevtx, n, sig, value, out) raise NotImplementedError() def some_amount() -> int: """ Some insignificant amount to send or whatever (in regtest context) """ if network() == Network.BCH: return 1 if network() == Network.NEX: return 1000 raise NotImplementedError() def script_to_scripthash(script): scripthash = hashlib.sha256(script).digest() # Electrum wants little endian scripthash = bytearray(scripthash) scripthash.reverse() return scripthash.hex() async def get_txid_from_idem(n, txidem_or_txid, attempt=1): if network() == Network.NEX: try: return n.getrawtransaction(txidem_or_txid, True)["txid"] except Exception as e: if "No such txpool transaction" in f"{e}": # workaround for tx not submitted to mempool yet if attempt >= 10: raise e await asyncio.sleep(1) return await get_txid_from_idem(n, txidem_or_txid, attempt + 1) return txidem_or_txid async def assert_response_error(call, error_code=None, error_string=None): """ Asserts that function call throw a electrum error, optionally testing for the contents of the error. """ try: await call() raise AssertionError("assert_electrum_error: Error was not thrown.") except ElectrumErrorResponse as exception: res = exception.response if error_code is not None: if "code" not in res: # pylint: disable=raise-missing-from raise AssertionError( "assert_response_error: Error code is missing in response" ) if res["code"] != error_code: # pylint: disable=raise-missing-from raise AssertionError( ( f"assert_response_error: Expected error code {error_code}, " "got {res['code']} (Full response: {exception})" ) ) if error_string is not None: if "message" not in res: # pylint: disable=raise-missing-from raise AssertionError( "assert_response_error: Error message is missing in response" ) if error_string not in res["message"]: # pylint: disable=raise-missing-from raise AssertionError( f"assert_response_error: Expected error string '{error_string}' not found in '{res['message']}' (Full response: {exception})" ) async def yield_to_eventloop(): await asyncio.sleep(0.001)