import sys
import os
import io

# The cluster-specific configuration modules (cloudbees_config, common) live
# in a per-cluster directory, so it must be on sys.path before importing them.
sys.path.append(f"{os.environ['WORKSPACE']}/ci_resources/configs/{os.environ['CLUSTER']}")

import subprocess
import re
import cloudbees_config
import common
import shlex
import time

# A Jenkins env variable for job name is composed of the name of the Jenkins
# job and the branch name it is building for, e.g. in our case
# jobname = 'ofi_libfabric/master'.


class Test:
    """Base class holding the configuration shared by all CI test runners."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, mpitype=None,
                 util_prov=None, way=None):
        """Store the run parameters and derive the install/log paths.

        ``util_prov`` is stored with an ``ofi_`` prefix (or as an empty string
        when not given).  When ``mpitype`` is one of 'impi', 'ompi' or
        'mpich', ``self.mpi`` is set to the matching launcher helper;
        otherwise it stays an empty string.
        """
        self.jobname = jobname
        self.buildno = buildno
        self.testname = testname
        self.hw = hw
        self.core_prov = core_prov
        self.util_prov = f'ofi_{util_prov}' if util_prov is not None else ''
        self.fabric = fabric
        self.hosts = hosts
        self.log_file = log_file
        self.mpi_type = mpitype
        self.ofi_build_mode = ofi_build_mode

        # One host: server and client share the node.  Two or more hosts: the
        # first two are used.  With no hosts the attributes stay unset.
        if hosts:
            self.server = hosts[0]
            self.client = hosts[1] if len(hosts) > 1 else hosts[0]

        self.nw_interface = cloudbees_config.interface_map[self.fabric]
        self.custom_workspace = os.environ['CUSTOM_WORKSPACE']
        self.libfab_installpath = (f'{self.custom_workspace}/'
                                   f'{self.hw}/{self.ofi_build_mode}')
        self.middlewares_path = f'{self.custom_workspace}/middlewares'
        self.ci_logdir_path = f'{self.custom_workspace}/log_dir'
        self.env = user_env
        self.way = way

        self.mpi = ''
        if self.mpi_type == 'impi':
            self.mpi = IMPI(self.core_prov, self.hosts,
                            self.libfab_installpath, self.nw_interface,
                            self.server, self.client, self.env,
                            self.middlewares_path, self.util_prov)
        elif self.mpi_type == 'ompi':
            self.mpi = OMPI(self.core_prov, self.hosts,
                            self.libfab_installpath, self.nw_interface,
                            self.server, self.client, self.env,
                            self.middlewares_path, self.util_prov)
        elif self.mpi_type == 'mpich':
            self.mpi = MPICH(self.hw, self.core_prov, self.hosts,
                             self.libfab_installpath, self.nw_interface,
                             self.server, self.client, self.env,
                             self.middlewares_path, self.util_prov)


class FiInfoTest(Test):
    """Runs the installed ``fi_info`` binary for the configured provider."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov)

        self.fi_info_testpath = f'{self.libfab_installpath}/bin'

    @property
    def cmd(self):
        """Path of the fi_info executable, followed by a space."""
        return f"{self.fi_info_testpath}/fi_info "

    @property
    def options(self):
        """Command-line options selecting the fabric and provider."""
        if self.util_prov:
            return f"-f {self.fabric} -p {self.core_prov};{self.util_prov}"
        if self.core_prov == 'psm3':
            # psm3 is queried by provider only, without a fabric filter.
            return f"-p {self.core_prov}"
        return f"-f {self.fabric} -p {self.core_prov}"

    def execute_cmd(self):
        """Run fi_info with the computed options."""
        common.run_command(shlex.split(self.cmd + self.options))


