#!/usr/bin/env python3
# Copyright (c) 2014-2015 The Bitcoin Core developers
# Copyright (c) 2015-2017 The Bitcoin Unlimited developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Helpful routines for regression testing
#

# Add python-bitcoinrpc to module search path:
import asyncio
from dataclasses import dataclass
import os
from binascii import hexlify, unhexlify
from base64 import b64encode
from decimal import Decimal, ROUND_DOWN
import hashlib
import json
import http.client
import subprocess
import time
import errno
import logging
from typing import List, Optional

from .serialize import deser_uint256, ser_uint256
from . import test_node
from .portseed import (
    electrum_monitoring_port,
    electrum_rpc_port,
    electrum_ws_port,
    p2p_port,
    rpc_port,
)
from .authproxy import AuthServiceProxy, JSONRPCException
from .environment import (
    NodeFeature,
    node as node_software,
    node_supports,
    rostrum_path,
    full_node_path,
    network,
    Network,
    process_wrapper,
    Node,
    testing_websocket,
)

DEFAULT_TX_FEE_PER_BYTE = 50
BTC = 100
# mBTC = 100000
# uBTC = 100

# Serialization/deserialization tools


def sha256(s):
    """Return the sha256 hash of the passed binary data

    >>> hexlify(sha256("e hat eye pie plus one is O".encode()))
    b'c5b94099f454a3807377724eb99a33fbe9cb5813006cadc03e862a89d410eaf0'
    """
    return hashlib.new("sha256", s).digest()


def hash256(s):
    """Return the double SHA256 hash (what bitcoin typically uses) of the passed binary data

    >>> hexlify(hash256("There was a terrible ghastly silence".encode()))
    b'730ac30b1e7f4061346277ab639d7a68c6686aeba4cc63280968b903024a0a40'
    """
    return sha256(sha256(s))


def hash160(msg):
    """RIPEMD160(SHA256(msg)) -> bytes"""
    h = hashlib.new("ripemd160")
    h.update(hashlib.sha256(msg).digest())
    return h.digest()


def uint256_to_rpc_hex(b):
    """RPC (nexad) hex is reversed"""
    if isinstance(b, int):
        b = ser_uint256(b)
    return b[::-1].hex()


def rpc_hex_to_uint256(h):
    """RPC (nexad) hex is reversed"""
    b = bytes.fromhex(h)
    return deser_uint256(b[::-1])


class TimeoutException(Exception):
    pass


@dataclass
class UtilOptions:
    # this module-wide var is set from test_framework.py
    no_ipv6_rpc_listen: bool = False


NEXAD_PROC_WAIT_TIMEOUT = 60


async def wait_for(timeout, fn, on_error="timeout in wait_for", sleep_amt=1.0):
    """Repeatedly calls fn while it returns None, raising an assert after timeout.

    If fn returns non None, return that result.

    Args:
        timeout: seconds to keep polling before giving up
        fn: callable (plain or coroutine function) polled each iteration
        on_error: message for the TimeoutException, or a callable producing it
        sleep_amt: seconds to sleep between polls

    Raises:
        TimeoutException: if fn never returned a truthy/non-None value in time
    """
    timeout = float(timeout)
    while True:
        if asyncio.iscoroutinefunction(fn):
            result = await fn()
        else:
            result = fn()
        # Both None and False mean "not ready yet"; anything else is returned.
        if not (result is None or result is False):
            return result
        if timeout <= 0:
            if callable(on_error):
                on_error = on_error()
            raise TimeoutException(on_error)
        await asyncio.sleep(sleep_amt)
        timeout -= sleep_amt


def get_rpc_proxy(url: str, *, timeout: int | None = None):
    """
    Args:
        url (str): URL of the RPC server to call

    Kwargs:
        timeout (int): HTTP timeout in seconds

    Returns:
        AuthServiceProxy. convenience object for making RPC calls.
    """
    proxy_kwargs = {}
    if timeout is not None:
        proxy_kwargs["timeout"] = timeout
    proxy = AuthServiceProxy(url, **proxy_kwargs)
    proxy.url = url  # store URL on proxy for info
    return proxy


