import numpy as np def make_filter (x, noise_floor_db, noise_gain_db, filter_length, window_size): assert noise_gain_db < 0 assert filter_length <= len (x) assert filter_length <= window_size filter_length /= 2 # Compute a windowed FFT of the input fft = np.fft.fft (x * (1.0 - np.cos (np.linspace (0.0, 2*np.pi, len (x)))) / 2.0) # Compute an ideal frequency response that keeps frequencies where # the signal is stronger than the noise and cuts frequencies where # the noise is stronger. t = len (fft)**2 / 4.0 * (10.0 ** (noise_floor_db/20.0)) g = 10.0 ** (noise_gain_db / 20.0) strength = -t / np.log (1.0 - g) flt = 1.0 - np.exp (-np.abs (fft)**2 / strength) # Increase the ideal frequency response so that frequencies close # to signal frequencies are kept too. If we didn't do that, the # subsequent windowing of the impulse response would decrease the # gain of the signal frequencies. Moreover we can afford to leave # some noise at frequencies close to the signal because the human # ear will not perceive it. flt = nd.maximum_filter1d (flt, len (flt)/filter_length+1) flt[-len (flt)/2:] = flt[len (flt)/2:0:-1] # Window the impulse response to avoid issues with the Gibbs # phenomenon (the ideal frequency response has infinite impulse # response which cause ringing artefacts in the filter output) impulse = np.fft.ifft (flt) impulse[:filter_length] *= (np.cos (np.linspace (0, np.pi, filter_length))+1)/2 # impulse[filter_length:-filter_length+1] = 0 # impulse[-filter_length:] = impulse[filter_length:0:-1] impulse = np.hstack (( impulse[:filter_length], np.zeros ((window_size - 2*filter_length), impulse.dtype), impulse[filter_length:0:-1])) return np.fft.fft (impulse) def process_signal (input, noise_floor_db, noise_gain_db, filter_length): window_size = 2 ** (filter_length-1).bit_length() output = np.zeros_like (input) output[:window_size/2] = input[:window_size/2] * (1 + np.cos (np.linspace (0.0, np.pi, window_size/2, endpoint = False))) / 2 work = np.zeros ((2*window_size,), input.dtype) start = 0 while start + window_size < len (input): work[:window_size] = input[start:start+window_size] h = make_filter (work[:window_size], noise_floor_db, noise_gain_db, filter_length, 2*window_size) work[:window_size] = input[start:start+window_size] output[start:start+window_size] \ += np.fft.ifft (np.fft.fft (work) * h)[:window_size].real \ * (1 - np.cos (np.linspace (0.0, 2*np.pi, window_size, endpoint = False)))/2 start += window_size/2 return output