class Fabtest(Test):
    """Runs the fabtests suite through ``runfabtests.sh``."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None,
                 way=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov, way)
        self.fabtestpath = f'{self.libfab_installpath}/bin'
        self.fabtestconfigpath = f'{self.libfab_installpath}/share/fabtests'
        self.device = cloudbees_config.fabric_map[self.hw]

    def get_exclude_file(self):
        """Return the path of the exclude file to use, or None if none exists.

        For the hardware names handled explicitly a single candidate is
        checked; otherwise the per-provider ``exclude`` file is preferred and
        the older ``<prov>/<prov>.exclude`` layout is the fallback.
        """
        efile_path = f'{self.libfab_installpath}/share/fabtests/test_configs'
        # Only the generic branch has a legacy fallback location.
        efile_old = None

        if self.hw == 'ivysaur':
            efile = f'{efile_path}/{self.core_prov}/io_uring.exclude'
        elif self.hw in ('cyndaquil', 'quilava'):
            efile = f'{efile_path}/{self.core_prov}/cuda.exclude'
        else:
            prov = self.util_prov if self.util_prov else self.core_prov
            efile_old = f'{efile_path}/{prov}/{prov}.exclude'

            if self.util_prov:
                efile = f'{efile_path}/{self.util_prov}/{self.core_prov}/exclude'
            else:
                efile = f'{efile_path}/{self.core_prov}/exclude'

        if os.path.isfile(efile):
            return efile
        if efile_old is not None and os.path.isfile(efile_old):
            return efile_old

        print(f"Exclude file: {efile} not found!")
        return None

    @property
    def cmd(self):
        """Path of runfabtests.sh, followed by a space."""
        return f"{self.fabtestpath}/runfabtests.sh "

    @property
    def options(self):
        """Build the runfabtests.sh argument string for this configuration."""
        opts = f"-T 300 -vvv -p {self.fabtestpath} -S "
        if self.core_prov != 'shm' and self.nw_interface:
            opts += f"-s {common.get_node_name(self.server, self.nw_interface)} "
            opts += f"-c {common.get_node_name(self.client, self.nw_interface)} "

        if self.core_prov == 'shm':
            opts += f"-s {self.server} "
            opts += f"-c {self.client} "
            opts += "-N "

        if self.core_prov == 'ucx':
            opts += "-b "

        if self.ofi_build_mode == 'dl':
            opts += "-t short "
        else:
            opts += "-t all "

        if self.way == 'h2d':
            opts += f'-C "-H" -L "-D {self.device}" '
        elif self.way == 'd2d':
            opts += f'-C "-D {self.device}" -L "-D {self.device}" '
        elif self.way == 'xd2d':
            opts += f'-C "-D {self.device}" -L "-D {self.device} -i 1" '

        if self.core_prov == 'sockets' and self.ofi_build_mode == 'reg':
            complex_test_file = (f'{self.libfab_installpath}/share/fabtests/'
                                 f'test_configs/{self.core_prov}/quick.test')
            if os.path.isfile(complex_test_file):
                # Must be an f-string so the actual path is passed to -u.
                opts += f"-u {complex_test_file} "
            else:
                print(f"{self.core_prov} Complex test file not found")

        if self.ofi_build_mode != 'reg' or self.core_prov == 'udp':
            opts += "-e 'ubertest,multinode' "

        efile = self.get_exclude_file()
        if efile:
            opts += "-R "
            opts += f"-f {efile} "

        for key, val in self.env.items():
            opts += f"-E {key}={val} "

        if self.util_prov:
            opts += f"{self.core_prov};{self.util_prov} "
        else:
            opts += f"{self.core_prov} "

        if self.core_prov == 'shm':
            opts += f"{self.server} {self.server} "
        else:
            opts += f"{self.server} {self.client} "

        return opts

    @property
    def execute_condn(self):
        return True

    def execute_cmd(self):
        """Run runfabtests.sh from the fabtests config directory."""
        curdir = os.getcwd()
        os.chdir(self.fabtestconfigpath)
        try:
            common.run_command(shlex.split(self.cmd + self.options))
        finally:
            # Restore the working directory even if the command raises/exits.
            os.chdir(curdir)


class ShmemTest(Test):
    """Runs the SOS, ISx and PRK SHMEM test programs through ``oshrun``."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None,
                 weekly=False):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov)

        self.n = 2
        self.ppn = 1
        self.weekly = weekly
        self.shmem_dir = f'{self.middlewares_path}/shmem_{self.hw}'
        self.oshrun = f'{self.shmem_dir}/bin/oshrun'
        self.hydra = f'{cloudbees_config.hydra}'
        self.shmem_testname = ''
        self.threshold = '1'
        self.isx_shmem_total_size = 33554432
        self.isx_shmem_kernel_max = 134217728
        self.prk_iterations = 10
        self.prk_first_arr_dim = 1000
        self.prk_second_arr_dim = 1000
        if self.util_prov:
            self.prov = f'{self.core_prov}\\;{self.util_prov}'
        else:
            self.prov = self.core_prov

        # Sub-directory of shmem_dir holding each suite's tests.
        self.test_dir = {
            'sos': 'SOS/test',
            'isx': 'ISx/SHMEM',
            'prk': 'PRK/SHMEM',
        }

        self.shmem_environ = {
            'SHMEM_OFI_USE_PROVIDER': self.prov,
            'OSHRUN_LAUNCHER': self.hydra,
            'PATH': f'{self.shmem_dir}/bin:$PATH',
            'LD_LIBRARY_PATH': (f'{self.shmem_dir}/lib:'
                                f'{self.libfab_installpath}/lib'),
            'SHMEM_SYMMETRIC_SIZE': '4G',
            'LD_PRELOAD': f'{self.libfab_installpath}/lib/libfabric.so',
            'threshold': self.threshold,
            'SHMEM_DEBUG': '1',
        }

        # Files with these endings in a SOS test directory are not tests.
        self.exclude_extensions = ['.cpp', '.c', '.o', '.h', '.f90', '.log',
                                   '.am', '.in', '.deps', '.libs']

        self.SOS_tests = ['unit', 'shmemx', 'apps', 'spec-example']
        if self.weekly:
            self.SOS_tests.append('performance/shmem_perf_suite')
            self.SOS_tests.append('performance/tests')

        # Lower-cased file names to skip, per suite and core provider.
        self.exclude = {
            'sos': {
                'verbs': ['makefile', 'readme'],
                'tcp': ['makefile', 'readme'],
                'sockets': ['makefile', 'readme'],
            }
        }

    def export_env(self):
        """Return the ``export KEY=VAL; `` prefix built from shmem_environ."""
        if self.shmem_testname in ('isx', 'prk'):
            # NOTE(review): this only updates the attribute; the 'threshold'
            # entry of shmem_environ was captured in __init__ and is still
            # exported unchanged.  Kept as-is -- confirm the intended value.
            self.threshold = '0'

        return ''.join(f"export {key}={val}; "
                       for key, val in self.shmem_environ.items())

    def check_ending(self, f_name):
        """Return True if ``f_name`` has none of the excluded endings."""
        return not f_name.lower().endswith(tuple(self.exclude_extensions))

    def get_cmds(self):
        """Return the list of test command strings for the current suite."""
        cmd_list = []
        suite = self.shmem_testname
        if suite == 'sos':
            skip_names = self.exclude[suite][self.core_prov]
            for test_dir in self.SOS_tests:
                test_dir_path = (f'{self.shmem_dir}/'
                                 f'{self.test_dir[suite]}/'
                                 f'{test_dir}')
                for f_name in os.listdir(test_dir_path):
                    if (not self.check_ending(f_name)
                            or f_name.lower() in skip_names):
                        continue
                    cmd_list.append(f"{test_dir_path}/{f_name}")
        elif suite == 'isx':
            exec_path = f'{self.shmem_dir}/{self.test_dir[suite]}/bin'
            cmd_list.append(f"{exec_path}/isx.strong "
                            f"{self.isx_shmem_kernel_max} output_strong")
            cmd_list.append(f"{exec_path}/isx.weak "
                            f"{self.isx_shmem_total_size} output_weak")
            cmd_list.append(f"{exec_path}/isx.weak_iso "
                            f"{self.isx_shmem_total_size} output_weak_iso")
        elif suite == 'prk':
            exec_path = f'{self.shmem_dir}/{self.test_dir[suite]}'
            cmd_list.append(f"{exec_path}/Stencil/stencil "
                            f"{self.prk_iterations} {self.prk_first_arr_dim}")
            cmd_list.append(f"{exec_path}/Synch_p2p/p2p "
                            f"{self.prk_iterations} {self.prk_first_arr_dim} "
                            f"{self.prk_second_arr_dim}")
            cmd_list.append(f"{exec_path}/Transpose/transpose "
                            f"{self.prk_iterations} {self.prk_first_arr_dim}")

        return cmd_list

    @property
    def execute_condn(self):
        return True

    def execute_cmd(self, shmem_testname):
        """Run every command of the ``shmem_testname`` suite under oshrun."""
        self.shmem_testname = shmem_testname
        base_cmd = f"{self.oshrun} -n {self.n} -ppn {self.ppn}"

        # Build the command list once (it may list directories on disk).
        for cmd in self.get_cmds():
            command = f"bash -c '{self.export_env()} {base_cmd} {cmd}'"
            test_label = cmd.split('/')[-1]
            print(f"Running {self.shmem_testname} {test_label}")
            common.run_command(shlex.split(command))
            print(f"{self.shmem_testname} {test_label} PASS!")


