# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import copy
import sys
import argparse
from os import mkdir
from shutil import move, rmtree
from os.path import isdir
import yaml
from ci.job.utils import set_commit_hash, set_libos
from ci.job.factory import JobFactory
import ci.git as git

# =====================================================================================================================


# Runs the CI pipeline.
def run_pipeline(
        platform: str, log_directory: str, repository: str, branch: str, libos: str, is_debug: bool, server: str,
        client: str, test_unit: bool, test_integration: bool, test_system: str, server_addr: str, client_addr: str,
        delay: float, config_path: str, output_dir: str, enable_nfs: bool, install_prefix: str) -> dict:
    """Run checkout, build, the requested test stages and cleanup.

    Every stage is a job created by ``JobFactory`` from a single configuration
    dictionary; the result of each job's ``execute()`` is stored under the
    stage name.

    Args:
        platform: Target platform; the install step only runs for "linux".
        log_directory: Directory handed to the jobs for their logs.
        repository: Location of the target repository on the remote hosts.
        branch: Branch to build.
        libos: LibOS under test; also selects the integration test set and
            the per-libos system test configuration file.
        is_debug: Whether to request a debug build.
        server: Server host name.
        client: Client host name.
        test_unit: Run the unit test stage.
        test_integration: Run the integration test stage.
        test_system: Falsy to skip system tests, "all" to run every
            configured one, or the name of a single system test.
        server_addr: Server address used by the tests.
        client_addr: Client address used by the tests.
        delay: Delay forwarded to the jobs through the configuration.
        config_path: Path of the configuration file on the remote hosts.
        output_dir: Output directory forwarded to the jobs.
        enable_nfs: Forwarded to the jobs through the configuration.
        install_prefix: Install prefix forwarded to the jobs.

    Returns:
        A dictionary mapping each executed stage name to its boolean result.
        Stages that were skipped have no entry.
    """
    is_sudo: bool = libos in ("catnip", "catpowder", "catloop")
    status: dict[str, bool] = {}
    config: dict = {
        "server": server,
        "server_name": server,
        "client": client,
        "client_name": client,
        "repository": repository,
        "branch": branch,
        "libos": libos,
        "is_debug": is_debug,
        "test_unit": test_unit,
        "test_system": test_system,
        "server_addr": server_addr,
        "server_ip": server_addr,
        "client_addr": client_addr,
        "client_ip": client_addr,
        "delay": delay,
        "config_path": config_path,
        "output_dir": output_dir,
        "enable_nfs": enable_nfs,
        "log_directory": log_directory,
        "is_sudo": is_sudo,
        "platform": platform,
        "install_prefix": install_prefix,
    }

    factory: JobFactory = JobFactory(config)

    # STEP 1: Check out.
    status["checkout"] = factory.checkout().execute()

    # STEP 2: Compile debug.
    if status["checkout"]:
        status["compile"] = True
        status["compile"] &= factory.compile().execute()
        if config["platform"] == "linux":
            status["compile"] &= factory.install().execute()

    # Every test stage below requires a successful checkout and build.
    # "compile" is absent from status when the checkout failed.
    build_ok: bool = bool(status["checkout"] and status.get("compile", False))

    # STEP 3: Run unit tests.
    if test_unit and build_ok:
        status["unit_tests"] = True
        status["unit_tests"] &= factory.unit_test(test_name="test-unit-rust").execute()
        status["unit_tests"] &= factory.unit_test(test_name="test-unit-c").execute()

    # STEP 4: Run integration tests.
    if test_integration and build_ok:
        if libos in ["catnap", "catloop", "catnip", "catpowder"]:
            status["integration_tests"] = factory.integration_test().execute()
        elif libos == "catmem":
            # Accumulate the results: assigning each one would let a later
            # passing test hide an earlier failure.
            status["integration_tests"] = True
            for test_name in ("standalone", "push-wait", "pop-wait", "push-wait-async", "pop-wait-async"):
                status["integration_tests"] &= factory.integration_test(test_name).execute()

    # STEP 5: Run system tests.
    if test_system and config["platform"] and build_ok:
        libos_tests = read_yaml(platform, libos)[libos]

        # Run pipe-open test.
        if __should_run(libos_tests, "pipe_open", test_system):
            scenario = libos_tests['pipe_open']
            status["pipe-open"] = factory.system_test(
                test_name="open-close", niterations=scenario['niterations']).execute()

        # Run pipe-ping-pong test.
        if __should_run(libos_tests, "pipe_ping_pong", test_system):
            status["pipe-ping-pong"] = factory.system_test(test_name="ping-pong").execute()

        # Run pipe-push-pop test.
        if __should_run(libos_tests, "pipe_push_pop", test_system):
            status["pipe-push-pop"] = factory.system_test(test_name="push-pop").execute()

        if __should_run(libos_tests, "tcp_echo", test_system):
            status["tcp_echo"] = True
            test_config = libos_tests['tcp_echo']
            for scenario in build_combinations(test_config, list(test_config), {}):
                # Skip if scenario requires more threads than clients.
                if scenario['nthreads'] > scenario['nclients']:
                    continue
                # catnap uses the scenario's thread count; every other libos
                # is run with a single thread.
                nthreads = scenario['nthreads'] if libos == "catnap" else 1
                status["tcp_echo"] &= factory.system_test(
                    test_name="tcp_echo",
                    run_mode=scenario['run_mode'],
                    nclients=scenario['nclients'],
                    bufsize=scenario['bufsize'],
                    nrequests=scenario['nrequests'],
                    nthreads=nthreads).execute()

        if __should_run(libos_tests, "tcp_close", test_system):
            status["tcp_close"] = True
            test_config = libos_tests['tcp_close']
            for scenario in build_combinations(test_config, list(test_config), {}):
                status["tcp_close"] &= factory.system_test(
                    test_name="tcp_close",
                    run_mode=scenario['run_mode'],
                    who_closes=scenario['who_closes'],
                    nclients=scenario['nclients']).execute()

        if __should_run(libos_tests, "tcp_wait", test_system):
            status["tcp_wait"] = True
            test_config = libos_tests['tcp_wait']
            for scenario in build_combinations(test_config, list(test_config), {}):
                status["tcp_wait"] &= factory.system_test(
                    test_name="tcp_wait",
                    scenario=scenario['scenario'],
                    nclients=scenario['nclients']).execute()

        # The remaining tests take no parameters; the configuration key is
        # also the status key and the test name.
        for test_name in ("tcp_ping_pong", "tcp_push_pop", "udp_ping_pong", "udp_push_pop"):
            if __should_run(libos_tests, test_name, test_system):
                status[test_name] = factory.system_test(test_name=test_name).execute()

    # STEP 6: Clean up.
    status["cleanup"] = factory.cleanup().execute()

    return status