def do_and_ignore_failure(fn):
    """Call fn(), deliberately swallowing any failure (best-effort cleanup)."""
    try:
        fn()
    except BaseException:
        pass


def check_json_precision():
    """Make sure json library being used does not lose precision converting BTC values"""
    n = Decimal("20000000.03")
    satoshis = int(json.loads(json.dumps(float(n))) * 1.0e2)
    if satoshis != 2000000003:
        raise RuntimeError("JSON encode/decode loses precision")


def count_bytes(hex_string):
    """Return the number of bytes encoded by a hex string."""
    return len(bytearray.fromhex(hex_string))


def bytes_to_hex_str(byte_str):
    """bytes -> ascii hex string."""
    return hexlify(byte_str).decode("ascii")


def hex_str_to_bytes(hex_str):
    """ascii hex string -> bytes."""
    return unhexlify(hex_str.encode("ascii"))


def str_to_b64str(string):
    """utf-8 string -> base64 ascii string."""
    return b64encode(string.encode("utf-8")).decode("ascii")


# credit: https://www.python-course.eu/graphs_python.php
def is_connected(gdict, vertices_encountered=None, start_vertex=None):
    """determines if the graph is connected"""
    if vertices_encountered is None:
        vertices_encountered = set()
    vertices = list(gdict.keys())  # "list" necessary in Python 3
    if not start_vertex:
        # choose a vertex from graph as a starting point
        start_vertex = vertices[0]
    vertices_encountered.add(start_vertex)
    if len(vertices_encountered) != len(vertices):
        for vertex in gdict[start_vertex]:
            if vertex not in vertices_encountered:
                if is_connected(gdict, vertices_encountered, vertex):
                    return True
    else:
        return True
    return False


async def sync_blocks(rpc_connections, *, wait=1, verbose=1, timeout=60):
    """
    Wait until everybody has the same block count
    """
    i = -1
    stop_time = time.time() + timeout
    while time.time() <= stop_time:
        counts = [x.getblockcount() for x in rpc_connections]
        if counts == [counts[0]] * len(counts):
            return
        if verbose and i > 2:
            logging.info("sync blocks %s: %s", i, counts)
        # BUGFIX: the sleep must be awaited, otherwise the coroutine is
        # discarded and this loop spins without ever yielding.
        await asyncio.sleep(wait)
        i += 1
    logging.info("sync_blocks timeout, printing debug info: ")
    # BUGFIX: message was missing the f-prefix, so the literal text
    # "{timeout}" was raised instead of the actual value.
    raise Exception(
        f"sync_blocks: blocks did not sync through "
        f"various nodes before the timeout of {timeout} seconds kicked in",
    )


async def sync_mempools(rpc_connections, wait=1, verbose=1):
    """
    Wait until everybody has the same transactions in their memory pools
    """
    count = 0
    while True:
        count += 1
        pool = set(rpc_connections[0].getrawtxpool())
        num_match = 1
        pool_len = [len(pool)]
        for i in range(1, len(rpc_connections)):
            tmp = set(rpc_connections[i].getrawtxpool())
            if tmp == pool:
                num_match = num_match + 1
            pool_len.append(len(tmp))
        if verbose and count % 30 == 0:
            logging.info("sync txpool: %s", pool_len)
        if num_match == len(rpc_connections):
            break
        # BUGFIX: the sleep must be awaited (was a discarded coroutine,
        # making this a busy loop).
        await asyncio.sleep(wait)


def datadir_path(tmpdir, n) -> str:
    """Return (and create if missing) the datadir for node n under tmpdir."""
    datadir = os.path.join(tmpdir, f"node{n}")
    if not os.path.isdir(datadir):
        os.makedirs(datadir)
    return datadir