class MultinodeTests(Test):
    """Runs the fabtests multinode script ``runmultinode.sh``."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov)
        self.fabtestpath = f'{self.libfab_installpath}/bin'
        self.fabtestconfigpath = f'{self.libfab_installpath}/share/fabtests'
        self.n = 2
        self.ppn = 64
        self.iterations = 1
        self.method = 'msg'
        self.pattern = "full_mesh"

    @property
    def cmd(self):
        """Path of runmultinode.sh, followed by a space."""
        return f"{self.fabtestpath}/runmultinode.sh "

    @property
    def options(self):
        """Build the runmultinode.sh argument string."""
        opts = f"-h {common.get_node_name(self.server, self.nw_interface)}"
        opts += f",{common.get_node_name(self.client, self.nw_interface)}"
        opts += f" -n {self.ppn}"
        opts += f" -I {self.iterations}"
        opts += f" -z {self.pattern}"
        opts += f" -C {self.method}"
        if self.util_prov:
            opts += f" -p {self.core_prov};{self.util_prov}"
        else:
            opts += f" -p {self.core_prov}"
        opts += f" --ci {self.fabtestpath}/"  # enable ci mode to disable tput

        return opts

    @property
    def execute_condn(self):
        return True

    def execute_cmd(self):
        """Run runmultinode.sh from the fabtests config directory."""
        curdir = os.getcwd()
        os.chdir(self.fabtestconfigpath)
        try:
            common.run_command(shlex.split(self.cmd + self.options))
        finally:
            os.chdir(curdir)


class OMPI:
    """Builds the environment prefix and ``mpirun`` command for Open MPI."""

    def __init__(self, core_prov, hosts, libfab_installpath, nw_interface,
                 server, client, environ, middlewares_path, util_prov=None):
        self.ompi_src = f'{middlewares_path}/ompi'
        self.core_prov = core_prov
        self.hosts = hosts
        self.util_prov = util_prov
        self.libfab_installpath = libfab_installpath
        self.nw_interface = nw_interface
        self.server = server
        self.client = client
        self.environ = environ
        self.n = 4
        self.ppn = 2

    @property
    def env(self):
        """Opening of a ``bash -c '...`` command exporting the environment.

        The single quote is left open; callers append the command and the
        closing quote.
        """
        cmd = "bash -c '"
        if self.util_prov:
            cmd += f"export FI_PROVIDER={self.core_prov}\\;{self.util_prov}; "
        else:
            cmd += f"export FI_PROVIDER={self.core_prov}; "
        cmd += "export I_MPI_FABRICS=ofi; "
        cmd += f"export LD_LIBRARY_PATH={self.ompi_src}/lib:$LD_LIBRARY_PATH; "
        cmd += (f"export LD_LIBRARY_PATH={self.libfab_installpath}/lib/:"
                "$LD_LIBRARY_PATH; ")
        cmd += f"export PATH={self.ompi_src}/bin:$PATH; "
        cmd += f"export PATH={self.libfab_installpath}/bin:$PATH; "
        return cmd

    @property
    def options(self):
        """mpirun options: process count, host list, MCA settings, env."""
        opts = f"-np {self.n} "
        # Each host is passed as '<node name>:<ppn>', individually quoted.
        hosts = "','".join(
            ':'.join([common.get_node_name(host, self.nw_interface),
                      str(self.ppn)])
            for host in self.hosts)
        opts += f"--host '{hosts}' "
        if self.util_prov:
            opts += (f"--mca mtl_ofi_provider_include {self.core_prov}\\;"
                     f"{self.util_prov} ")
            opts += (f"--mca btl_ofi_provider_include {self.core_prov}\\;"
                     f"{self.util_prov} ")
        else:
            opts += f"--mca mtl_ofi_provider_include {self.core_prov} "
            opts += f"--mca btl_ofi_provider_include {self.core_prov} "
        opts += "--mca orte_base_help_aggregate 0 "
        # This is necessary to prevent verbs from printing warning messages
        # The test still uses libfabric verbs even when enabled.
        # if (self.core_prov == 'verbs'):
        #     opts += "--mca btl_openib_allow_ib 1 "
        opts += "--mca mtl ofi "
        opts += "--mca pml cm -tag-output "
        for key, val in self.environ.items():
            opts += f"-x {key}={val} "

        return opts

    @property
    def cmd(self):
        """Full mpirun invocation (binary path plus options)."""
        return f"{self.ompi_src}/bin/mpirun {self.options}"


class MPICH:
    """Builds the environment prefix and ``mpirun`` command for MPICH."""

    def __init__(self, hw, core_prov, hosts, libfab_installpath, nw_interface,
                 server, client, environ, middlewares_path, util_prov=None):
        self.mpich_dir = f'{middlewares_path}/mpich_{hw}'
        self.mpichpath = f'{self.mpich_dir}/mpich'
        self.core_prov = core_prov
        self.hosts = hosts
        self.util_prov = util_prov
        self.libfab_installpath = libfab_installpath
        self.nw_interface = nw_interface
        self.server = server
        self.client = client
        self.environ = environ
        self.n = 4
        self.ppn = 1

    @property
    def env(self):
        """Opening of a ``bash -c '...`` command exporting the environment.

        The single quote is left open; callers append the command and the
        closing quote.
        """
        cmd = "bash -c '"
        if self.util_prov:
            cmd += f"export FI_PROVIDER={self.core_prov}\\;{self.util_prov}; "
        else:
            cmd += f"export FI_PROVIDER={self.core_prov}; "
        cmd += "export I_MPI_FABRICS=ofi; "
        cmd += "export HYDRA_LAUNCHER=fork;"
        cmd += "export MPIR_CVAR_CH4_OFI_ENABLE_ATOMICS=0; "
        cmd += "export MPIR_CVAR_CH4_OFI_CAPABILITY_SETS_DEBUG=0; "
        cmd += f"export LD_LIBRARY_PATH={self.mpich_dir}/lib:$LD_LIBRARY_PATH; "
        cmd += (f"export LD_LIBRARY_PATH={self.libfab_installpath}/lib/:"
                "$LD_LIBRARY_PATH; ")
        cmd += f"export PATH={self.mpich_dir}/bin:$PATH; "
        cmd += f"export PATH={self.libfab_installpath}/bin:$PATH; "
        return cmd

    @property
    def options(self):
        """mpirun options: process counts, launcher and -genv variables."""
        opts = f"-n {self.n} "
        opts += f"-ppn {self.ppn} "
        opts += "-launcher ssh "
        # Removed because sbatch does this for us when we use mpirun
        # opts += f"-hosts {common.get_node_name(self.server, self.nw_interface)},"\
        #         f"{common.get_node_name(self.client, self.nw_interface)} "
        for key, val in self.environ.items():
            opts += f"-genv {key} {val} "

        return opts

    @property
    def cmd(self):
        """Full mpirun invocation (binary path plus options)."""
        return f"{self.mpich_dir}/bin/mpirun {self.options}"


class IMPI:
    """Builds the environment prefix and ``mpiexec`` command for Intel MPI."""

    def __init__(self, core_prov, hosts, libfab_installpath, nw_interface,
                 server, client, environ, middlewares_path, util_prov=None):
        self.impi_src = f'{cloudbees_config.impi_root}'
        self.mpichpath = f'{middlewares_path}/impi/mpichsuite/'
        self.core_prov = core_prov
        self.hosts = hosts
        self.util_prov = util_prov
        self.libfab_installpath = libfab_installpath
        self.nw_interface = nw_interface
        self.server = server
        self.client = client
        self.environ = environ
        self.n = 4
        self.ppn = 1

    @property
    def env(self):
        """Opening of a ``bash -c '...`` command setting up the environment.

        The single quote is left open; callers append the command and the
        closing quote.
        """
        cmd = (f"bash -c 'source {self.impi_src}/env/vars.sh "
               "-i_mpi_ofi_internal=0; ")
        cmd += f"source {cloudbees_config.intel_compiler_root}/env/vars.sh; "
        if self.util_prov:
            cmd += f"export FI_PROVIDER={self.core_prov}\\;{self.util_prov}; "
        else:
            cmd += f"export FI_PROVIDER={self.core_prov}; "
        if self.core_prov == 'tcp':
            cmd += "export FI_IFACE=eth0; "
        elif self.core_prov == 'verbs':
            cmd += "export FI_IFACE=ib0; "
        cmd += "export I_MPI_FABRICS=ofi; "
        cmd += f"export LD_LIBRARY_PATH={self.impi_src}/lib:$LD_LIBRARY_PATH; "
        cmd += (f"export LD_LIBRARY_PATH={self.impi_src}/lib/release:"
                "$LD_LIBRARY_PATH; ")
        cmd += (f"export LD_LIBRARY_PATH={self.libfab_installpath}/lib/:"
                "$LD_LIBRARY_PATH; ")
        cmd += f"export PATH={self.libfab_installpath}/bin:$PATH; "
        return cmd

    @property
    def options(self):
        """mpiexec options: process counts, server/client hosts, -genv."""
        opts = f"-n {self.n} "
        opts += f"-ppn {self.ppn} "
        opts += (f"-hosts {common.get_node_name(self.server, self.nw_interface)},"
                 f"{common.get_node_name(self.client, self.nw_interface)} ")
        for key, val in self.environ.items():
            opts += f"-genv {key} {val} "

        return opts

    @property
    def cmd(self):
        """Full mpiexec invocation (binary path plus options)."""
        return f"{self.impi_src}/bin/mpiexec {self.options}"


class IMBtests(Test):
    """Runs one group of Intel MPI Benchmarks (IMB-*) binaries."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, mpitype, ofi_build_mode, user_env, log_file,
                 test_group, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, mpitype,
                         util_prov)

        self.test_group = test_group
        self.mpi_type = mpitype
        self.imb_src = ''
        # Benchmark binaries (IMB-<name>) run for each test group id.
        self.imb_tests = {
            '1': ['MPI1', 'P2P'],
            '2': ['EXT', 'IO'],
            '3': ['NBC', 'RMA', 'MT'],
        }
        self.iter = 100
        # Values passed to -include / -exclude for each binary.
        self.include = {
            'MPI1': [
                'Biband',
                'Uniband',
                'PingPongAnySource',
                'PingPingAnySource',
                'PingPongSpecificSource',
                'PingPingSpecificSource',
            ],
            'P2P': [],
            'EXT': [],
            'IO': [],
            'NBC': [],
            'RMA': [],
            'MT': [],
        }
        self.exclude = {
            'MPI1': [],
            'P2P': [],
            'EXT': [
                'Accumulate',
            ],
            'IO': [],
            'NBC': [],
            'RMA': [
                'Accumulate',
                'Get_accumulate',
                'Fetch_and_op',
                'Compare_and_swap',
                'All_put_all',
                'All_get_all',
            ],
            'MT': [],
        }
        self.imb_src = f'{self.middlewares_path}/{self.mpi_type}/imb'

    @property
    def execute_condn(self):
        # Mpich and ompi are excluded to save time. Run manually if needed
        return (self.mpi_type == 'impi')

    def imb_cmd(self, imb_test):
        """Return the command string for the ``IMB-<imb_test>`` binary."""
        print(f"Running IMB-{imb_test}")
        cmd = f"{self.imb_src}/IMB-{imb_test} "
        if imb_test != 'MT':
            cmd += f"-iter {self.iter} "

        # Each option ends with a space so that -include and -exclude cannot
        # run together when both lists are non-empty.
        if self.include[imb_test]:
            cmd += f"-include {','.join(self.include[imb_test])} "

        if self.exclude[imb_test]:
            cmd += f"-exclude {','.join(self.exclude[imb_test])} "

        return cmd

    def execute_cmd(self):
        """Run every benchmark binary of the configured test group."""
        for test_type in self.imb_tests[self.test_group]:
            # mpi.env opens a single-quoted bash -c string; close it here.
            outputcmd = shlex.split(self.mpi.env + self.mpi.cmd
                                    + self.imb_cmd(test_type) + "'")
            common.run_command(outputcmd)


