From 42f93b2cb615705d550a250646ab0b615b2c2483 Mon Sep 17 00:00:00 2001 From: Hui Zhang Date: Tue, 15 Jun 2021 14:22:18 +0000 Subject: [PATCH] read with 2d; window process --- third_party/paddle_audio/frontend/kaldi.py | 74 +++++++++++++++---- .../paddle_audio/frontend/kaldi_test.py | 13 ++-- 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/third_party/paddle_audio/frontend/kaldi.py b/third_party/paddle_audio/frontend/kaldi.py index 3dea41098..fa89a80e9 100644 --- a/third_party/paddle_audio/frontend/kaldi.py +++ b/third_party/paddle_audio/frontend/kaldi.py @@ -9,12 +9,26 @@ import soundfile as sf from .common import get_window, dft_matrix -def read(wavpath:str, sr:int = None, dtype='int16')->Tuple[int, np.ndarray]: - wav, r_sr = sf.read(wavpath, dtype=dtype) +def read(wavpath:str, sr:int = None, start=0, stop=None, dtype='int16', always_2d=True)->Tuple[int, np.ndarray]: + """load wav file. + + Args: + wavpath (str): wav path. + sr (int, optional): expect sample rate. Defaults to None. + dtype (str, optional): wav data bits. Defaults to 'int16'. + + Returns: + Tuple[int, np.ndarray]: sr (int), wav (int16) [T, C]. + """ + wav, r_sr = sf.read(wavpath, start=start, stop=stop, dtype=dtype, always_2d=always_2d) if sr: assert sr == r_sr return r_sr, wav + +def write(wavpath:str, wav:np.ndarray, sr:int, dtype='PCM_16'): + sf.write(wavpath, wav, sr, subtype=dtype) + def frames(x: Tensor, num_samples: Tensor, @@ -68,27 +82,46 @@ def frames(x: Tensor, return frames, num_frames -def dither(signal, dither_value=1.0): - signal += paddle.normal(shape=signal.shape) * dither_value +def dither(signal:Tensor, dither_value=1.0)->Tensor: + """dither frames for log compute. + + Args: + signal (Tensor): [B, T, D] + dither_value (float, optional): [scalar]. Defaults to 1.0. + + Returns: + Tensor: [B, T, D] + """ + signal += paddle.normal(shape=[1, 1, signal.shape[-1]]) * dither_value return signal -def remove_dc_offset(signal): - signal -= paddle.mean(signal) - return signal +def remove_dc_offset(signal:Tensor)->Tensor: + """remove dc. + + Args: + signal (Tensor): [B, T, D] + Returns: + Tensor: [B, T, D] + """ + signal -= paddle.mean(signal, axis=-1) + return signal -def preemphasis(signal, coeff=0.97): +def preemphasis(signal:Tensor, coeff=0.97)->Tensor: """perform preemphasis on the input signal. - :param signal: The signal to filter. - :param coeff: The preemphasis coefficient. 0 is no filter, default is 0.95. - :returns: the filtered signal. + Args: + signal (Tensor): [B, T, D], The signal to filter. + coeff (float, optional): [scalar].The preemphasis coefficient. 0 is no filter, Defaults to 0.97. + + Returns: + Tensor: [B, T, D] """ return paddle.concat([ - (1-coeff)*signal[0:1], - signal[1:] - coeff * signal[:-1] - ]) + (1-coeff)*signal[:, :, 0:1], + signal[:, :, 1:] - coeff * signal[:, :, :-1] + ], axis=-1) class STFT(nn.Layer): @@ -130,16 +163,19 @@ class STFT(nn.Layer): self.dither = dither self.preemph_coeff = preemph_coeff self.remove_dc_offset = remove_dc_offset + self.window_type = window_type self.clip = clip self.n_fft = n_fft self.n_bin = 1 + n_fft // 2 - w_real, w_imag, kernel_size = dft_matrix(self.n_fft, int(self.win_length * sr), self.n_bin) + w_real, w_imag, kernel_size = dft_matrix( + self.n_fft, int(self.win_length * self.sr), self.n_bin + ) # calculate window window = get_window(window_type, kernel_size) - + # (2 * n_bins, kernel_size) w = np.concatenate([w_real, w_imag], axis=0) w = w * window @@ -166,6 +202,12 @@ class STFT(nn.Layer): """ batch_size = paddle.shape(num_samples) F, nframe = frames(x, num_samples, self.sr, self.win_length, self.stride_length, clip=self.clip) + if self.dither: + F = dither(F, dither) + if self.remove_dc_offset: + F = remove_dc_offset(F) + if self.preemph_coeff: + F = preemphasis(F) C = paddle.matmul(F, self.weight) # [B, T, K] [K, 2 * n_bins] C = paddle.reshape(C, [batch_size, -1, 2, self.n_bin]) C = C.transpose([0, 1, 3, 2]) diff --git a/third_party/paddle_audio/frontend/kaldi_test.py b/third_party/paddle_audio/frontend/kaldi_test.py index 15a40bfa5..7b9788ee8 100644 --- a/third_party/paddle_audio/frontend/kaldi_test.py +++ b/third_party/paddle_audio/frontend/kaldi_test.py @@ -376,11 +376,13 @@ class TestKaldiFE(unittest.TestCase): import scipy.io.wavfile as wav rate, sig = wav.read(self.wavpath) sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] self.assertTrue(np.all(sig == wav)) self.assertEqual(rate, sr) def test_frames(self): sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] _, fs = frames(wav, samplerate=sr, winlen=self.winlen, winstep=self.winstep, nfilt=self.nfilt, nfft=self.nfft, @@ -397,6 +399,7 @@ class TestKaldiFE(unittest.TestCase): def test_stft(self): sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] for wintype in ['', 'hamm', 'hann', 'povey']: print(wintype) @@ -412,7 +415,7 @@ class TestKaldiFE(unittest.TestCase): t_wav = paddle.to_tensor([wav], dtype='float32') t_wavlen = paddle.to_tensor([len(wav)]) - stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, clip=False) + stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, dither=0.0, preemph_coeff=0.0, remove_dc_offset=False, clip=False) t_stft, t_nframe = stft_class(t_wav, t_wavlen) t_stft = t_stft.astype(stft_c_win.real.dtype)[0] t_real = t_stft[:, :, 0] @@ -434,7 +437,7 @@ class TestKaldiFE(unittest.TestCase): def test_magspec(self): sr, wav = kaldi.read(self.wavpath) - + wav = wav[:, 0] for wintype in ['', 'hamm', 'hann', 'povey']: print(wintype) self.wintype=wintype @@ -448,7 +451,7 @@ class TestKaldiFE(unittest.TestCase): t_wav = paddle.to_tensor([wav], dtype='float32') t_wavlen = paddle.to_tensor([len(wav)]) - stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, clip=False) + stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, dither=0.0, preemph_coeff=0.0, remove_dc_offset=False, clip=False) t_stft, t_nframe = stft_class(t_wav, t_wavlen) t_stft = t_stft.astype(stft_win.dtype) t_spec = kaldi.magspec(t_stft)[0] @@ -463,7 +466,7 @@ class TestKaldiFE(unittest.TestCase): def test_powspec(self): sr, wav = kaldi.read(self.wavpath) - + wav = wav[:, 0] for wintype in ['', 'hamm', 'hann', 'povey']: print(wintype) self.wintype=wintype @@ -478,7 +481,7 @@ class TestKaldiFE(unittest.TestCase): t_wav = paddle.to_tensor([wav], dtype='float32') t_wavlen = paddle.to_tensor([len(wav)]) - stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, clip=False) + stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, dither=0.0, preemph_coeff=0.0, remove_dc_offset=False, clip=False) t_stft, t_nframe = stft_class(t_wav, t_wavlen) t_stft = t_stft.astype(stft_win.dtype) t_spec = kaldi.powspec(t_stft)[0]