async def rostrum_args(tmpdir: str, n: int) -> list:
    """Build the command-line argument list for a standalone rostrum
    (electrum server) process attached to node n."""
    node_datadir = datadir_path(tmpdir, n)
    rpc_u, rpc_p = rpc_auth_pair(n)
    args = {
        "auth": f"{rpc_u}:{rpc_p}",
        "daemon-dir": node_datadir,
        "daemon-rpc-addr": f"127.0.0.1:{await rpc_port(n)}",
        "daemon-p2p-addr": f"127.0.0.1:{await p2p_port(n)}",
        "electrum-rpc-addr": f"0.0.0.0:{await electrum_rpc_port(n)}",
        "electrum-ws-addr": f"0.0.0.0:{await electrum_ws_port(n)}",
        "network": "regtest",
        "monitoring-addr": f"0.0.0.0:{await electrum_monitoring_port(n)}",
        "wait-duration-secs": 1,
        "rpc-idle-timeout": 600,
        "db-dir": os.path.join(node_datadir, "rostrum"),
    }
    as_cmd_args = [f"--{k}={v}" for k, v in args.items()]
    as_cmd_args.append("-vvvv")
    # Disable server peer discovery, so we're not hitting remote servers
    # requesting our public IP.
    as_cmd_args.append("--no-announce")
    as_cmd_args.append("--no-metrics")
    if not testing_websocket():
        as_cmd_args.append("--no-websocket")
    return as_cmd_args


# pylint: disable=too-many-branches, too-many-locals
async def initialize_datadir(
    tmpdir: str, n: int, extra_args: Optional[List[List[str]]]
):
    """Create node n's datadir and write its configuration file.

    The config filename and defaults depend on the network (nexa.conf vs
    bitcoin.conf) and on the node software under test.

    Raises:
        NotImplementedError: for networks other than NEX/BCH.
    """
    datadir = datadir_path(tmpdir, n)
    rpc_u, rpc_p = rpc_auth_pair(n)
    defaults = {
        "discover": 0,
        "regtest": 1,
        "rpcuser": rpc_u,
        "rpcpassword": rpc_p,
        "printtoconsole": 1,
        "listenonion": 0,
    }
    network_section = {
        "port": await p2p_port(n),
        "rpcport": str(await rpc_port(n)),
    }
    if node_software() in (Node.BCHUNLIMITED, Node.NEXA):
        defaults.update(
            {
                "bindallorfail": 1,
                "debug": ["electrum", "rpc", "net"],
                "keypool": 1,
            }
        )
    if node_software() == Node.BCHN:
        defaults.update(
            {
                "acceptnonstdtxn": 1,
            }
        )

    # switch off default IPv6 listening port (for travis)
    if UtilOptions.no_ipv6_rpc_listen:
        defaults.update({"rpcbind": "127.0.0.1", "rpcallowip": "127.0.0.1"})

    if network() == Network.NEX:
        file = "nexa.conf"
        defaults.update(
            {
                "relay.minRelayTxFee": 0,
                "relay.limitFreeRelay": 15,
                "wallet.payTxFee": 1000,
            }
        )
    elif network() == Network.BCH:
        file = "bitcoin.conf"
        if node_software() == Node.BCHUNLIMITED:
            defaults.update(
                {
                    "maxlimitertxfee": 0,
                    "minlimitertxfee": 0,
                    "limitfreerelay": 15,
                }
            )
    else:
        raise NotImplementedError()

    if node_supports(node_software(), NodeFeature.SPAWN_ROSTRUM):
        # dont override for feature_idle_timeout test
        idle_timeout_set = False
        if extra_args:
            for arg in extra_args[0]:
                if "-electrum.rawarg=--rpc-idle-timeout=" in arg:
                    idle_timeout_set = True
        defaults.update(
            {
                "electrum": 1,
                "electrum.port": await electrum_rpc_port(n),
                "electrum.ws.port": await electrum_ws_port(n),
                "electrum.monitoring.port": await electrum_monitoring_port(n),
                "electrum.exec": rostrum_path(),
                "electrum.rawarg": [
                    "--no-announce",
                    f"--daemon-p2p-addr=127.0.0.1:{await p2p_port(n)}",
                    "" if testing_websocket() else "--no-websocket",
                    "--no-metrics",
                    "" if idle_timeout_set else "--rpc-idle-timeout=600",
                ],
            }
        )

    config_file_path = os.path.join(datadir, file)
    with open(config_file_path, "w", encoding="utf-8") as f:
        for key, val in defaults.items():
            # list values become one "key=value" line per element
            if isinstance(val, list):
                for v in val:
                    f.write(f"{str(key)}={str(v)}\n")
            else:
                f.write(f"{str(key)}={str(val)}\n")
        f.write("debug=rpc\n")
        if node_supports(node_software(), NodeFeature.SPAWN_ROSTRUM):
            f.write("debug=electrum\n")
        if node_software() == Node.BCHN:
            # BCHN requires per-network sections for port settings
            f.write("[regtest]\n")
            for key, val in network_section.items():
                f.write(f"{str(key)}={str(val)}\n")
    return datadir