class OSUtests(Test):
    """Runs the OSU micro-benchmarks found under the middleware install."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, mpitype, ofi_build_mode, user_env, log_file,
                 util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, mpitype,
                         util_prov)

        # (n, ppn) to use for each benchmark sub-directory.
        self.n_ppn = {
            'pt2pt': (2, 1),
            'collective': (4, 2),
            'one-sided': (2, 1),
            'startup': (2, 1),
        }
        if mpitype == 'mpich' and hw in ['water', 'grass']:
            self.mpitype = f'{mpitype}_{hw}'
        else:
            self.mpitype = mpitype

        self.osu_src = (f'{self.middlewares_path}/{self.mpitype}/osu/libexec/'
                        'osu-micro-benchmarks/mpi/')

    @property
    def execute_condn(self):
        # mpich-tcp, ompi are the only osu test combinations failing
        failing = ((self.mpi_type == 'mpich' and self.core_prov == 'tcp')
                   or self.mpi_type == 'ompi')
        return not failing

    def osu_cmd(self, test_type, test):
        """Return the path of one benchmark binary, followed by a space."""
        print(f"Running OSU-{test_type}-{test}")
        cmd = f'{self.osu_src}/{test_type}/{test} '
        return cmd

    def execute_cmd(self):
        """Walk the benchmark tree and run every non-skipped benchmark."""
        assert(self.osu_src)
        # This is a regex, not a glob: it matches any name containing
        # 'osu_pu' (the trailing 't' is optional/repeatable).  Matching
        # benchmarks are skipped.
        skip_pattern = re.compile('osu_put*')
        for root, dirs, tests in os.walk(self.osu_src):
            test_type = os.path.basename(root)
            for test in tests:
                self.mpi.n, self.mpi.ppn = self.n_ppn[test_type][:2]

                needs_fork_safe = (test == 'osu_latency_mp'
                                   and self.core_prov == 'verbs')
                if needs_fork_safe:
                    # Set only for the duration of this benchmark.
                    self.env['IBV_FORK_SAFE'] = '1'

                if skip_pattern.search(test) is None:
                    osu_command = self.osu_cmd(test_type, test)
                    outputcmd = shlex.split(self.mpi.env + self.mpi.cmd
                                            + osu_command + "'")
                    common.run_command(outputcmd)

                if needs_fork_safe:
                    self.env.pop('IBV_FORK_SAFE')


class MpichTestSuite(Test):
    """Configures and runs the MPICH test suite against mpich or impi."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, mpitype, ofi_build_mode, user_env, log_file,
                 util_prov=None, weekly=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, mpitype,
                         util_prov)
        self.mpi_type = mpitype
        # The OMPI helper has no mpichpath, so the suite path is only derived
        # for the other MPI types.
        if mpitype != 'ompi':
            self.mpichsuitepath = f'{self.mpi.mpichpath}/test/mpi/'
        self.pwd = os.getcwd()
        self.weekly = weekly
        # provider -> testlist directory -> [(testlist entry, 'test'|'dir')]
        self.mpichtests_exclude = {
            'tcp': {
                'rma': [('win_shared_put_flush_load 3', 'test')],
                'threads/comm': [('idup_nb 4', 'test')],
            },
            'verbs': {
                'threads/comm': [('idup_nb 4', 'test')],
                'spawn': [('concurrent_spawns 1', 'test')],
                'pt2pt': [('sendrecv3 2', 'test'),
                          ('sendrecv3 2 arg=-isendrecv', 'test')],
                'threads/pt2pt': [("mt_improbe_sendrecv_huge 2 "
                                   "arg=-iter=64 arg=-count=4194304 "
                                   "env=MPIR_CVAR_CH4_OFI_EAGER_MAX_MSG_SIZE"
                                   "=16384", 'test')],
            },
        }

    def create_hostfile(self, file, hostlist):
        """Write one host per line to ``file``."""
        with open(file, "w") as f:
            for host in hostlist:
                f.write(f"{host}\n")

    def update_testlists(self, filename, category):
        """Comment out every line of ``filename`` equal to ``category``."""
        with open(filename, 'r') as file:
            lines = file.read().splitlines()

        # Build a new list instead of mutating the one being iterated.
        updated = [f'#{line}' if line == category else line for line in lines]

        with open(filename, 'w') as file:
            file.write('\n'.join(updated))

    def exclude_tests(self, test_root, provider):
        """Disable the testlist entries listed for ``provider``.

        Providers without an entry in ``mpichtests_exclude`` have nothing to
        exclude and are left untouched.
        """
        excludes = self.mpichtests_exclude.get(f'{provider}', {})
        for path, exclude_list in excludes.items():
            for item in exclude_list:
                self.update_testlists(f'{test_root}/{path}/testlist', item[0])
                if item[1] == 'dir':
                    # Report each active test of the excluded directory.
                    filename = f'{test_root}/{path}/{item[0]}/testlist'
                    with open(filename, 'r') as file:
                        for line in file:
                            line = line.strip()
                            if not line.startswith('#'):
                                print(f'excluding:{path}/{item[0]}:{line}')
                else:  # item[1]=test
                    print(f'excluding:{path}/{item[0]}')

    @property
    def execute_condn(self):
        return ((self.mpi_type == 'impi' and self.weekly) or
                self.mpi_type == 'mpich')

    def _run_mpich_suite(self):
        """Configure, build (PR runs only) and run the suite for mpich."""
        configure_cmd = f"./configure --with-mpi={self.mpi.mpich_dir} "
        if self.weekly:
            print(f'Weekly {self.mpi_type} mpichsuite tests')
            testcmd = 'make testing'
        else:
            print(f"PR {self.mpi_type} mpichsuite tests")
            testcmd = "./runtests -tests=testlist "
            testcmd += (" -xmlfile=summary.xml -tapfile=summary.tap "
                        "-junitfile=summary.junit.xml ")

        os.chdir(self.mpichsuitepath)
        try:
            common.run_command(shlex.split(self.mpi.env + configure_cmd + "'"))
            if not self.weekly:
                common.run_command(['make', '-j'])
            self.exclude_tests(self.mpichsuitepath, self.core_prov)
            common.run_command(shlex.split(self.mpi.env + testcmd + "'"))
            common.run_command(shlex.split(f"cat {self.mpichsuitepath}/"
                                           "summary.tap"))
        finally:
            os.chdir(self.pwd)

    def _run_impi_weekly(self):
        """Run the weekly suite for impi through its test.sh driver."""
        print(f'Weekly {self.mpi_type} mpichsuite tests')
        os.chdir(self.mpi.mpichpath)
        try:
            print(self.hosts)
            hostfile = f'{self.mpi.mpichpath}/hostfile'
            self.create_hostfile(hostfile, self.hosts)
            os.environ["I_MPI_HYDRA_HOST_FILE"] = hostfile
            test_cmd = f"export I_MPI_HYDRA_HOST_FILE={hostfile}; "
            test_cmd += f"./test.sh --exclude lin,{self.core_prov},*,*,*,*; "
            common.run_command(shlex.split(self.mpi.env + test_cmd + "'"))
            common.run_command(shlex.split(f"cat {self.mpichsuitepath}/"
                                           "summary.tap"))
        finally:
            os.chdir(self.pwd)

    def execute_cmd(self):
        """Run the suite for the configured MPI type.

        The working directory is restored to the directory captured at
        construction time even if a command raises or exits.
        """
        if self.mpi_type == 'mpich':
            self._run_mpich_suite()
        # Equality test kept from the original: only True (or 1) enables it.
        if self.mpi_type == 'impi' and self.weekly == True:
            self._run_impi_weekly()


