#!/usr/bin/env python3
# Copyright (c) 2014-2015 The Bitcoin Core developers
# Copyright (c) 2015-2022 The Bitcoin Unlimited developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Run Functional Test Suite

This module calls down into individual test cases via subprocess. It will
forward all unrecognized arguments onto the individual test scripts, other
than:

    - `-h` or '--help': print help about all options
    - `-list`: only list the test scripts, do not run.

For more detailed help on options, run with '--help'.
"""
# pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-many-nested-blocks,too-few-public-methods,consider-using-with
import multiprocessing
import os
import time
import signal
import sys
import subprocess
import tempfile
import re
import glob
import logging

import psutil

from test_runner_classes import RpcTest
from test_framework.environment import (
    network,
    full_node_path,
    rostrum_path,
    node,
    Node,
    NodeFeature,
    node_supports,
)

# to support out-of-source builds, we need to add both the source directory
# to the path, and the out-of-source directory because tests_config is a
# generated file
THIS_SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
sys.path.append(THIS_SCRIPT_DIR)

logging.basicConfig(level=logging.DEBUG)


def bold(text):
    """Return `text` wrapped in ANSI bold escapes on POSIX terminals,
    unchanged elsewhere (Windows consoles may not handle the escapes)."""
    if os.name == "posix":
        # primitive formatting on supported
        # terminal via ANSI escape sequences:
        return f"\033[1m{text}\033[0m"
    return text


RPC_TESTS_DIR = os.path.join(THIS_SCRIPT_DIR)
logging.info("RPC test dir: %s", RPC_TESTS_DIR)

ENABLE_COVERAGE = 0
CUSTOM_ELECTRUM_PATH = None

# Create a set to store arguments and create the OPTIONS_TO_PASS_ON string
opts = set()
OPTIONS_TO_PASS_ON = ""
p = re.compile("^--")
p_parallel = re.compile("^-parallel=")
PARALLEL_TASKS = multiprocessing.cpu_count()

# some of the single-dash options applicable only to this runner script
# are also allowed in double-dash format (but are not passed on to the
# test scripts themselves)
# -f is equivalent to -force-enable
private_single_opts = ("-h", "-f", "-help", "-list")


def option_passed(option_without_dashes):
    """Return True if the given single-dash option was given on the command line."""
    return "-" + option_without_dashes in opts


def show_wrapper_options():
    """print command line options specific to wrapper"""
    print("Wrapper options:")
    print()
    print(" -list / --list only list test names")
    print(
        " -f / -force-enable / --force-enable\n"
        + " attempt to run disabled/skipped tests"
    )
    print(" -h / -help / --help print this help")


# Parse the command line:
#   --xyz       forwarded verbatim to the individual test scripts
#   -parallel=N number of tests to run concurrently
#   -xyz        interpreted only by this wrapper (must be in private_single_opts)
for arg in sys.argv[1:]:
    if p.match(arg) or arg in ("-h", "-help"):
        if arg in ("--help", "-help", "-h"):
            show_wrapper_options()
            sys.exit(1)
        else:
            if OPTIONS_TO_PASS_ON:
                OPTIONS_TO_PASS_ON += " " + arg
            else:
                OPTIONS_TO_PASS_ON = arg
    elif p_parallel.match(arg):
        PARALLEL_TASKS = int(arg.split(sep="=", maxsplit=1)[1])
    else:
        # this is for single-dash options only
        # they are interpreted only by this script
        opts.add(arg)

# check for unrecognized options
for o in opts:
    if o.startswith("-"):
        if o not in private_single_opts:
            logging.error("Unrecognized option %s", o)
            show_wrapper_options()
            sys.exit(1)

# Fail early if the environment (node binary, rostrum binary, network) is
# not configured; these helpers raise if misconfigured.
try:
    logging.info("Using node path '%s'", full_node_path())
    logging.info("Using rostrum path '%s'", rostrum_path())
    logging.info("Running tests for network '%s'", network())
except Exception as e:
    logging.error(e)
    sys.exit(1)

# Every rpc_*.py / feature_*.py script in this directory is a test case.
test_files = glob.glob("rpc_*.py", root_dir=RPC_TESTS_DIR) + glob.glob(
    "feature_*.py", root_dir=RPC_TESTS_DIR
)


def add_test(t):
    """Wrap a test script filename in an RpcTest, disabling it when the
    configured node implementation does not support what the test needs."""
    t = RpcTest(t)
    supports_spawn = node_supports(node(), NodeFeature.SPAWN_ROSTRUM)
    if t.name.startswith("rpc_token") and not node_supports(node(), NodeFeature.TOKENS):
        t.disable("Node does not support tokens.")
    if t.name == "feature_reorg.py" and node() == Node.BCHUNLIMITED:
        t.disable("BCHUnlimited does not serve blocks outside active chain.")
    if t.name == "feature_shutdownonerror.py" and not supports_spawn:
        t.disable(f"Node '{node()}' does not support this feature.")
    if t.name == "feature_doslimit.py" and not supports_spawn:
        t.disable(f"Test requires {NodeFeature.SPAWN_ROSTRUM} support")
    if t.name == "feature_idle_timeout.py" and not supports_spawn:
        t.disable(f"Test requires {NodeFeature.SPAWN_ROSTRUM} support")
    if (
        t.name in ("rpc_blockchain_address_listunspent.py", "rpc_token_address_history.py")
    ) and node() == Node.BCHUNLIMITED:
        t.disable("BCHUnlimited wallet too unreliable for tokens (double spends coins)")
    return t


test_scripts_electrum = [add_test(t) for t in test_files]


def runtests():
    """Run (or, with -list, only enumerate) all discovered test scripts,
    print a per-test and aggregate summary, and exit non-zero on failure."""
    test_passed = []
    disabled = []
    skipped = []
    tests_to_run = test_scripts_electrum
    force_enable = option_passed("force-enable") or "-f" in opts

    if option_passed("list"):
        logging.info("Tests to run:")
        for test in tests_to_run:
            logging.info(test)
        sys.exit(0)

    flags = f" {OPTIONS_TO_PASS_ON}"

    # weed out the disabled / skipped tests and print them beforehand
    # this allows earlier intervention in case a test is unexpectedly
    # skipped
    if not force_enable:
        trimmed_tests_to_run = []
        for test in tests_to_run:
            if test.is_disabled():
                logging.info(
                    "Disabled testscript %s (reason: %s)", bold(test), test.reason
                )
                disabled.append(str(test))
            elif test.is_skipped():
                logging.info(
                    "Skipping testscript %s on this platform (reason: %s)",
                    bold(test),
                    test.reason,
                )
                skipped.append(str(test))
            else:
                trimmed_tests_to_run.append(test)
        tests_to_run = trimmed_tests_to_run

    # if all specified tests are disabled just quit
    if not tests_to_run:
        sys.exit(0)

    tests_to_run = list(map(str, tests_to_run))
    max_len_name = len(max(tests_to_run, key=len))
    time_sum = 0
    time0 = time.time()
    job_queue = RPCTestHandler(PARALLEL_TASKS, tests_to_run, flags)
    results = bold(f"\n{'TEST'.ljust(max_len_name)} | PASSED | DURATION") + "\n\n"
    all_passed = True

    # Collect each test result as it finishes (order of completion, not
    # order of submission).
    for _ in range(len(tests_to_run)):
        (
            name,
            ret_code,
            stdout,
            stderr,
            passed,
            duration,
        ) = job_queue.get_next()
        test_passed.append(passed)
        all_passed = all_passed and passed
        time_sum += duration
        results += f"{name.ljust(max_len_name)} | {str(passed).ljust(6)} | {duration}\n"
        logging.info("\n%s: Pass %s, Duration: %s", bold(name), bold(passed), duration)
        if not passed:
            # Only failed tests get their full output echoed.
            print(f"## Output for failed test {name}")
            print(f"Return code: {ret_code}")
            if stdout != "":
                print("- stdout " + ("-" * 50) + "\n", stdout)
            if stderr != "":
                print("- stderr " + ("-" * 50) + "\n", stderr)
            print(f"## End of output for failed test {name}")

    results += bold(
        f"{'ALL'.ljust(max_len_name)} | {str(all_passed).ljust(6)} | {time_sum} s (accumulated)"
    )
    logging.info(results)
    logging.info("\nRuntime: %s s", (int(time.time() - time0)))

    # show some overall results and aggregates
    logging.info(
        "\n%d test(s) passed / %d test(s) failed / %d test(s) executed",
        test_passed.count(True),
        test_passed.count(False),
        len(test_passed),
    )
    logging.info(
        "%d test(s) disabled / %d test(s) skipped due to platform",
        len(disabled),
        len(skipped),
    )

    # signal that tests have failed using exit code
    sys.exit(not all_passed)


class RPCTestHandler:
    """
    Trigger the testscrips passed in via the list.
    """

    def __init__(self, num_tests_parallel, test_list=None, flags=None):
        assert num_tests_parallel >= 1
        self.num_jobs = num_tests_parallel
        self.test_list = test_list
        self.flags = flags
        self.num_running = 0
        # In case there is a graveyard of zombie nexads, we can apply a
        # pseudorandom offset to hopefully jump over them.
        # 3750 is PORT_RANGE/MAX_NODES defined in util, but awkward to import
        # into rpc-test.py
        # Each entry: (name, start_time, Popen, log_stdout, log_stderr, got_outputs)
        self.jobs = []

    def get_next(self):
        """Top up the running-job pool, then block until one subprocess
        finishes; return (name, returncode, stdout, stderr, passed, duration)."""
        while self.num_running < self.num_jobs and self.test_list:
            # Add tests
            self.num_running += 1
            test = self.test_list.pop(0)
            log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16, mode="w+")
            log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16, mode="w+")
            got_outputs = [False]
            logging.info("Starting %s", test)
            self.jobs.append(
                (
                    test,
                    time.time(),
                    subprocess.Popen(
                        (os.path.join(RPC_TESTS_DIR, test)).split()
                        + self.flags.split(),
                        universal_newlines=True,
                        stdin=subprocess.DEVNULL,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        close_fds=True,
                        restore_signals=True,
                        start_new_session=True,
                    ),
                    log_stdout,
                    log_stderr,
                    got_outputs,
                )
            )
        if not self.jobs:
            raise IndexError("Can't get next job. No jobs running.")

        count = 0

        def comms(proc, out_file, err_file, timeout):
            # Drain the child's pipes into the spooled log files. The
            # files are passed explicitly (rather than captured from the
            # enclosing loop via late binding) so the helper always
            # operates on the job currently being polled.
            stdout_data, stderr_data = proc.communicate(timeout=timeout)
            out_file.write(stdout_data)
            err_file.write(stderr_data)

        while True:
            count += 1
            # Return first proc that finishes
            time.sleep(0.5)
            for j in self.jobs:
                (name, time0, proc, log_stdout, log_stderr, got_outputs) = j
                if int(time.time() - time0) > 5 * 60:
                    # Timeout individual tests after 5 minutes (to stop tests hanging and not
                    # providing useful output.
                    proc.send_signal(signal.SIGINT)

                # Poll for new data on stdout and stderr. This is also necessary as to not block
                # the subprocess when the stdout or stderr pipe is full.
                try:
                    # WARNING: There seems to be a bug in python handling of .join() so that
                    # when you do a .join() with a zero or negative timeout, it will not even try
                    # joining the thread. This is for the handling of the stdout/stderr reader threads
                    # in subprocess.py. A sufficiently positive value (and 0.1s seems to be enough)
                    # seems to make the .join() logic to work, and in turn communicate() not to fail
                    # with a timeout, even though the thread is done reading (which was another cause
                    # of a hang)
                    if not got_outputs[0]:
                        comms(proc, log_stdout, log_stderr, 0.1)
                        # .communicate() can only be called once and we have to keep in mind now that
                        # communication happened properly (and the files are closed). It _has_ to be called with a non-None
                        # timeout initially, however, to start the communication threads internal to subprocess.Popen(..)
                        # that are necessary to not block on more output than what fits into the OS' pipe buffer.
                        # Note that end-of-communication does not necessarily
                        # indicate a finished subprocess.
                        got_outputs[0] = True
                except subprocess.TimeoutExpired:
                    pass
                except ValueError:
                    # There is a bug in communicate that causes this exception if the child process has closed any pipes but is still running
                    # see: https://bugs.python.org/issue35182
                    pass

                # it won't ever communicate() fully because child didn't close
                # sockets
                try:
                    psproc = psutil.Process(proc.pid)
                    if psproc.status() == psutil.STATUS_ZOMBIE:
                        got_outputs[0] = True
                except AttributeError:
                    pass
                except FileNotFoundError:
                    pass  # its ok means process exited cleanly
                except psutil.NoSuchProcess:
                    pass

                if got_outputs[0]:
                    retval = (
                        proc.returncode if proc.returncode is not None else proc.poll()
                    )
                    if retval is None:
                        logging.warning(
                            "%s: should be impossible, got output from communicate but process is alive",
                            proc.args[0],
                        )
                    log_stdout.seek(0)
                    log_stderr.seek(0)
                    stdout = log_stdout.read()
                    stderr = log_stderr.read()
                    # Pass/fail is decided by the exit code alone. (An earlier
                    # version also required empty stderr, but that check was
                    # dead code — immediately overwritten — and tests may
                    # legitimately log to stderr.)
                    passed = proc.returncode == 0
                    self.num_running -= 1
                    self.jobs.remove(j)
                    return (
                        name,
                        retval,
                        stdout,
                        stderr,
                        passed,
                        int(time.time() - time0),
                    )
            # Progress indicator: one dot per poll round, wrapped every 160.
            print(".", end=("" if count % 160 != 0 else "\n"), flush=True)


if __name__ == "__main__":
    runtests()