#!/usr/bin/env python3 # Copyright (c) 2014-2015 The Bitcoin Core developers # Copyright (c) 2015-2024 The Bitcoin Unlimited developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. # Base class for RPC testing import logging # pylint: disable=deprecated-module import optparse import os import sys import pdb import shutil import tempfile import traceback from os.path import basename import string from sys import argv from typing import List, Optional from .util import ( initialize_chain_clean, start_node, start_nodes, connect_nodes_bi, sync_blocks, sync_mempools, stop_nodes, wait_bitcoind_exit, check_json_precision, UtilOptions, ) from .authproxy import JSONRPCException class BitcoinTestFramework: drop_to_pdb = os.getenv("DROP_TO_PDB", "") setup_clean_chain = True num_nodes = 1 extra_args: Optional[List[List[str]]] = None nodes = [] options = None args = None def __init__(self): super().__init__() self.set_test_params() # These may be over-ridden by subclasses: def set_test_params(self): pass async def run_test(self): raise NotImplementedError("Test needs to override this method") def add_options(self, parser): pass async def setup_chain(self): """ Sets up the blockchain for the bitcoin nodes. It also sets up the daemon configuration. bitcoin_conf_dict: Pass a dictionary of values you want written to bitcoin.conf. If you have a key with multiple values, pass a list of the values as the value, for example: { "debug":["net","blk","thin","lck","mempool","req","bench","evict"] } This framework provides values for the necessary fields (like regtest=1). But you can override these defaults by setting them in this dictionary. wallets: Pass a list of wallet filenames. Each wallet file will be copied into the node's directory before starting the node. """ logging.info("Initializing test directory %s", self.options.tmpdir) assert self.setup_clean_chain await initialize_chain_clean( self.options.tmpdir, self.num_nodes, self.extra_args, ) async def setup_nodes(self): return await start_nodes( self.num_nodes, self.options.tmpdir, extra_args=self.extra_args ) async def setup_network(self): self.nodes = await self.setup_nodes() if self.num_nodes == 1: return if self.num_nodes == 2: await connect_nodes_bi(self.nodes, 0, 1) await self.sync_all_nodes() return if self.num_nodes != 4: raise Exception(f"Default setup_network for {self.num_nodes} nodes NYI") await connect_nodes_bi(self.nodes, 0, 1) await connect_nodes_bi(self.nodes, 1, 2) await connect_nodes_bi(self.nodes, 2, 3) await self.sync_all_nodes() def stop_node(self, i): """Stop a nexad test node""" stop_nodes([self.nodes[i]]) self.wait_for_node_exit(i, 60) def stop_nodes(self): """Stop multiple nexad test nodes""" return stop_nodes(self.nodes) def start_node(self, i, extra_args=None): return start_node(i, self.options.tmpdir, extra_args) def restart_node(self, i, extra_args=None): """Stop and start a test node""" self.stop_node(i) self.start_node(i, extra_args) def wait_for_node_exit(self, i, timeout): wait_bitcoind_exit(i, timeout) async def sync_all_nodes(self): await sync_blocks(self.nodes) await sync_mempools(self.nodes) async def sync_blocks(self): """Synchronizes blocks""" await sync_blocks(self.nodes) # pylint: disable=too-many-branches,too-many-statements async def main(self): """ argsOverride: pass your own values for sys.argv in this field (or pass None) to use sys.argv bitcoin_conf_dict: Pass a dictionary of values you want written to bitcoin.conf. If you have a key with multiple values, pass a list of the values as the value, for example: { "debug":["net","blk","thin","lck","mempool","req","bench","evict"] } This framework provides values for the necessary fields (like regtest=1). But you can override these defaults by setting them in this dictionary. wallets: Pass a list of wallet filenames. Each wallet file will be copied into the node's directory before starting the node. """ parser = optparse.OptionParser(usage="%prog [options]") parser.add_option( "--nocleanup", dest="nocleanup", default=False, action="store_true", help="Leave nexads and test.* datadir on exit or error", ) parser.add_option( "--noshutdown", dest="noshutdown", default=False, action="store_true", help="Don't stop nexads after the test execution", ) testname = "".join( filter(lambda x: x in string.ascii_lowercase, basename(argv[0])) ) default_tempdir = tempfile.mkdtemp(prefix="test_" + testname + "_") parser.add_option( "--tmppfx", dest="tmppfx", default=None, help="Directory custom prefix for data directories, if specified, overrides tmpdir", ) parser.add_option( "--tmpdir", dest="tmpdir", default=default_tempdir, help="Root directory for data directories.", ) parser.add_option( "--tracerpc", dest="trace_rpc", default=False, action="store_true", help="Print out all RPC calls as they are made", ) parser.add_option( "--no-ipv6-rpc-listen", dest="no_ipv6_rpc_listen", default=False, action="store_true", help="Switch off listening on the IPv6 ::1 localhost RPC port. " "This is meant to deal with travis which is currently not supporting IPv6 sockets.", ) parser.add_option( "--gitlab", dest="gitlab", default=False, action="store_true", help="Changes root directory for gitlab artifact exporting. overrides tmpdir and tmppfx", ) self.add_options(parser) (self.options, self.args) = parser.parse_args() if self.options.gitlab is True: basedir = os.path.normpath( os.path.dirname(os.path.realpath(__file__)) + "/../../qa_tests" ) if not os.path.exists(basedir): try: os.mkdir(path=basedir, mode=0o700) except FileExistsError: # ignore pass self.options.tmpdir = tempfile.mkdtemp( prefix="test_" + testname + "_", dir=basedir ) UtilOptions.no_ipv6_rpc_listen = self.options.no_ipv6_rpc_listen if self.options.trace_rpc: logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) check_json_precision() success = False try: try: os.makedirs(self.options.tmpdir, exist_ok=False) except FileExistsError: # sanity check that tmpdir is not the top level before I delete # stuff assert self.options.tmpdir.count(os.sep) >= 2 for n in range( 0, 8 ): # delete the nodeN directories so their contents dont affect the new test d = self.options.tmpdir + os.sep + f"node{n}" try: shutil.rmtree(d) except FileNotFoundError: pass await self.setup_chain() await self.setup_network() await self.run_test() success = True except JSONRPCException as e: logging.error("JSONRPC error: %s", e.error["message"]) _, _, tb = sys.exc_info() traceback.print_tb(tb) if self.drop_to_pdb: pdb.post_mortem(tb) except AssertionError as e: logging.error("Assertion failed: %s", str(e)) _, _, tb = sys.exc_info() traceback.print_tb(tb) if self.drop_to_pdb: pdb.post_mortem(tb) except KeyError as e: logging.error("key not found: %s", str(e)) _, _, tb = sys.exc_info() traceback.print_tb(tb) if self.drop_to_pdb: pdb.post_mortem(tb) except Exception as e: logging.error("Unexpected exception caught during testing: %s", repr(e)) _, _, tb = sys.exc_info() traceback.print_tb(tb) if self.drop_to_pdb: pdb.post_mortem(tb) except KeyboardInterrupt as e: logging.error("Exiting after %s", repr(e)) if not self.options.noshutdown: logging.info("Stopping nodes") if hasattr(self, "nodes"): # nodes may not exist if there's a startup error stop_nodes(self.nodes) else: logging.warning("Note: nexads were not stopped and may still be running") if not self.options.nocleanup and not self.options.noshutdown and success: logging.info("Cleaning up") shutil.rmtree(self.options.tmpdir) else: logging.info("Not cleaning up dir %s", self.options.tmpdir) if success: logging.info("Test successful") else: raise Exception("Test failed")