class OneCCLTests(Test):
    """Runs the oneCCL CPU CI tests through ``run.sh``."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov)

        self.oneccl_path = f'{self.middlewares_path}/oneccl/'
        self.test_dir = f'{self.middlewares_path}/oneccl/ci_tests'
        if self.util_prov:
            # Backslash-escaped ';' so the shell does not split the value.
            self.prov = f"{self.core_prov}\\;{self.util_prov}"
        else:
            self.prov = self.core_prov
        self.oneccl_environ = {
            'FI_PROVIDER': f"\"{self.prov}\"",
            'CCL_ATL_TRANSPORT': 'ofi',
            'CCL_ATL_TRANSPORT_LIST': 'ofi',
        }
        # User-supplied variables override the defaults above.
        if self.env:
            for key in self.env:
                self.oneccl_environ[key] = self.env[key]

        self.ld_library = [
            f'{self.libfab_installpath}/lib',
            f'{self.oneccl_path}/build/_install/lib',
        ]

    def export_env(self):
        """Return the shell prefix sourcing oneAPI and exporting variables."""
        environ = f"source {cloudbees_config.oneapi_root}/setvars.sh; "
        environ += f"source {self.oneccl_path}/build/_install/env/setvars.sh; "
        if self.core_prov == 'psm3':
            self.oneccl_environ['PSM3_MULTI_EP'] = '1'

        if self.core_prov == 'shm':
            self.oneccl_environ['CCL_ATL_SHM'] = '1'

        for key, val in self.oneccl_environ.items():
            environ += f"export {key}={val}; "

        ld_library_path = 'LD_LIBRARY_PATH='
        for item in self.ld_library:
            ld_library_path += f'{item}:'

        environ += f"export {ld_library_path}$LD_LIBRARY_PATH; "
        return environ

    def cmd(self):
        """Relative path of the test driver, followed by a space."""
        return './run.sh '

    def options(self):
        """Options passed to run.sh."""
        opts = "--mode cpu "
        return opts

    @property
    def execute_condn(self):
        return True

    def execute_cmd(self):
        """Run run.sh from the oneCCL ci_tests directory."""
        curr_dir = os.getcwd()
        os.chdir(self.test_dir)
        try:
            command = (f"bash -c '{self.export_env()} {self.cmd()} "
                       f"{self.options()}'")
            common.run_command(shlex.split(command))
        finally:
            os.chdir(curr_dir)


class OneCCLTestsGPU(Test):
    """Runs the oneCCL GPU example and functional tests through mpiexec."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov)

        self.n = 2
        self.ppn = 1
        self.oneccl_path = f'{self.middlewares_path}/oneccl_gpu/build'
        if self.util_prov:
            # Backslash-escaped ';' so the shell does not split the value.
            self.prov = f"{self.core_prov}\\;{self.util_prov}"
        else:
            self.prov = self.core_prov

        self.onecclgpu_environ = {
            'FI_PROVIDER': self.prov,
            # 'LD_PRELOAD' : f"{self.libfab_installpath}/lib/libfabric.so",
            'CCL_ATL_TRANSPORT': 'ofi',
            'CCL_ROOT': f"{self.oneccl_path}/_install",
        }

        self.ld_library = [
            f'{self.libfab_installpath}/lib',
            '$LD_LIBRARY_PATH',
            f'{self.oneccl_path}/_install/lib',
        ]

        # Test binaries to run, keyed by the group name given to execute_cmd.
        self.tests = {
            'examples': [
                'sycl_allgatherv_custom_usm_test',
                'sycl_allgatherv_inplace_test',
                'sycl_allgatherv_inplace_usm_test',
                'sycl_allgatherv_test',
                'sycl_allgatherv_usm_test',
                'sycl_allreduce_inplace_usm_test',
                'sycl_allreduce_test',
                'sycl_allreduce_usm_test',
                'sycl_alltoall_test',
                'sycl_alltoall_usm_test',
                'sycl_alltoallv_test',
                'sycl_alltoallv_usm_test',
                'sycl_broadcast_test',
                'sycl_broadcast_usm_test',
                'sycl_reduce_inplace_usm_test',
                'sycl_reduce_scatter_test',
                'sycl_reduce_scatter_usm_test',
                'sycl_reduce_test',
                'sycl_reduce_usm_test',
            ],
            'functional': [
                'allgatherv_test',
                'alltoall_test',
                'alltoallv_test',
                'bcast_test',
                'reduce_scatter_test',
                'reduce_test',
            ],
        }

    def export_env(self):
        """Return the shell prefix sourcing Intel tools and exporting vars."""
        environ = (f"source {cloudbees_config.impi_root}/env/vars.sh "
                   "-i_mpi_internal=0; ")
        environ += f"source {cloudbees_config.intel_compiler_root}/env/vars.sh; "
        for key, val in self.onecclgpu_environ.items():
            environ += f"export {key}={val}; "

        ld_library_path = 'LD_LIBRARY_PATH='
        for item in self.ld_library:
            ld_library_path += f'{item}:'

        environ += f"export {ld_library_path}$LD_LIBRARY_PATH; "
        return environ

    def cmd(self):
        """Path of the bundled mpiexec, followed by a space."""
        return f"{self.oneccl_path}/_install/bin/mpiexec "

    def options(self):
        """mpiexec options: labels, process counts and the two hosts."""
        opts = "-l "
        opts += f"-n {self.n} "
        opts += f"-ppn {self.ppn} "
        opts += f"-hosts {self.server},{self.client} "
        return opts

    @property
    def execute_condn(self):
        return True

    def execute_cmd(self, oneccl_test_gpu):
        """Run every test of the ``oneccl_test_gpu`` group."""
        is_examples = 'examples' in oneccl_test_gpu
        # Provider-specific export inserted between the environment prefix
        # and the mpiexec command.
        prov_export = {
            'psm3': 'export PSM3_MULTI_EP=1; ',
            'shm': 'export CCL_ATL_SHM=1; ',
        }.get(self.core_prov, '')

        curr_dir = os.getcwd()
        if is_examples:
            os.chdir(f"{self.oneccl_path}/_install/examples/sycl")
        else:
            os.chdir(f"{self.oneccl_path}/tests/functional")

        try:
            for test in self.tests[oneccl_test_gpu]:
                gpu_selector = 'device' if '_usm_' in test else 'default'

                command = (f"bash -c '{self.export_env()} {prov_export}"
                           f"{self.cmd()} {self.options()} ./{test} ")
                if is_examples:
                    command += f"gpu {gpu_selector}"
                command += "'"

                common.run_command(shlex.split(command))
        finally:
            os.chdir(curr_dir)