# Runs the Redis CI pipeline.
def run_redis_pipeline(
        platform: str, log_directory: str, branch: str, libos: str, server: str, client: str, server_addr: str,
        client_addr: str, delay: float, output_dir: str, libshim_path: str, ld_library_path: str,
        config_path: str) -> dict:
    """Clone, build, run and benchmark Redis, then clean up.

    Args:
        platform: Target platform, forwarded to the jobs.
        log_directory: Directory handed to the jobs for their logs.
        branch: Accepted for signature compatibility but not used here; the
            Redis branch is pinned to "7.0" in the configuration below.
        libos: LibOS under test.
        server: Server host name.
        client: Client host name.
        server_addr: Server address forwarded to the jobs.
        client_addr: Client address forwarded to the jobs.
        delay: Delay forwarded to the jobs through the configuration.
        output_dir: Output directory forwarded to the jobs.
        libshim_path: Path of the shim library forwarded to the jobs.
        ld_library_path: Library search path forwarded to the jobs.
        config_path: Path of the configuration file on the remote hosts.

    Returns:
        A dictionary mapping each stage name to the boolean result of its
        job. Stopping the Redis server is deliberately not recorded.
    """
    is_sudo: bool = libos in ("catnip", "catpowder", "catloop")
    status: dict[str, bool] = {}
    config: dict = {
        "server": server,
        "server_name": server,
        "client": client,
        "libos": libos,
        # The backslash is part of the value (same string as the original
        # "\$HOME" literal), spelled with a valid escape sequence.
        "path": "\\$HOME",
        "client_name": client,
        "repository": "https://github.com/redis/redis.git",
        "branch": "7.0",
        "server_addr": server_addr,
        "client_addr": client_addr,
        "delay": delay,
        "output_dir": output_dir,
        "log_directory": log_directory,
        "is_sudo": is_sudo,
        "platform": platform,
        "libshim_path": libshim_path,
        "ld_library_path": ld_library_path,
        "config_path": config_path,
    }

    factory: JobFactory = JobFactory(config)

    status["clone-redis"] = factory.clone_redis().execute()
    status["make-redis"] = factory.make_redis().execute()
    status["run-redis"] = factory.run_redis_server().execute()
    status["run-redis-benchmark"] = factory.run_redis_benchmark().execute()
    # NOTE: we do not report the status of stopping the redis server.
    factory.stop_redis_server().execute()
    status["cleanup-redis"] = factory.cleanup_redis().execute()

    return status


