import json import logging import os import signal import socket import subprocess import time # A helper function for testing lib/daemon itself. def _daemon_shell_script(ctx, filename=None): get_file = globals()["get_file"] data = get_file(filename) with open(filename, "wb") as f: f.write(data) os.chmod(filename, 0o755) # Start a daemon that will open a port on localhost. def daemon_start_on_port(ctx, path=None, args=None, name=None, port=None, env=None): _daemon_start(ctx, path=path, args=args, name=name, env=env) daemon_wait_for_port("localhost", port) # Start a daemon after a little wait. This is used only for testing the # port-waiting code. def _daemon_start_soonish(ctx, path=None, args=None, name=None, port=None, env=None): _daemon_start(ctx, path=path, args=args, name=name, env=env) daemon = ctx.declare("_daemon") # Store the PID of the process we just started so that _daemon_stop_soonish # can kill it during the cleanup phase. This works around the Subplot # Python template not giving the step captures to cleanup functions. Note # that this code assume at most one _soonish function is called. daemon["_soonish"] = daemon[name]["pid"] try: daemon_wait_for_port("localhost", port) except Exception as e: daemon["_start_error"] = repr(e) logging.info("pgrep: %r", _daemon_pgrep(path)) def _daemon_stop_soonish(ctx, path=None, args=None, name=None, port=None, env=None): ns = ctx.declare("_daemon") pid = ns["_soonish"] logging.debug(f"Stopping soonishly-started daemon, {pid}") signo = signal.SIGKILL try: os.kill(pid, signo) except ProcessLookupError: logging.warning("Process did not actually exist (anymore?)") # Find a binary akin to the `which` or `whereis` command def _daemon_whereis(path): if "/" in path: logging.debug(f"Not using PATH for daemon {path}") return path for prefix in os.environ["PATH"].split(":"): absolute = os.path.join(prefix, path) logging.debug(f"Checking for {absolute}") if os.access(absolute, os.X_OK) and os.path.isfile(absolute): return absolute return path # Start a daeamon, get its PID. Don't wait for a port or anything. This is # meant for background processes that don't have port. Useful for testing the # lib/daemon library of Subplot, but not much else. def _daemon_start(ctx, path=None, args=None, name=None, env=None): runcmd_run = globals()["runcmd_run"] runcmd_exit_code_is = globals()["runcmd_exit_code_is"] runcmd_get_exit_code = globals()["runcmd_get_exit_code"] runcmd_get_stderr = globals()["runcmd_get_stderr"] runcmd_prepend_to_path = globals()["runcmd_prepend_to_path"] path = _daemon_whereis(path) path = os.path.abspath(path) argv = [path] + args.split() env = json.loads(env or "{}") env_vars = [] if env: for (key, value) in env.items(): env_vars.extend(["-E", f"{key}={value}"]) logging.debug(f"Starting daemon {name}") logging.debug(f" ctx={ctx.as_dict()}") logging.debug(f" name={name}") logging.debug(f" path={path}") logging.debug(f" args={args}") logging.debug(f" argv={argv}") logging.debug(f" env={env}") ns = ctx.declare("_daemon") this = ns[name] = { "pid-file": f"{name}.pid", "stderr": f"{name}.stderr", "stdout": f"{name}.stdout", } # Debian installs `daemonize` to /usr/sbin, which isn't part of the minimal # environment that Subplot sets up. So we add /usr/sbin to the PATH. runcmd_prepend_to_path(ctx, "/usr/sbin") runcmd_run( ctx, [ "daemonize", "-c", os.getcwd(), "-p", this["pid-file"], "-e", this["stderr"], "-o", this["stdout"], ] + env_vars + argv, ) # Check that daemonize has exited OK. If it hasn't, it didn't start the # background process at all. If so, log the stderr in case there was # something useful there for debugging. exit = runcmd_get_exit_code(ctx) if exit != 0: stderr = runcmd_get_stderr(ctx) logging.error(f"daemon {name} stderr: {stderr}") runcmd_exit_code_is(ctx, 0) # Get the pid of the background process, from the pid file created by # daemonize. We don't need to wait for it, since we know daemonize already # exited. If it isn't there now, it's won't appear later. if not os.path.exists(this["pid-file"]): raise Exception("daemonize didn't create a PID file") this["pid"] = _daemon_wait_for_pid(this["pid-file"], 10.0) logging.debug(f"Started daemon {name}") logging.debug(f" pid={this['pid']}") logging.debug(f" ctx={ctx.as_dict()}") def _daemon_wait_for_pid(filename, timeout): start = time.time() while time.time() < start + timeout: with open(filename) as f: data = f.read().strip() if data: return int(data) raise Exception("daemonize created a PID file without a PID") def daemon_wait_for_port(host, port, timeout=5.0): addr = (host, port) until = time.time() + timeout while True: try: s = socket.create_connection(addr, timeout=timeout) s.close() return except socket.timeout: logging.error( f"daemon did not respond at port {port} within {timeout} seconds" ) raise except socket.error as e: logging.info(f"could not connect to daemon at {port}: {e}") pass if time.time() >= until: logging.error( f"could not connect to daemon at {port} within {timeout} seconds" ) raise ConnectionRefusedError() # Sleep a bit to avoid consuming too much CPU while busy-waiting. time.sleep(0.1) # Stop a daemon. def daemon_stop(ctx, path=None, args=None, name=None): logging.debug(f"Stopping daemon {name}") ns = ctx.declare("_daemon") logging.debug(f" ns={ns}") pid = ns[name]["pid"] signo = signal.SIGTERM this = ns[name] _daemon_log_long(f"{name} stdout, before:", open(this["stdout"]).read()) _daemon_log_long(f"{name} stderr, before:", open(this["stderr"]).read()) logging.debug(f"Terminating process {pid} with signal {signo}") try: os.kill(pid, signo) except ProcessLookupError: logging.warning("Process did not actually exist (anymore?)") while True: try: os.kill(pid, 0) logging.debug(f"Daemon {name}, pid {pid} still exists") time.sleep(1) except ProcessLookupError: break logging.debug(f"Daemon {name} is gone") _daemon_log_long(f"{name} stdout, after:", open(this["stdout"]).read()) _daemon_log_long(f"{name} stderr, after:", open(this["stderr"]).read()) def _daemon_log_long(prefix, data): logging.debug(prefix) for line in data.splitlines(): logging.debug(f" {line}") def daemon_no_such_process(ctx, args=None): assert not _daemon_pgrep(args) def daemon_process_exists(ctx, args=None): assert _daemon_pgrep(args) def _daemon_pgrep(pattern): logging.info(f"checking if process exists: pattern={pattern}") exit = subprocess.call( ["pgrep", "-laf", pattern], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) logging.info(f"exit code: {exit}") return exit == 0 def daemon_start_fails_with(ctx, message=None): daemon = ctx.declare("_daemon") error = daemon["_start_error"] logging.debug(f"daemon_start_fails_with: error={error!r}") logging.debug(f"daemon_start_fails_with: message={message!r}") assert message.lower() in error.lower() def daemon_start_succeeds(ctx): daemon = ctx.declare("_daemon") logging.debug(f"daemon_start_succeeds: {daemon}") for name in daemon.keys(): if isinstance(daemon[name], dict): logging.debug(f"name={name}") stderr = daemon_get_stderr(ctx, name) logging.debug(f"daemon_start_succeeds: {name}:\n{stderr}") assert "_start_error" not in daemon def daemon_get_stdout(ctx, name): return _daemon_get_output(ctx, name, "stdout") def daemon_get_stderr(ctx, name): return _daemon_get_output(ctx, name, "stderr") def _daemon_get_output(ctx, name, which): ns = ctx.declare("_daemon") this = ns[name] filename = this[which] data = open(filename).read() logging.debug(f"Read {which} of daemon {name} from {filename}: {data!r}") return data def daemon_has_produced_output(ctx, name=None): started = time.time() timeout = 5.0 while time.time() < started + timeout: stdout = daemon_get_stdout(ctx, name) stderr = daemon_get_stderr(ctx, name) if stdout and stderr: break time.sleep(0.1) def daemon_stdout_is(ctx, name=None, text=None): daemon_get_stdout = globals()["daemon_get_stdout"] _daemon_output_is(ctx, name, text, daemon_get_stdout) def daemon_stdout_contains(ctx, name=None, text=None): daemon_get_stdout = globals()["daemon_get_stdout"] assert_eq = globals()["assert_eq"] result = _daemon_output_contains(ctx, name, text, daemon_get_stdout) assert_eq(result, True) def daemon_stdout_doesnt_contain(ctx, name=None, text=None): daemon_get_stdout = globals()["daemon_get_stdout"] assert_eq = globals()["assert_eq"] result = _daemon_output_contains(ctx, name, text, daemon_get_stdout) assert_eq(result, False) def daemon_stderr_is(ctx, name=None, text=None): daemon_get_stderr = globals()["daemon_get_stderr"] _daemon_output_is(ctx, name, text, daemon_get_stderr) def _daemon_output_is(ctx, name, text, getter): assert_eq = globals()["assert_eq"] text = bytes(text, "UTF-8").decode("unicode_escape") output = getter(ctx, name) assert_eq(text, output) def _daemon_output_contains(ctx, name, text, getter): log_lines = globals()["log_lines"] indent = " " * 4 text = bytes(text, "UTF-8").decode("unicode_escape") output = getter(ctx, name) logging.debug(" output:") log_lines(indent, output) logging.debug(" text:") log_lines(indent, text) return text in output