class DaosCartTest(Test):
    """Runs the DAOS CaRT tests on a remote launch node over ssh."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file, None,
                         util_prov)

        self.set_paths(core_prov)
        print(core_prov)
        self.daos_nodes = cloudbees_config.prov_node_map[core_prov]
        print(self.daos_nodes)
        self.launch_node = self.daos_nodes[0]

        # test name -> launch.py tags and number of server/client nodes.
        self.cart_tests = {
            'corpc_one_node': {'tags': 'cart,corpc,one_node',
                               'numservers': 1, 'numclients': 0},
            'corpc_two_node': {'tags': 'cart,corpc,two_node',
                               'numservers': 2, 'numclients': 0},
            'ctl_one_node': {'tags': 'cart,ctl,one_node',
                             'numservers': 1, 'numclients': 1},
            'ghost_rank_rpc_one_node': {'tags': 'cart,ghost_rank_rpc,one_node',
                                        'numservers': 1, 'numclients': 0},
            'group_test': {'tags': 'cart,group_test,one_node',
                           'numservers': 1, 'numclients': 0},
            'iv_one_node': {'tags': 'cart,iv,one_node',
                            'numservers': 1, 'numclients': 1},
            'iv_two_node': {'tags': 'cart,iv,two_node',
                            'numservers': 2, 'numclients': 1},
            'launcher_one_node': {'tags': 'cart,no_pmix_launcher,one_node',
                                  'numservers': 1, 'numclients': 1},
            'multictx_one_node': {'tags': 'cart,no_pmix,one_node',
                                  'numservers': 1, 'numclients': 0},
            'rpc_one_node': {'tags': 'cart,rpc,one_node',
                             'numservers': 1, 'numclients': 1},
            'rpc_two_node': {'tags': 'cart,rpc,two_node',
                             'numservers': 2, 'numclients': 1},
            'swim_notification': {'tags': 'cart,rpc,swim_rank_eviction,one_node',
                                  'numservers': 1, 'numclients': 1},
        }

    def set_paths(self, core_prov):
        """Derive the DAOS paths and point DAOS' ofi prereq at libfabric."""
        self.ci_middlewares_path = f'{cloudbees_config.build_dir}/{core_prov}'
        self.daos_install_root = f'{self.ci_middlewares_path}/daos/install'
        self.cart_test_scripts = f'{self.daos_install_root}/lib/daos/TESTING/ftest'
        self.mpipath = f'{cloudbees_config.daos_mpi}/bin'
        self.pathlist = [f'{self.daos_install_root}/bin/',
                         self.cart_test_scripts,
                         self.mpipath,
                         f'{self.daos_install_root}/lib/daos/TESTING/tests']
        self.daos_prereq = f'{self.daos_install_root}/prereq'
        # NOTE(review): the '*' is passed to rm as a literal argument (argv
        # list, no shell visible here), so it is presumably never expanded --
        # confirm against common.run_command.
        common.run_command(['rm', '-rf', f'{self.ci_middlewares_path}/daos_logs/*'])
        # Replace the bundled ofi prereq with a symlink to this build.
        common.run_command(['rm', '-rf', f'{self.daos_prereq}/debug/ofi'])
        common.run_command(['ln', '-sfn', self.libfab_installpath,
                            f'{self.daos_prereq}/debug/ofi'])

    @property
    def cmd(self):
        """Remote command that dumps the env, marks the log, runs launch.py."""
        return (f"env; echo {common.cloudbees_log_start_string}; "
                "python3.6 launch.py ")

    def remote_launch_cmd(self, testname):
        """Return the ssh prefix that prepares the remote environment."""
        # The following env variables must be set appropriately prior
        # to running the daos/cart tests: OFI_DOMAIN, OFI_INTERFACE,
        # CRT_PHY_ADDR_STR, PATH, DAOS_TEST_SHARED_DIR, DAOS_TEST_LOG_DIR,
        # LD_LIBRARY_PATH in the script being sourced below.
        launch_cmd = (f"ssh {self.launch_node} "
                      f"\"source {self.ci_middlewares_path}/daos_ci_env_setup.sh && "
                      f"cd {self.cart_test_scripts} &&\" ")
        return launch_cmd

    def options(self, testname):
        """Return the launch.py options (tags and node lists) for a test."""
        test = self.cart_tests[testname]
        opts = "-s "
        opts += f"{test['tags']} "

        if test['numservers'] != 0:
            servers = ",".join(self.daos_nodes[:test['numservers']])
            opts += f"--test_servers={servers} "
        if test['numclients'] != 0:
            clients = ",".join(self.daos_nodes[:test['numclients']])
            opts += f"--test_clients={clients}"
        return opts

    @property
    def execute_condn(self):
        return True

    def execute_cmd(self):
        """Run every CaRT test remotely, logging output to ``log_file``."""
        site_packages = f'{self.daos_install_root}/lib64/python3.6/site-packages'
        sys.path.append(site_packages)
        os.environ['PYTHONPATH'] = site_packages

        curdir = os.getcwd()
        os.chdir(self.cart_test_scripts)
        try:
            for test in self.cart_tests:
                print(test)
                command = (self.remote_launch_cmd(test) + self.cmd
                           + self.options(test))
                common.run_logging_command(shlex.split(command), self.log_file)
                print("--------------------TEST COMPLETED----------------------")
        finally:
            os.chdir(curdir)