# Recursively builds all combinations
def build_combinations(scenario: dict, names: list, params: dict) -> list:
    """Expand a parameter grid into one dictionary per combination.

    Args:
        scenario: Maps each parameter name to an iterable of candidate values.
        names: Parameter names still to be expanded, in order. The first name
            varies slowest in the result.
        params: Parameters already fixed; included in every combination.
            This dictionary is not modified.

    Returns:
        A list of independent (deep-copied) dictionaries, one per combination.
        An empty ``names`` yields a single copy of ``params``; a parameter with
        no candidate values yields an empty list.
    """
    if not names:
        return [copy.deepcopy(params)]

    name, remaining = names[0], names[1:]
    combinations: list = []
    for value in scenario[name]:
        # Pass a new dictionary down instead of mutating the caller's one.
        combinations += build_combinations(scenario, remaining, {**params, name: value})
    return combinations


def __should_run(ci_map, test_name: str, test_system: str) -> bool:
    """Checks if we should run a given system test."""
    return test_name in ci_map and test_system in ("all", test_name)


def read_yaml(platform: str, libos: str):
    """Load the system test configuration for a (platform, libos) pair.

    The path is relative to the current working directory.
    """
    path: str = f"tools/ci/config/test/{platform}/{libos}.yaml"
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)


# Reads and parses command line arguments.
def read_args(argv=None) -> argparse.Namespace:
    """Parse the command line.

    Args:
        argv: Optional list of arguments (without the program name). When
            omitted, ``sys.argv[1:]`` is used.

    Returns:
        The parsed arguments. On invalid input argparse reports the error and
        raises ``SystemExit``.
    """
    cli_args: list = sys.argv[1:] if argv is None else list(argv)

    # Addresses are mandatory when system tests are requested. Both the
    # "--test-system VALUE" and "--test-system=VALUE" spellings are detected.
    needs_addresses: bool = any(
        arg == "--test-system" or arg.startswith("--test-system=") for arg in cli_args)

    description: str = (
        "Use this utility to run the regression system of Demikernel on a pair of remote host machines.\n"
        "Before using this utility, ensure that you have correctly setup the development environment on the remote machines.\n"
        "For more information, check out the README.md file of the project."
    )

    # Initialize parser.
    parser = argparse.ArgumentParser(prog="demikernel_ci.py", description=description)

    # Host options.
    parser.add_argument("--platform", required=True, help="set platform")
    parser.add_argument("--server", required=True, help="set server host name")
    parser.add_argument("--client", required=True, help="set client host name")

    # Build options.
    parser.add_argument("--repository", required=True, help="set location of target repository in remote hosts")
    parser.add_argument("--branch", required=True, help="set target branch in remote hosts")
    parser.add_argument("--libos", required=True, help="set target libos in remote hosts")
    parser.add_argument("--debug", required=False, action='store_true', help="sets debug build mode")
    parser.add_argument("--delay", default=1.0, type=float, required=False,
                        help="set delay between server and host for system-level tests")
    parser.add_argument("--enable-nfs", required=False, default=False,
                        action="store_true", help="enable building on nfs directories")
    parser.add_argument("--install-prefix", required=False, default="/tmp/demikernel",
                        help="set install prefix for building")

    # Test options.
    parser.add_argument("--test-unit", action='store_true', required=False, help="run unit tests")
    parser.add_argument("--test-integration", action='store_true', required=False, help="run integration tests")
    parser.add_argument("--test-system", type=str, required=False, help="run system tests")
    parser.add_argument("--test-redis", action='store_true', required=False, help="run redis tests")
    parser.add_argument("--server-addr", required=needs_addresses, help="sets server address in tests")
    parser.add_argument("--client-addr", required=needs_addresses, help="sets client address in tests")
    # Same value as the original "\$HOME/config.yaml", with a valid escape.
    parser.add_argument("--config-path", required=False, default="\\$HOME/config.yaml", help="sets config path")

    # Other options.
    parser.add_argument("--output-dir", required=False, default=".", help="output directory for logs")

    # Read arguments from command line.
    return parser.parse_args(cli_args)


