#!/usr/bin/env python3 # # Copyright 2016 WebAssembly Community Group participants # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import argparse import difflib import fnmatch import multiprocessing import os import re import shlex import shutil import subprocess import sys import threading import time import find_exe from utils import Error IS_WINDOWS = sys.platform == 'win32' TEST_DIR = os.path.dirname(os.path.abspath(__file__)) REPO_ROOT_DIR = os.path.dirname(TEST_DIR) OUT_DIR = os.path.join(REPO_ROOT_DIR, 'out') DEFAULT_TIMEOUT = 120 # seconds SLOW_TIMEOUT_MULTIPLIER = 3 # default configurations for tests TOOLS = { 'wat2wasm': [ ('RUN', '%(wat2wasm)s'), ('ARGS', ['%(in_file)s', '-o', '%(out_dir)s/out.wasm']), ('VERBOSE-ARGS', ['-v']), ], 'wast2json': [ ('RUN', '%(wast2json)s'), ('ARGS', ['%(in_file)s', '-o', '%(out_dir)s/out.json']), ('VERBOSE-ARGS', ['-v']), ], 'wat-desugar': [ ('RUN', '%(wat-desugar)s'), ('ARGS', ['%(in_file)s']), ], 'run-objdump': [ ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-objdump)s -r -d %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['-v']), ], 'run-objdump-gen-wasm': [ ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-objdump)s -r -d %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['-v']), ], 'run-objdump-spec': [ ('RUN', '%(wast2json)s %(in_file)s -o %(temp_file)s.json'), # NOTE: wasm files must be passed in manually via ARGS1 ('RUN', '%(wasm-objdump)s -r -d'), ('VERBOSE-ARGS', ['-v']), ], 'run-roundtrip': [ ('RUN', 'test/run-roundtrip.py'), ('ARGS', [ '%(in_file)s', '-v', '--bindir=%(bindir)s', '--no-error-cmdline', '-o', '%(out_dir)s', ]), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-interp': [ ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-interp)s %(temp_file)s.wasm --run-all-exports'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-interp-spec': [ ('RUN', '%(wast2json)s %(in_file)s -o %(temp_file)s.json'), ('RUN', '%(spectest-interp)s %(temp_file)s.json'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-gen-wasm': [ ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-validate)s %(temp_file)s.wasm'), ('RUN', '%(wasm2wat)s %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-gen-wasm-bad': [ ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-validate)s %(temp_file)s.wasm'), ('ERROR', '1'), ('RUN', '%(wasm2wat)s %(temp_file)s.wasm'), ('ERROR', '1'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-gen-wasm-interp': [ ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-interp)s --run-all-exports %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-gen-wasm-strip': [ ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-strip)s %(temp_file)s.wasm'), ('RUN', '%(wasm-objdump)s -h %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-gen-wasm-decompile': [ ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-validate)s %(temp_file)s.wasm'), ('RUN', '%(wasm-decompile)s %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-opcodecnt': [ ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-opcodecnt)s %(temp_file)s.wasm'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-gen-spec-js': [ ('RUN', '%(wast2json)s %(in_file)s -o %(temp_file)s.json'), ('RUN', '%(gen_spec_js_py)s %(temp_file)s.json'), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-spec-wasm2c': [ ('RUN', 'test/run-spec-wasm2c.py'), ('ARGS', [ '%(in_file)s', '--bindir=%(bindir)s', '--no-error-cmdline', '-o', '%(out_dir)s', ]), ('VERBOSE-ARGS', ['--print-cmd', '-v']), ], 'run-wasm-decompile': [ ('RUN', '%(wat2wasm)s --enable-all %(in_file)s -o %(temp_file)s.wasm'), ('RUN', '%(wasm-decompile)s --enable-all %(temp_file)s.wasm'), ] } # TODO(binji): Add Windows support for compiling using run-spec-wasm2c.py if IS_WINDOWS: TOOLS['run-spec-wasm2c'].append(('SKIP', '')) ROUNDTRIP_TOOLS = ('wat2wasm',) class NoRoundtripError(Error): pass def Indent(s, spaces): return ''.join(' ' * spaces + l for l in s.splitlines(1)) def DiffLines(expected, actual): def Decode(s): return s.decode('utf-8', 'replace') expected_lines = [Decode(line) for line in expected.splitlines() if line] actual_lines = [Decode(line) for line in actual.splitlines() if line] return list( difflib.unified_diff(expected_lines, actual_lines, fromfile='expected', tofile='actual', lineterm='')) class Cell(object): def __init__(self, value): self.value = [value] def Set(self, value): self.value[0] = value def Get(self): return self.value[0] def SplitArgs(value): if isinstance(value, list): return value return shlex.split(value) def FixPythonExecutable(args): """Given an argument list beginning with a `*.py` file, return one with that uses sys.executable as arg[0]. """ exe, rest = args[0], args[1:] if os.path.splitext(exe)[1] == '.py': return [sys.executable, os.path.join(REPO_ROOT_DIR, exe)] + rest return args class CommandTemplate(object): def __init__(self, exe): self.args = SplitArgs(exe) self.verbose_args = [] self.stdin = None self.expected_returncode = 0 def AppendArgs(self, args): self.args += SplitArgs(args) def SetExpectedReturncode(self, returncode): self.expected_returncode = returncode def AppendVerboseArgs(self, args_list): for level, level_args in enumerate(args_list): while level >= len(self.verbose_args): self.verbose_args.append([]) self.verbose_args[level] += SplitArgs(level_args) def SetStdin(self, filename): self.stdin = filename def _Format(self, cmd, variables): return [arg % variables for arg in cmd] def GetCommand(self, variables, extra_args=None, verbose_level=0): args = self.args[:] vl = 0 while vl < verbose_level and vl < len(self.verbose_args): args += self.verbose_args[vl] vl += 1 if extra_args: args += extra_args args = self._Format(args, variables) stdin = self.stdin if stdin: stdin = stdin % variables return Command(self, FixPythonExecutable(args), stdin) class Command(object): def __init__(self, template, args, stdin): self.template = template self.args = args self.stdin = stdin def GetExpectedReturncode(self): return self.template.expected_returncode def Run(self, cwd, timeout, console_out=False, env=None): process = None is_timeout = Cell(False) def KillProcess(timeout=True): if process: try: if IS_WINDOWS: # http://stackoverflow.com/a/10830753: deleting child processes in # Windows subprocess.call(['taskkill', '/F', '/T', '/PID', str(process.pid)]) else: os.killpg(os.getpgid(process.pid), 15) except OSError: pass is_timeout.Set(timeout) try: start_time = time.time() kwargs = {} if not IS_WINDOWS: kwargs['preexec_fn'] = os.setsid stdin_data = None if self.stdin: stdin_data = open(self.stdin, 'rb').read() # http://stackoverflow.com/a/10012262: subprocess with a timeout # http://stackoverflow.com/a/22582602: kill subprocess and children process = subprocess.Popen(self.args, cwd=cwd, env=env, stdout=None if console_out else subprocess.PIPE, stderr=None if console_out else subprocess.PIPE, stdin=None if not self.stdin else subprocess.PIPE, **kwargs) timer = threading.Timer(timeout, KillProcess) try: timer.start() stdout, stderr = process.communicate(input=stdin_data) finally: returncode = process.returncode process = None timer.cancel() if is_timeout.Get(): raise Error('TIMEOUT') duration = time.time() - start_time except OSError as e: raise Error(str(e)) finally: KillProcess(False) return RunResult(self, stdout, stderr, returncode, duration) def __str__(self): return ' '.join(self.args) class RunResult(object): def __init__(self, cmd=None, stdout='', stderr='', returncode=0, duration=0): self.cmd = cmd self.stdout = stdout self.stderr = stderr self.returncode = returncode self.duration = duration def GetExpectedReturncode(self): return self.cmd.GetExpectedReturncode() def Failed(self): return self.returncode != self.GetExpectedReturncode() def __repr__(self): return 'RunResult(%s, %s, %s, %s, %s)' % ( self.cmd, self.stdout, self.stderr, self.returncode, self.duration) class TestResult(object): def __init__(self): self.results = [] self.stdout = b'' self.stderr = b'' self.duration = 0 def GetLastCommand(self): return self.results[-1].cmd def GetLastFailure(self): return [r for r in self.results if r.Failed()][-1] def Failed(self): return any(r.Failed() for r in self.results) def Append(self, result): self.results.append(result) if result.stdout is not None: self.stdout += result.stdout if result.stderr is not None: self.stderr += result.stderr self.duration += result.duration class TestInfo(object): def __init__(self): self.filename = '' self.header = [] self.input_filename = '' self.input_ = '' self.expected_stdout = '' self.expected_stderr = '' self.tool = None self.cmds = [] self.env = {} self.slow = False self.skip = False self.is_roundtrip = False def CreateRoundtripInfo(self, fold_exprs): if self.tool not in ROUNDTRIP_TOOLS: raise NoRoundtripError() if len(self.cmds) != 1: raise NoRoundtripError() result = TestInfo() result.SetTool('run-roundtrip') result.filename = self.filename result.header = self.header result.input_filename = self.input_filename result.input_ = self.input_ result.expected_stdout = '' result.expected_stderr = '' # TODO(binji): It's kind of cheesy to keep the enable flag based on prefix. # Maybe it would be nicer to add a new directive ENABLE instead. old_cmd = self.cmds[0] new_cmd = result.cmds[0] new_cmd.AppendArgs([f for f in old_cmd.args if f.startswith('--enable')]) if fold_exprs: new_cmd.AppendArgs('--fold-exprs') result.env = self.env result.slow = self.slow result.skip = self.skip result.is_roundtrip = True result.fold_exprs = fold_exprs return result def GetName(self): name = self.filename if self.is_roundtrip: if self.fold_exprs: name += ' (roundtrip fold-exprs)' else: name += ' (roundtrip)' return name def GetGeneratedInputFilename(self): # All tests should be generated in their own directories, even if they # share the same input filename. We want the input filename to be correct, # though, so we use the directory of the test (.txt) file, but the basename # of the input file (likely a .wast). path = OUT_DIR if self.input_filename: basename = os.path.basename(self.input_filename) else: basename = os.path.basename(self.filename) path = os.path.join(path, os.path.dirname(self.filename), basename) if self.is_roundtrip: dirname = os.path.dirname(path) basename = os.path.basename(path) if self.fold_exprs: path = os.path.join(dirname, 'roundtrip_folded', basename) else: path = os.path.join(dirname, 'roundtrip', basename) return path def SetTool(self, tool): if tool not in TOOLS: raise Error('Unknown tool: %s' % tool) self.tool = tool for tool_key, tool_value in TOOLS[tool]: self.ParseDirective(tool_key, tool_value) def GetCommand(self, index): try: return self.cmds[index] except IndexError: raise Error('Invalid command index: %s' % index) def GetLastCommand(self): return self.GetCommand(len(self.cmds) - 1) def ApplyToCommandBySuffix(self, suffix, fn): if suffix == '': fn(self.GetLastCommand()) elif re.match(r'^\d+$', suffix): fn(self.GetCommand(int(suffix))) elif suffix == '*': for cmd in self.cmds: fn(cmd) else: raise Error('Invalid directive suffix: %s' % suffix) def ParseDirective(self, key, value): if key == 'RUN': self.cmds.append(CommandTemplate(value)) elif key == 'STDIN_FILE': self.input_filename = value elif key.startswith('ARGS'): suffix = key[len('ARGS'):] self.ApplyToCommandBySuffix(suffix, lambda cmd: cmd.AppendArgs(value)) elif key.startswith('ERROR'): suffix = key[len('ERROR'):] self.ApplyToCommandBySuffix( suffix, lambda cmd: cmd.SetExpectedReturncode(int(value))) elif key == 'SLOW': self.slow = True elif key == 'SKIP': self.skip = True elif key == 'VERBOSE-ARGS': self.GetLastCommand().AppendVerboseArgs(value) elif key in ['TODO', 'NOTE']: pass elif key == 'TOOL': self.SetTool(value) elif key == 'STDIN': self.GetLastCommand().SetStdin(value) elif key == 'ENV': # Pattern: FOO=1 BAR=stuff self.env = dict(x.split('=') for x in value.split()) else: raise Error('Unknown directive: %s' % key) def Parse(self, filename): self.filename = filename test_path = os.path.join(REPO_ROOT_DIR, filename) # Read/write as binary because spec tests may have invalid UTF-8 codes. with open(test_path, 'rb') as f: state = 'header' empty = True header_lines = [] input_lines = [] stdout_lines = [] stderr_lines = [] for line in f.readlines(): empty = False m = re.match(b'\\s*\\(;; (STDOUT|STDERR) ;;;$', line.strip()) if m: directive = m.group(1).decode('utf-8') if directive == 'STDERR': state = 'stderr' continue elif directive == 'STDOUT': state = 'stdout' continue else: m = re.match(b'\\s*;;;(.*)$', line) if m: directive = m.group(1).decode('utf-8').strip() if state == 'header': key, value = directive.split(':', 1) key = key.strip() value = value.strip() self.ParseDirective(key, value) elif state in ('stdout', 'stderr'): if not re.match(r'%s ;;\)$' % state.upper(), directive): raise Error('Bad directive in %s block: %s' % (state, directive)) state = 'none' else: raise Error('Unexpected directive: %s' % directive) elif state == 'header': state = 'input' if state == 'header': header_lines.append(line.decode('utf-8')) if state == 'input': if self.input_filename: raise Error('Can\'t have STDIN_FILE and input') input_lines.append(line) elif state == 'stderr': stderr_lines.append(line) elif state == 'stdout': stdout_lines.append(line) if empty: raise Error('empty test file') self.header = ''.join(header_lines) self.input_ = b''.join(input_lines) self.expected_stdout = b''.join(stdout_lines) self.expected_stderr = b''.join(stderr_lines) if not self.cmds: raise Error('test has no commands') def CreateInputFile(self): gen_input_path = self.GetGeneratedInputFilename() gen_input_dir = os.path.dirname(gen_input_path) try: os.makedirs(gen_input_dir) except OSError: if not os.path.isdir(gen_input_dir): raise # Read/write as binary because spec tests may have invalid UTF-8 codes. with open(gen_input_path, 'wb') as gen_input_file: if self.input_filename: input_path = os.path.join(REPO_ROOT_DIR, self.input_filename) with open(input_path, 'rb') as input_file: gen_input_file.write(input_file.read()) else: # add an empty line for each header line so the line numbers match gen_input_file.write(('\n' * self.header.count('\n')).encode('ascii')) gen_input_file.write(self.input_) gen_input_file.flush() return gen_input_file.name def Rebase(self, stdout, stderr): test_path = os.path.join(REPO_ROOT_DIR, self.filename) with open(test_path, 'wb') as f: f.write(self.header.encode('ascii')) f.write(self.input_) if stderr: f.write(b'(;; STDERR ;;;\n') f.write(stderr) f.write(b';;; STDERR ;;)\n') if stdout: f.write(b'(;; STDOUT ;;;\n') f.write(stdout) f.write(b';;; STDOUT ;;)\n') def Diff(self, stdout, stderr): msg = '' if self.expected_stderr != stderr: diff_lines = DiffLines(self.expected_stderr, stderr) if len(diff_lines) > 0: msg += 'STDERR MISMATCH:\n' + '\n'.join(diff_lines) + '\n' if self.expected_stdout != stdout: diff_lines = DiffLines(self.expected_stdout, stdout) if len(diff_lines) > 0: msg += 'STDOUT MISMATCH:\n' + '\n'.join(diff_lines) + '\n' if msg: raise Error(msg) class Status(object): def __init__(self, isatty): self.isatty = isatty self.start_time = None self.last_length = 0 self.last_finished = None self.skipped = 0 self.passed = 0 self.failed = 0 self.total = 0 self.failed_tests = [] def Start(self, total): self.total = total self.start_time = time.time() def Passed(self, info, duration): self.passed += 1 if self.isatty: self._Clear() self._PrintShortStatus(info) else: sys.stderr.write('+ %s (%.3fs)\n' % (info.GetName(), duration)) def Failed(self, info, error_msg, result=None): self.failed += 1 self.failed_tests.append((info, result)) if self.isatty: self._Clear() sys.stderr.write('- %s\n%s\n' % (info.GetName(), Indent(error_msg, 2))) def Skipped(self, info): self.skipped += 1 if not self.isatty: sys.stderr.write('. %s (skipped)\n' % info.GetName()) def Done(self): if self.isatty: sys.stderr.write('\n') def _PrintShortStatus(self, info): assert(self.isatty) total_duration = time.time() - self.start_time name = info.GetName() if info else '' if (self.total - self.skipped): percent = 100 * (self.passed + self.failed) / (self.total - self.skipped) else: percent = 100 status = '[+%d|-%d|%%%d] (%.2fs) %s' % (self.passed, self.failed, percent, total_duration, name) self.last_length = len(status) self.last_finished = info sys.stderr.write(status) sys.stderr.flush() def _Clear(self): assert(self.isatty) sys.stderr.write('\r%s\r' % (' ' * self.last_length)) def FindTestFiles(ext, filter_pattern_re): tests = [] for root, dirs, files in os.walk(TEST_DIR): for f in files: path = os.path.join(root, f) if os.path.splitext(f)[1] == ext: tests.append(os.path.relpath(path, REPO_ROOT_DIR)) tests.sort() return [test for test in tests if re.match(filter_pattern_re, test)] def GetAllTestInfo(test_names, status): infos = [] for test_name in test_names: info = TestInfo() try: info.Parse(test_name) infos.append(info) except Error as e: status.Failed(info, str(e)) return infos def RunTest(info, options, variables, verbose_level=0): timeout = options.timeout if info.slow: timeout *= SLOW_TIMEOUT_MULTIPLIER # Clone variables dict so it can be safely modified. variables = dict(variables) cwd = REPO_ROOT_DIR env = dict(os.environ) env.update(info.env) gen_input_path = info.CreateInputFile() rel_gen_input_path = ( os.path.relpath(gen_input_path, cwd).replace(os.path.sep, '/')) variables['in_file'] = rel_gen_input_path # Each test runs with a unique output directory which is removed before # we run the test. out_dir = os.path.splitext(rel_gen_input_path)[0] if os.path.isdir(out_dir): shutil.rmtree(out_dir) os.makedirs(out_dir) variables['out_dir'] = out_dir # Temporary files typically are placed in `out_dir` and use the same test's # basename. This name does include an extension. input_basename = os.path.basename(rel_gen_input_path) variables['temp_file'] = os.path.join(out_dir, os.path.splitext(input_basename)[0]) test_result = TestResult() for cmd_template in info.cmds: cmd = cmd_template.GetCommand(variables, options.arg, verbose_level) if options.print_cmd: print(cmd) try: result = cmd.Run(cwd, timeout, verbose_level > 0, env) except (Error, KeyboardInterrupt) as e: return e test_result.Append(result) if result.Failed(): break return test_result def HandleTestResult(status, info, result, rebase=False): try: if isinstance(result, (Error, KeyboardInterrupt)): raise result if info.is_roundtrip: if result.Failed(): if result.GetLastFailure().returncode == 2: # run-roundtrip.py returns 2 if the file couldn't be parsed. # it's likely a "bad-*" file. status.Skipped(info) else: raise Error(result.stderr) else: status.Passed(info, result.duration) else: if result.Failed(): # This test has already failed, but diff it anyway. last_failure = result.GetLastFailure() msg = 'expected error code %d, got %d.' % ( last_failure.GetExpectedReturncode(), last_failure.returncode) try: info.Diff(result.stdout, result.stderr) except Error as e: msg += '\n' + str(e) raise Error(msg) else: if rebase: info.Rebase(result.stdout, result.stderr) else: info.Diff(result.stdout, result.stderr) status.Passed(info, result.duration) except Error as e: status.Failed(info, str(e), result) # Source : http://stackoverflow.com/questions/3041986/python-command-line-yes-no-input def YesNoPrompt(question, default='yes'): """Ask a yes/no question via input() and return their answer. "question" is a string that is presented to the user. "default" is the presumed answer if the user just hits . It must be "yes" (the default), "no" or None (meaning an answer is required of the user). The "answer" return value is True for "yes" or False for "no". """ valid = {'yes': True, 'y': True, 'ye': True, 'no': False, 'n': False} if default is None: prompt = ' [y/n] ' elif default == 'yes': prompt = ' [Y/n] ' elif default == 'no': prompt = ' [y/N] ' else: raise ValueError('invalid default answer: \'%s\'' % default) while True: sys.stdout.write(question + prompt) choice = input().lower() if default is not None and choice == '': return valid[default] elif choice in valid: return valid[choice] else: sys.stdout.write('Please respond with \'yes\' or \'no\' ' '(or \'y\' or \'n\').\n') def RunMultiThreaded(infos_to_run, status, options, variables): pool = multiprocessing.Pool(options.jobs) try: results = [(info, pool.apply_async(RunTest, (info, options, variables))) for info in infos_to_run] while results: new_results = [] for info, result in results: if result.ready(): HandleTestResult(status, info, result.get(0), options.rebase) else: new_results.append((info, result)) time.sleep(0.01) results = new_results pool.close() finally: pool.terminate() pool.join() def RunSingleThreaded(infos_to_run, status, options, variables): continued_errors = 0 for info in infos_to_run: result = RunTest(info, options, variables) HandleTestResult(status, info, result, options.rebase) if status.failed > continued_errors: if options.fail_fast: break elif options.stop_interactive: rerun_verbose = YesNoPrompt(question='Rerun with verbose option?', default='no') if rerun_verbose: RunTest(info, options, variables, verbose_level=2) should_continue = YesNoPrompt(question='Continue testing?', default='yes') if not should_continue: break elif options.verbose: RunTest(info, options, variables, verbose_level=1) continued_errors += 1 def GetDefaultJobCount(): cpu_count = multiprocessing.cpu_count() if cpu_count <= 1: return 1 else: return cpu_count // 2 def main(args): parser = argparse.ArgumentParser() parser.add_argument('-a', '--arg', help='additional args to pass to executable', action='append') parser.add_argument('--bindir', metavar='PATH', default=find_exe.GetDefaultPath(), help='directory to search for all executables.') parser.add_argument('-v', '--verbose', help='print more diagnotic messages.', action='store_true') parser.add_argument('-f', '--fail-fast', help='Exit on first failure. ' 'Extra options with \'--jobs 1\'', action='store_true') parser.add_argument('--stop-interactive', help='Enter interactive mode on errors. ' 'Extra options with \'--jobs 1\'', action='store_true') parser.add_argument('-l', '--list', help='list all tests.', action='store_true') parser.add_argument('-r', '--rebase', help='rebase a test to its current output.', action='store_true') parser.add_argument('-j', '--jobs', help='number of jobs to use to run tests', type=int, default=GetDefaultJobCount()) parser.add_argument('-t', '--timeout', type=float, default=DEFAULT_TIMEOUT, help='per test timeout in seconds') parser.add_argument('--no-roundtrip', help='don\'t run roundtrip.py on all tests', action='store_false', default=True, dest='roundtrip') parser.add_argument('-p', '--print-cmd', help='print the commands that are run.', action='store_true') parser.add_argument('patterns', metavar='pattern', nargs='*', help='test patterns.') options = parser.parse_args(args) if options.jobs != 1: if options.fail_fast: parser.error('--fail-fast only works with -j1') if options.stop_interactive: parser.error('--stop-interactive only works with -j1') if options.patterns: pattern_re = '|'.join( fnmatch.translate('*%s*' % p) for p in options.patterns) else: pattern_re = '.*' test_names = FindTestFiles('.txt', pattern_re) if options.list: for test_name in test_names: print(test_name) return 0 if not test_names: print('no tests match that filter') return 1 variables = {} variables['test_dir'] = os.path.abspath(TEST_DIR) variables['bindir'] = options.bindir variables['gen_wasm_py'] = os.path.join(TEST_DIR, 'gen-wasm.py') variables['gen_spec_js_py'] = os.path.join(TEST_DIR, 'gen-spec-js.py') for exe_basename in find_exe.EXECUTABLES: exe_override = os.path.join(options.bindir, exe_basename) variables[exe_basename] = find_exe.FindExecutable(exe_basename, exe_override) status = Status(sys.stderr.isatty() and not options.verbose) infos = GetAllTestInfo(test_names, status) infos_to_run = [] for info in infos: if info.skip: status.Skipped(info) continue infos_to_run.append(info) if options.roundtrip: for fold_exprs in False, True: try: infos_to_run.append(info.CreateRoundtripInfo(fold_exprs=fold_exprs)) except NoRoundtripError: pass if not os.path.exists(OUT_DIR): os.makedirs(OUT_DIR) status.Start(len(infos_to_run)) try: if options.jobs > 1: RunMultiThreaded(infos_to_run, status, options, variables) else: RunSingleThreaded(infos_to_run, status, options, variables) except KeyboardInterrupt: print('\nInterrupted testing\n') status.Done() ret = 0 if status.failed: sys.stderr.write('**** FAILED %s\n' % ('*' * (80 - 14))) for info, result in status.failed_tests: if isinstance(result, TestResult): msg = result.GetLastCommand() elif isinstance(result, Error): msg = result else: msg = '' sys.stderr.write('- %s\n %s\n' % (info.GetName(), msg)) ret = 1 return ret if __name__ == '__main__': try: sys.exit(main(sys.argv[1:])) except Error as e: sys.stderr.write(str(e) + '\n') sys.exit(1)