""" /* Copyright (c) 2023 Amazon Written by Jan Buethe */ /* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ """ import os import multiprocess as multiprocessing import random import subprocess import argparse import shutil import yaml from utils.files import get_wave_file_list from utils.pesq import compute_PESQ from utils.pitch import compute_pitch_error parser = argparse.ArgumentParser() parser.add_argument('setup', type=str, help='setup yaml specifying end to end processing with model under test') parser.add_argument('input_folder', type=str, help='input folder path') parser.add_argument('output_folder', type=str, help='output folder path') parser.add_argument('--num-testitems', type=int, help="number of testitems to be processed (default 100)", default=100) parser.add_argument('--seed', type=int, help='seed for random item selection', default=None) parser.add_argument('--fs', type=int, help="sampling rate at which input is presented as wave file (defaults to 16000)", default=16000) parser.add_argument('--num-workers', type=int, help="number of subprocesses to be used (default=4)", default=4) parser.add_argument('--plc-suffix', type=str, default="_is_lost.txt", help="suffix of plc error pattern file: only relevant if command chain uses PLCFILE (default=_is_lost.txt)") parser.add_argument('--metrics', type=str, default='pesq', help='comma separated string of metrics, supported: {{"pesq", "pitch_error", "voicing_error"}}, default="pesq"') parser.add_argument('--verbose', action='store_true', help='enables printouts of all commands run in the pipeline') def check_for_sox_in_path(): r = subprocess.run("sox -h", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return r.returncode == 0 def run_save_sh(command, verbose=False): if verbose: print(f"[run_save_sh] running command {command}...") r = subprocess.run(command, shell=True) if r.returncode != 0: raise RuntimeError(f"command '{command}' failed with exit code {r.returncode}") def run_processing_chain(input_path, output_path, model_commands, fs, metrics={'pesq'}, plc_suffix="_is_lost.txt", verbose=False): # prepare model input model_input = output_path + ".resamp.wav" run_save_sh(f"sox {input_path} -r {fs} {model_input}", verbose=verbose) plcfile = os.path.splitext(input_path)[0] + plc_suffix if os.path.isfile(plcfile): run_save_sh(f"cp {plcfile} {os.path.dirname(output_path)}") # generate model output for command in model_commands: run_save_sh(command.format(INPUT=model_input, OUTPUT=output_path, PLCFILE=plcfile), verbose=verbose) scores = dict() cache = dict() for metric in metrics: if metric == 'pesq': # run pesq score = compute_PESQ(input_path, output_path, fs=fs) elif metric == 'pitch_error': if metric in cache: score = cache[metric] else: rval = compute_pitch_error(input_path, output_path, fs=fs) score = rval[metric] cache['voicing_error'] = rval['voicing_error'] elif metric == 'voicing_error': if metric in cache: score = cache[metric] else: rval = compute_pitch_error(input_path, output_path, fs=fs) score = rval[metric] cache['pitch_error'] = rval['pitch_error'] else: ValueError(f'error: unknown metric {metric}') scores[metric] = score return (output_path, scores) def get_output_path(root_folder, input, output_folder): input_relpath = os.path.relpath(input, root_folder) os.makedirs(os.path.join(output_folder, 'processing', os.path.dirname(input_relpath)), exist_ok=True) output_path = os.path.join(output_folder, 'processing', input_relpath + '.output.wav') return output_path def add_audio_table(f, html_folder, results, title, metric): item_folder = os.path.join(html_folder, 'items') os.makedirs(item_folder, exist_ok=True) # table with results f.write(f"""

{title}

""") for i, r in enumerate(results): item, score = r item_name = os.path.basename(item) new_item_path = os.path.join(item_folder, item_name) shutil.copyfile(item, new_item_path) shutil.copyfile(item + '.resamp.wav', os.path.join(item_folder, item_name + '.orig.wav')) f.write(f""" """) # footer f.write("""
Rank Name {metric.upper()} Audio (out) Audio (orig)
{i + 1} {item_name.split('.')[0]} {score:.3f}
""") def create_html(output_folder, results, title, metric): html_folder = output_folder items_folder = os.path.join(html_folder, 'items') os.makedirs(html_folder, exist_ok=True) os.makedirs(items_folder, exist_ok=True) with open(os.path.join(html_folder, 'index.html'), 'w') as f: # header and title f.write(f""" {title}

{title}

""") # top 20 add_audio_table(f, html_folder, results[:-21: -1], "Top 20", metric) # 20 around median N = len(results) // 2 add_audio_table(f, html_folder, results[N + 10 : N - 10: -1], "Median 20", metric) # flop 20 add_audio_table(f, html_folder, results[:20], "Flop 20", metric) # footer f.write("""
""") metric_sorting_signs = { 'pesq' : 1, 'pitch_error' : -1, 'voicing_error' : -1 } def is_valid_result(data, metrics): if not isinstance(data, dict): return False for metric in metrics: if not metric in data: return False return True def evaluate_results(output_folder, results, metric): results = sorted(results, key=lambda x : metric_sorting_signs[metric] * x[1]) with open(os.path.join(args.output_folder, f'scores_{metric}.txt'), 'w') as f: for result in results: f.write(f"{os.path.relpath(result[0], args.output_folder)} {result[1]}\n") # some statistics mean = sum([r[1] for r in results]) / len(results) top_mean = sum([r[1] for r in results[-20:]]) / 20 bottom_mean = sum([r[1] for r in results[:20]]) / 20 with open(os.path.join(args.output_folder, f'stats_{metric}.txt'), 'w') as f: f.write(f"mean score: {mean}\n") f.write(f"bottom mean score: {bottom_mean}\n") f.write(f"top mean score: {top_mean}\n") print(f"\nmean score: {mean}") print(f"bottom mean score: {bottom_mean}") print(f"top mean score: {top_mean}\n") # create output html create_html(os.path.join(output_folder, 'html', metric), results, setup['test'], metric) if __name__ == "__main__": args = parser.parse_args() # check for sox if not check_for_sox_in_path(): raise RuntimeError("script requires sox") # prepare output folder if os.path.exists(args.output_folder): print("warning: output folder exists") reply = input('continue? (y/n): ') while reply not in {'y', 'n'}: reply = input('continue? (y/n): ') if reply == 'n': os._exit() else: # start with a clean sleight shutil.rmtree(args.output_folder) os.makedirs(args.output_folder, exist_ok=True) # extract metrics metrics = args.metrics.split(",") for metric in metrics: if not metric in metric_sorting_signs: print(f"unknown metric {metric}") args.usage() # read setup print(f"loading {args.setup}...") with open(args.setup, "r") as f: setup = yaml.load(f.read(), yaml.FullLoader) model_commands = setup['processing'] print("\nfound the following model commands:") for command in model_commands: print(command.format(INPUT='input.wav', OUTPUT='output.wav', PLCFILE='input_is_lost.txt')) # store setup to output folder setup['input'] = os.path.abspath(args.input_folder) setup['output'] = os.path.abspath(args.output_folder) setup['seed'] = args.seed with open(os.path.join(args.output_folder, 'setup.yml'), 'w') as f: yaml.dump(setup, f) # get input print(f"\nCollecting audio files from {args.input_folder}...") file_list = get_wave_file_list(args.input_folder, check_for_features=False) print(f"...{len(file_list)} files found\n") # sample from file list file_list = sorted(file_list) random.seed(args.seed) random.shuffle(file_list) num_testitems = min(args.num_testitems, len(file_list)) file_list = file_list[:num_testitems] print(f"\nlaunching test on {num_testitems} items...") # helper function for parallel processing def func(input_path): output_path = get_output_path(args.input_folder, input_path, args.output_folder) try: rval = run_processing_chain(input_path, output_path, model_commands, args.fs, metrics=metrics, plc_suffix=args.plc_suffix, verbose=args.verbose) except: rval = (input_path, -1) return rval with multiprocessing.Pool(args.num_workers) as p: results = p.map(func, file_list) results_dict = dict() for name, values in results: if is_valid_result(values, metrics): results_dict[name] = values print(results_dict) # evaluating results num_failures = num_testitems - len(results_dict) print(f"\nprocessing of {num_failures} items failed\n") for metric in metrics: print(metric) evaluate_results( args.output_folder, [(name, value[metric]) for name, value in results_dict.items()], metric )