def rpc_auth_pair(n):
    """Return the (rpcuser, rpcpassword) pair used for node n."""
    return "rpcuser💻" + str(n), "rpcpass🔑" + str(n)


async def rpc_url(i, rpchost=None):
    """Return the authenticated RPC URL for node i (default host 127.0.0.1)."""
    rpc_u, rpc_p = rpc_auth_pair(i)
    return f"http://{rpc_u}:{rpc_p}@{rpchost or '127.0.0.1'}:{await rpc_port(i)}"


async def wait_for_bitcoind_start(process, url: str, *, timeout: int = 120):
    """
    Wait for daemon to start. This means that RPC is accessible and fully
    initialized. Raise an exception if daemon exits during initialization.
    """
    rpc = None
    end_time = time.time() + timeout
    while time.time() < end_time:
        if process.poll() is not None:
            raise Exception(
                f"daemon exited with status {process.returncode} during initialization"
            )
        try:
            rpc = get_rpc_proxy(url)
            rpc.getblockcount()
            return rpc
        except IOError as e:
            if e.errno != errno.ECONNREFUSED:  # Port not yet open?
                raise  # unknown IO error
        except JSONRPCException as e:  # Initialization phase
            if e.error["code"] != -28:  # RPC in warmup?
                raise  # unknown JSON RPC exception
        await asyncio.sleep(0.25)
    raise TimeoutException(f"daemon (pid {process.pid}) did not start (url {url})")


async def initialize_chain_clean(
    test_dir: str, num_nodes: int, extra_args: Optional[List[List[str]]]
):
    """
    Create an empty blockchain and num_nodes wallets.
    Useful if a test case wants complete control over initialization.
    """
    await asyncio.gather(
        *(initialize_datadir(test_dir, i, extra_args) for i in range(num_nodes))
    )