# Drives the program.
def main():
    """Parse arguments, run the requested pipelines and exit with the result.

    Exits with 0 when every recorded stage succeeded and with -1 otherwise,
    or when the (platform, libos) combination is not supported.
    """
    # Local import: creates missing parents of the log directory as well.
    from os import makedirs

    # Parse and read arguments from command line.
    args: argparse.Namespace = read_args()

    # Extract host options.
    platform: str = args.platform
    server: str = args.server
    client: str = args.client

    # Extract build options.
    repository: str = args.repository
    branch: str = args.branch
    libos: str = args.libos
    is_debug: bool = args.debug
    delay: float = args.delay
    config_path: str = args.config_path
    enable_nfs: bool = args.enable_nfs
    install_prefix: str = args.install_prefix

    # Extract test options.
    test_unit: bool = args.test_unit
    test_integration: bool = args.test_integration
    test_system: str = args.test_system
    test_redis: bool = args.test_redis
    server_addr: str = args.server_addr
    client_addr: str = args.client_addr

    # Output directory.
    output_dir: str = args.output_dir

    # Check if (platform, libos) combination is invalid. This runs before any
    # side effect so that a bad invocation does not discard the previous logs.
    if platform == "windows" and libos not in ["catnap", "catpowder"]:
        print("Invalid (platform, libos) combination.")
        sys.exit(-1)

    # Initialize global variables.
    head_commit: str = git.get_head_commit(branch)
    set_commit_hash(head_commit)
    set_libos(libos)

    # Create folder for test logs
    log_directory: str = "{}/{}".format(output_dir, "{}-{}".format(libos, branch).replace("/", "_"))

    if isdir(log_directory):
        # Keep the last run
        old_dir: str = log_directory + ".old"
        if isdir(old_dir):
            rmtree(old_dir)
        move(log_directory, old_dir)
    makedirs(log_directory)

    status: dict = run_pipeline(platform, log_directory, repository, branch, libos, is_debug, server,
                                client, test_unit, test_integration, test_system, server_addr,
                                client_addr, delay, config_path, output_dir, enable_nfs, install_prefix)

    if test_redis:
        libshim_path: str = f"{install_prefix}/lib/libshim.so"
        ld_library_path: str = f"{install_prefix}/lib"
        if libos == "catnip":
            ld_library_path = f"\\$HOME/lib/x86_64-linux-gnu:{install_prefix}/lib"
        status |= run_redis_pipeline(platform, log_directory, branch, libos, server, client, server_addr,
                                     client_addr, delay, output_dir, libshim_path, ld_library_path, config_path)

    sys.exit(-1 if False in status.values() else 0)


if __name__ == "__main__":
    main()