class DMABUFTest(Test):
    """Runs ``fi_xe_rdmabw`` client/server pairs over memory combinations."""

    def __init__(self, jobname, buildno, testname, hw, core_prov, fabric,
                 hosts, ofi_build_mode, user_env, log_file, util_prov=None):
        super().__init__(jobname, buildno, testname, hw, core_prov, fabric,
                         hosts, ofi_build_mode, user_env, log_file,
                         None, util_prov)
        self.DMABUFtestpath = f'{self.libfab_installpath}/bin'
        self.timeout = 300
        # A string when taken from the environment, integer 0 otherwise.
        self.n = os.environ.get('SLURM_NNODES', 0)

        if util_prov:
            # Backslash-escaped ';' so the shell does not split the value.
            self.prov = f"{self.core_prov}\\;{self.util_prov}"
        else:
            self.prov = self.core_prov

        self.dmabuf_environ = {
            'ZEX_NUMBER_OF_CCS': '0:4,1:4',
            'NEOReadDebugKeys': '1',
            'EnableImplicitScaling': '0',
            'MLX5_SCATTER_TO_CQE': '0',
        }

        # test type -> client memory option -> server memory options to pair.
        self.single_node_combinations = {
            'H2H': {
                '-m malloc': ['-m malloc'],
            },
            'H2D': {
                '-m malloc': [
                    '-m device -d 0',
                    '-m device -d 1',
                ],
            },
            'D2H': {
                '-m device -d 0': ['-m malloc'],
                '-m device -d 1': ['-m malloc'],
            },
            'D2D': {
                '-m device -d 0': [
                    '-m device -d 1',
                    '-m device -d 2',
                    '-m device -d 3',
                ],
                '-m device -d 1': [
                    '-m device -d 2',
                    '-m device -d 3',
                ],
            },
        }

        self.double_node_combinations = {
            'H2H': {
                '-m malloc': ['-m malloc'],
            },
            'H2D': {
                '-m malloc': [
                    '-m device -d 0',
                    '-m device -d 1',
                    '-m device -d 2',
                ],
            },
            'D2H': {
                '-m device -d 0': ['-m malloc'],
                '-m device -d 1': ['-m malloc'],
                '-m device -d 2': ['-m malloc'],
                '-m device -d 3': ['-m malloc'],
            },
            'D2D': {
                '-m device -d 0': [
                    '-m device -d 0',
                    '-m device -d 1',
                    '-m device -d 2',
                    '-m device -d 3',
                ],
                '-m device -d 1': [
                    '-m device -d 1',
                    '-m device -d 2',
                    '-m device -d 3',
                ],
            },
        }

    @property
    def execute_condn(self):
        return self.core_prov == 'verbs'

    @property
    def cmd(self):
        """Path of the fi_xe_rdmabw executable."""
        return f"{self.DMABUFtestpath}/fi_xe_rdmabw"

    def dmabuf_env(self):
        """Return ``KEY=VAL`` pairs, space separated, for the remote command."""
        return ' '.join(f"{key}={val}"
                        for key, val in self.dmabuf_environ.items())

    def execute_cmd(self, test_type):
        """Run all operations for every memory pairing of ``test_type``.

        Exits the process via ``sys.exit`` on the first failing pair.
        """
        os.chdir(self.DMABUFtestpath)
        base_cmd = ''
        operations = ['write', 'read', 'send']
        log_prefix = f"{os.environ['LOG_DIR']}/dmabuf_{self.n}"
        if self.n == '1':
            self.tests = self.single_node_combinations
        else:
            self.tests = self.double_node_combinations

        for operation in operations:
            for client_mem, server_mems in self.tests[test_type].items():
                for server_mem in server_mems:
                    server_command = (f"{self.cmd} {server_mem} "
                                      f"-p {self.core_prov}")
                    # Only the send operation is also passed to the server.
                    if 'send' in operation:
                        server_command += f" -t {operation}"
                    base_cmd = (f"-t {operation} -p {self.core_prov} "
                                f"{self.server}")
                    client_command = f"{self.cmd} {client_mem} {base_cmd}"
                    RC = common.ClientServerTest(
                        f"ssh {self.server} {self.dmabuf_env()} {server_command}",
                        f"ssh {self.client} {self.dmabuf_env()} {client_command}",
                        f"{log_prefix}_server.log", f"{log_prefix}_client.log",
                        self.timeout
                    ).run()

                    if RC == (0, 0):
                        print("-------------- TEST COMPLETED ---------------")
                    else:
                        print("-------------- TEST FAILED ---------------")
                        sys.exit(f"Exiting with returncode: {RC}")