# pylint: disable=consider-using-with
async def start_node(i, tmpdir, extra_args=None, rpchost=None, timewait=None):
    """
    Start a nexad and return RPC connection to it
    """
    datadir = datadir_path(tmpdir, i)
    wrapper = process_wrapper()
    if wrapper is not None:
        args = [wrapper, full_node_path(), f"-datadir={datadir}"]
    else:
        args = [full_node_path(), f"-datadir={datadir}"]
    if extra_args is not None:
        args.extend(extra_args[i])

    process = None
    rostrum_process = None
    r_args = []
    # Spawn rostrum ourselves only when the node cannot spawn it itself.
    spawn_rostrum = not node_supports(node_software(), NodeFeature.SPAWN_ROSTRUM)
    try:
        logging.debug("Running: %s", args)
        process = subprocess.Popen(args)
        url = await rpc_url(i, rpchost)
        await wait_for_bitcoind_start(process, url)
        logging.info("Running as pid %s url: %s datadir: %s", process.pid, url, datadir)
        proxy = get_rpc_proxy(url, timeout=timewait)

        if spawn_rostrum:
            # We need to spawn rostrum process for the node.
            if wrapper is not None:
                r_args = [wrapper, rostrum_path()]
            else:
                r_args = [rostrum_path()]
            r_args = r_args + await rostrum_args(tmpdir, i)
            logging.info("Starting rostrum with args: %s", r_args)
            rostrum_process = subprocess.Popen(r_args)
    except Exception as e:
        logging.error(
            "Error starting node %s (%s) with args %s. "
            "%s rostrum with args %s: %s",
            i,
            node_software(),
            args,
            "Spawning" if spawn_rostrum else "Not spawning",
            r_args,
            e,
        )
        # BUGFIX: guard against process being None (Popen itself failed);
        # the unguarded attribute access raised AttributeError and masked
        # the original exception. Also kill rostrum if it was started.
        if process is not None:
            do_and_ignore_failure(process.kill)
        if rostrum_process is not None:
            do_and_ignore_failure(rostrum_process.kill)
        raise
    return test_node.TestNode(proxy, datadir, process, i, rostrum_process)


async def start_nodes(num_nodes, dirname, extra_args=None, rpchost=None, timewait=None):
    """
    Start multiple nexads, return RPC connections to them
    """
    logging.info("Starting %d nodes.", num_nodes)
    started = []
    try:
        for i in range(num_nodes):
            started.append(await start_node(i, dirname, extra_args, rpchost, timewait))
    except Exception as e:
        logging.error("Not all nodes managed to start: %s", e)
        if started:
            logging.info("Stopping %d previously started nodes.", len(started))
            for i, node in enumerate(started):
                try:
                    stop_node(node)
                except Exception as error_2:
                    logging.error("Failed to stop node %d: %s", i, error_2)
        # bare raise preserves the original traceback (was `raise e`)
        raise
    return started


def node_regtest_dir(dirname, n_node):
    """Path to node n_node's regtest subdirectory under dirname."""
    return os.path.join(dirname, "node" + str(n_node), "regtest")


def log_filename(dirname, n_node, logname):
    """Path to a named log file inside node n_node's regtest directory."""
    return os.path.join(node_regtest_dir(dirname, n_node), logname)


def stop_node(node):
    """Stop a node via RPC, falling back to killing the process; also kills
    any attached rostrum process first."""
    if node.rostrum_process is not None:
        do_and_ignore_failure(node.rostrum_process.kill)
    try:
        node.stop()
    except http.client.CannotSendRequest as e:
        logging.info("Unable to stop node via RPC, sending kill signal. Error: %s", e)
        do_and_ignore_failure(node.process.kill)
    except Exception as e:
        logging.info("Error sending stop signal to node: %s", e)
    try:
        node.process.wait(timeout=NEXAD_PROC_WAIT_TIMEOUT)
    except Exception as e:
        logging.warning(
            "Error while waiting for node to exit, sending kill signal. "
            "Error: %s", e
        )
        do_and_ignore_failure(node.process.kill)


def stop_nodes(nodes):
    """Stop every node in the list."""
    for n in nodes:
        stop_node(n)


def set_node_times(nodes, t):
    """Set mocktime t on every node."""
    for node in nodes:
        node.setmocktime(t)


def wait_bitcoinds(nodes):
    # Wait for all bitcoinds to cleanly exit
    for n in nodes:
        n.process.wait(timeout=NEXAD_PROC_WAIT_TIMEOUT)


def wait_bitcoind_exit(node, timeout=NEXAD_PROC_WAIT_TIMEOUT):
    # Wait for one bitcoind to cleanly exit
    node.process.wait(timeout=timeout)


def is_bitcoind_running(node):
    """True while the node's process has not exited."""
    return node.process.poll() is None


async def connect_nodes(from_connection, node_num_or_str):
    """Connect the passed node to another node specified either by node index
    or by ip address:port string"""
    if isinstance(node_num_or_str, int):
        ip_port = "127.0.0.1:" + str(await p2p_port(node_num_or_str))
    else:
        ip_port = node_num_or_str
    from_connection.addnode(ip_port, "onetry")
    # poll until version handshake complete to avoid race conditions
    # with transaction relaying
    while any(peer["version"] == 0 for peer in from_connection.getpeerinfo()):
        await asyncio.sleep(0.1)


async def connect_nodes_bi(nodes, a, b):
    """Connect nodes a and b bidirectionally."""
    await connect_nodes(nodes[a], b)
    await connect_nodes(nodes[b], a)


def assert_not_equal(thing1, thing2):
    if thing1 == thing2:
        raise AssertionError(f"{str(thing1)} != {str(thing2)}")


def assert_equal(thing1, thing2):
    if thing1 != thing2:
        raise AssertionError(f"{str(thing1)} != {str(thing2)}")


def assert_greater_than(thing1, thing2):
    if thing1 <= thing2:
        raise AssertionError(f"{str(thing1)} <= {str(thing2)}")


def assert_raises(exc, fun, *args, **kwds):
    """Assert that fun(*args, **kwds) raises exc (or a subclass)."""
    try:
        fun(*args, **kwds)
    except exc:
        pass
    except Exception as e:
        # pylint: disable=raise-missing-from
        raise AssertionError("Unexpected exception raised: " + type(e).__name__)
    else:
        raise AssertionError("No exception raised")


async def assert_raises_async(exc, fun, *args, **kwds):
    """Async variant of assert_raises: awaits fun and expects exc."""
    try:
        await fun(*args, **kwds)
    except exc:
        pass
    except Exception as e:
        # pylint: disable=raise-missing-from
        raise AssertionError("Unexpected exception raised: " + type(e).__name__)
    else:
        raise AssertionError("No exception raised")


def assert_raises_rpc_error(code, message, fun, *args, **kwds):
    """Run an RPC and verify that a specific JSONRPC exception code and message is raised.

    Calls function `fun` with arguments `args` and `kwds`. Catches a JSONRPCException
    and verifies that the error code and message are as expected. Throws AssertionError if
    no JSONRPCException was raised or if the error code/message are not as expected.

    Args:
        code (int), optional: the error code returned by the RPC call (defined
            in src/rpc/protocol.h). Set to None if checking the error code is not required.
        message (string), optional: [a substring of] the error string returned by the
            RPC call. Set to None if checking the error string is not required.
        fun (function): the function to call. This should be the name of an RPC.
        args*: positional arguments for the function.
        kwds**: named arguments for the function.
    """
    assert try_rpc(code, message, fun, *args, **kwds), "No exception raised"


def try_rpc(code, message, fun, *args, **kwds):
    """Tries to run an rpc command.

    Test against error code and message if the rpc fails.
    Returns whether a JSONRPCException was raised."""
    try:
        fun(*args, **kwds)
    except JSONRPCException as e:
        # JSONRPCException was thrown as expected. Check the code and message
        # values are correct.
        if (code is not None) and (code != e.error["code"]):
            # pylint: disable=raise-missing-from
            raise AssertionError(
                f"Unexpected JSONRPC error code {e.error['code']} ({e})"
            )
        if (message is not None) and (message not in e.error["message"]):
            # pylint: disable=raise-missing-from
            raise AssertionError(
                f"Expected substring '{message}' not found in '{e.error['message']}'"
            )
        return True
    except Exception as e:
        # pylint: disable=raise-missing-from
        raise AssertionError("Unexpected exception raised: " + type(e).__name__)
    else:
        return False


def satoshi_round(amount):
    """Quantize amount down to 2 decimal places (satoshi granularity)."""
    return Decimal(amount).quantize(Decimal("0.01"), rounding=ROUND_DOWN)