diff --git a/deepspeech/io/dataset.py b/deepspeech/io/dataset.py index d1fe0470..e58e03b4 100644 --- a/deepspeech/io/dataset.py +++ b/deepspeech/io/dataset.py @@ -76,19 +76,19 @@ class ManifestDataset(Dataset): Args: manifest_path (str): manifest josn file path - max_input_len ([type], optional): maximum output seq length, + max_input_len ([type], optional): maximum output seq length, in seconds for raw wav, in frame numbers for feature data. Defaults to float('inf'). - min_input_len (float, optional): minimum input seq length, + min_input_len (float, optional): minimum input seq length, in seconds for raw wav, in frame numbers for feature data. Defaults to 0.0. - max_output_len (float, optional): maximum input seq length, + max_output_len (float, optional): maximum input seq length, in modeling units. Defaults to 500.0. - min_output_len (float, optional): minimum input seq length, + min_output_len (float, optional): minimum input seq length, in modeling units. Defaults to 0.0. - max_output_input_ratio (float, optional): maximum output seq length/output seq length ratio. + max_output_input_ratio (float, optional): maximum output seq length/output seq length ratio. Defaults to 10.0. min_output_input_ratio (float, optional): minimum output seq length/output seq length ratio. Defaults to 0.05. - + """ super().__init__() diff --git a/third_party/__init__.py b/third_party/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/third_party/paddle_audio/__init__.py b/third_party/paddle_audio/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/third_party/paddle_audio/frontend.py b/third_party/paddle_audio/frontend.py deleted file mode 100644 index 1b337732..00000000 --- a/third_party/paddle_audio/frontend.py +++ /dev/null @@ -1,146 +0,0 @@ -from typing import Tuple -import numpy as np -import paddle -from paddle import Tensor -from paddle import nn -from paddle.nn import functional as F - - -def frame(x: Tensor, - num_samples: Tensor, - win_length: int, - hop_length: int, - clip: bool = True) -> Tuple[Tensor, Tensor]: - """Extract frames from audio. - - Parameters - ---------- - x : Tensor - Shape (N, T), batched waveform. - num_samples : Tensor - Shape (N, ), number of samples of each waveform. - win_length : int - Window length. - hop_length : int - Number of samples shifted between ajancent frames. - clip : bool, optional - Whether to clip audio that does not fit into the last frame, by - default True - - Returns - ------- - frames : Tensor - Shape (N, T', win_length). - num_frames : Tensor - Shape (N, ) number of valid frames - """ - assert hop_length <= win_length - num_frames = (num_samples - win_length) // hop_length - padding = (0, 0) - if not clip: - num_frames += 1 - # NOTE: pad hop_length - 1 to the right to ensure that there is at most - # one frame dangling to the righe edge - padding = (0, hop_length - 1) - - weight = paddle.eye(win_length).unsqueeze(1) - - frames = F.conv1d(x.unsqueeze(1), - weight, - padding=padding, - stride=(hop_length, )) - return frames, num_frames - - -class STFT(nn.Layer): - """A module for computing stft transformation in a differentiable way. - - Parameters - ------------ - n_fft : int - Number of samples in a frame. - - hop_length : int - Number of samples shifted between adjacent frames. - - win_length : int - Length of the window. - - clip: bool - Whether to clip audio is necesaary. - """ - def __init__(self, - n_fft: int, - hop_length: int, - win_length: int, - window_type: str = None, - clip: bool = True): - super().__init__() - - self.hop_length = hop_length - self.n_bin = 1 + n_fft // 2 - self.n_fft = n_fft - self.clip = clip - - # calculate window - if window_type is None: - window = np.ones(win_length) - elif window_type == "hann": - window = np.hanning(win_length) - elif window_type == "hamming": - window = np.hamming(win_length) - else: - raise ValueError("Not supported yet!") - - if win_length < n_fft: - window = F.pad(window, (0, n_fft - win_length)) - elif win_length > n_fft: - window = window[:n_fft] - - # (n_bins, n_fft) complex - kernel_size = min(n_fft, win_length) - weight = np.fft.fft(np.eye(n_fft))[:self.n_bin, :kernel_size] - w_real = weight.real - w_imag = weight.imag - - # (2 * n_bins, kernel_size) - w = np.concatenate([w_real, w_imag], axis=0) - w = w * window - - # (2 * n_bins, 1, kernel_size) # (C_out, C_in, kernel_size) - w = np.expand_dims(w, 1) - weight = paddle.cast(paddle.to_tensor(w), paddle.get_default_dtype()) - self.register_buffer("weight", weight) - - def forward(self, x: Tensor, num_samples: Tensor) -> Tuple[Tensor, Tensor]: - """Compute the stft transform. - Parameters - ------------ - x : Tensor [shape=(B, T)] - The input waveform. - num_samples : Tensor - Number of samples of each waveform. - Returns - ------------ - D : Tensor - Shape(N, T', n_bins, 2) Spectrogram. - - num_frames: Tensor - Shape (N,) number of samples of each spectrogram - """ - num_frames = (num_samples - self.win_length) // self.hop_length - padding = (0, 0) - if not self.clip: - num_frames += 1 - padding = (0, self.hop_length - 1) - - batch_size, _, _ = paddle.shape(x) - x = x.unsqueeze(-1) - D = F.conv1d(self.weight, - x, - stride=(self.hop_length, ), - padding=padding, - data_format="NLC") - D = paddle.reshape(D, [batch_size, -1, self.n_bin, 2]) - return D, num_frames - diff --git a/third_party/paddle_audio/frontend/common.py b/third_party/paddle_audio/frontend/common.py new file mode 100644 index 00000000..7638dae5 --- /dev/null +++ b/third_party/paddle_audio/frontend/common.py @@ -0,0 +1,201 @@ +import paddle +import numpy as np +from typing import Tuple, Optional, Union + + +# https://github.com/kaldi-asr/kaldi/blob/cbed4ff688/src/feat/feature-window.cc#L109 +def povey_window(frame_len:int) -> np.ndarray: + win = np.empty(frame_len) + a = 2 * np.pi / (frame_len -1) + for i in range(frame_len): + win[i] = (0.5 - 0.5 * np.cos(a * i) )**0.85 + return win + +def hann_window(frame_len:int) -> np.ndarray: + win = np.empty(frame_len) + a = 2 * np.pi / (frame_len -1) + for i in range(frame_len): + win[i] = 0.5 - 0.5 * np.cos(a * i) + return win + +def sine_window(frame_len:int) -> np.ndarray: + win = np.empty(frame_len) + a = 2 * np.pi / (frame_len -1) + for i in range(frame_len): + win[i] = np.sin(0.5 * a * i) + return win + +def hamm_window(frame_len:int) -> np.ndarray: + win = np.empty(frame_len) + a = 2 * np.pi / (frame_len -1) + for i in range(frame_len): + win[i] = 0.54 - 0.46 * np.cos(a * i) + return win + +def get_window(wintype:Optional[str], winlen:int) -> np.ndarray: + """get window function + + Args: + wintype (Optional[str]): window type. + winlen (int): window length in samples. + + Raises: + ValueError: not support window. + + Returns: + np.ndarray: window coeffs. + """ + # calculate window + if not wintype or wintype == 'rectangular': + window = np.ones(winlen) + elif wintype == "hann": + window = hann_window(winlen) + elif wintype == "hamm": + window = hamm_window(winlen) + elif wintype == "povey": + window = povey_window(winlen) + else: + msg = f"{wintype} Not supported yet!" + raise ValueError(msg) + return window + + +def dft_matrix(n_fft:int, winlen:int=None, n_bin:int=None) -> Tuple[np.ndarray, np.ndarray, int]: + # https://en.wikipedia.org/wiki/Discrete_Fourier_transform + # (n_bins, n_fft) complex + if n_bin is None: + n_bin = 1 + n_fft // 2 + if winlen is None: + winlen = n_bin + # https://github.com/numpy/numpy/blob/v1.20.0/numpy/fft/_pocketfft.py#L49 + kernel_size = min(n_fft, winlen) + + n = np.arange(0, n_fft, 1.) + wsin = np.empty((n_bin, kernel_size)) #[Cout, kernel_size] + wcos = np.empty((n_bin, kernel_size)) #[Cout, kernel_size] + for k in range(n_bin): # Only half of the bins contain useful info + wsin[k,:] = -np.sin(2*np.pi*k*n/n_fft)[:kernel_size] + wcos[k,:] = np.cos(2*np.pi*k*n/n_fft)[:kernel_size] + w_real = wcos + w_imag = wsin + return w_real, w_imag, kernel_size + + +def dft_matrix_fast(n_fft:int, winlen:int=None, n_bin:int=None) -> Tuple[np.ndarray, np.ndarray, int]: + # (n_bins, n_fft) complex + if n_bin is None: + n_bin = 1 + n_fft // 2 + if winlen is None: + winlen = n_bin + # https://github.com/numpy/numpy/blob/v1.20.0/numpy/fft/_pocketfft.py#L49 + kernel_size = min(n_fft, winlen) + + # https://en.wikipedia.org/wiki/DFT_matrix + # https://ccrma.stanford.edu/~jos/st/Matrix_Formulation_DFT.html + weight = np.fft.fft(np.eye(n_fft))[:self.n_bin, :kernel_size] + w_real = weight.real + w_imag = weight.imag + return w_real, w_imag, kernel_size + + +def bin2hz(bin:Union[List[int], np.ndarray], N:int, sr:int)->List[float]: + """FFT bins to Hz. + + http://practicalcryptography.com/miscellaneous/machine-learning/intuitive-guide-discrete-fourier-transform/ + + Args: + bins (List[int] or np.ndarray): bin index. + N (int): the number of samples, or FFT points. + sr (int): sampling rate. + + Returns: + List[float]: Hz's. + """ + hz = bin * float(sr) / N + + +def hz2mel(hz): + """Convert a value in Hertz to Mels + + :param hz: a value in Hz. This can also be a numpy array, conversion proceeds element-wise. + :returns: a value in Mels. If an array was passed in, an identical sized array is returned. + """ + return 1127 * np.log(1+hz/700.0) + + +def mel2hz(mel): + """Convert a value in Mels to Hertz + + :param mel: a value in Mels. This can also be a numpy array, conversion proceeds element-wise. + :returns: a value in Hertz. If an array was passed in, an identical sized array is returned. + """ + return 700 * (np.exp(mel/1127.0)-1) + + + +def rms_to_db(rms: float): + """Root Mean Square to dB. + + Args: + rms ([float]): root mean square + + Returns: + float: dB + """ + return 20.0 * math.log10(max(1e-16, rms)) + + +def rms_to_dbfs(rms: float): + """Root Mean Square to dBFS. + https://fireattack.wordpress.com/2017/02/06/replaygain-loudness-normalization-and-applications/ + Audio is mix of sine wave, so 1 amp sine wave's Full scale is 0.7071, equal to -3.0103dB. + + dB = dBFS + 3.0103 + dBFS = db - 3.0103 + e.g. 0 dB = -3.0103 dBFS + + Args: + rms ([float]): root mean square + + Returns: + float: dBFS + """ + return rms_to_db(rms) - 3.0103 + + +def max_dbfs(sample_data: np.ndarray): + """Peak dBFS based on the maximum energy sample. + + Args: + sample_data ([np.ndarray]): float array, [-1, 1]. + + Returns: + float: dBFS + """ + # Peak dBFS based on the maximum energy sample. Will prevent overdrive if used for normalization. + return rms_to_dbfs(max(abs(np.min(sample_data)), abs(np.max(sample_data)))) + + +def mean_dbfs(sample_data): + """Peak dBFS based on the RMS energy. + + Args: + sample_data ([np.ndarray]): float array, [-1, 1]. + + Returns: + float: dBFS + """ + return rms_to_dbfs( + math.sqrt(np.mean(np.square(sample_data, dtype=np.float64)))) + + +def gain_db_to_ratio(gain_db: float): + """dB to ratio + + Args: + gain_db (float): gain in dB + + Returns: + float: scale in amp + """ + return math.pow(10.0, gain_db / 20.0) \ No newline at end of file diff --git a/third_party/paddle_audio/frontend/english.wav b/third_party/paddle_audio/frontend/english.wav new file mode 100644 index 00000000..bb28291f Binary files /dev/null and b/third_party/paddle_audio/frontend/english.wav differ diff --git a/third_party/paddle_audio/frontend/kaldi.py b/third_party/paddle_audio/frontend/kaldi.py new file mode 100644 index 00000000..d1c13fe3 --- /dev/null +++ b/third_party/paddle_audio/frontend/kaldi.py @@ -0,0 +1,266 @@ +from typing import Tuple +import numpy as np +import paddle +from paddle import Tensor +from paddle import nn +from paddle.nn import functional as F +import soundfile as sf + +from .common import get_window +from .common import dft_matrix + + +def read(wavpath:str, sr:int = None, start=0, stop=None, dtype='int16', always_2d=True)->Tuple[int, np.ndarray]: + """load wav file. + + Args: + wavpath (str): wav path. + sr (int, optional): expect sample rate. Defaults to None. + dtype (str, optional): wav data bits. Defaults to 'int16'. + + Returns: + Tuple[int, np.ndarray]: sr (int), wav (int16) [T, C]. + """ + wav, r_sr = sf.read(wavpath, start=start, stop=stop, dtype=dtype, always_2d=always_2d) + if sr: + assert sr == r_sr + return r_sr, wav + + +def write(wavpath:str, wav:np.ndarray, sr:int, dtype='PCM_16'): + """write wav file. + + Args: + wavpath (str): file path to save. + wav (np.ndarray): wav data. + sr (int): data samplerate. + dtype (str, optional): wav bit format. Defaults to 'PCM_16'. + """ + sf.write(wavpath, wav, sr, subtype=dtype) + + +def frames(x: Tensor, + num_samples: Tensor, + sr: int, + win_length: float, + stride_length: float, + clip: bool = False) -> Tuple[Tensor, Tensor]: + """Extract frames from audio. + + Parameters + ---------- + x : Tensor + Shape (B, T), batched waveform. + num_samples : Tensor + Shape (B, ), number of samples of each waveform. + sr: int + Sampling Rate. + win_length : float + Window length in ms. + stride_length : float + Stride length in ms. + clip : bool, optional + Whether to clip audio that does not fit into the last frame, by + default True + + Returns + ------- + frames : Tensor + Shape (B, T', win_length). + num_frames : Tensor + Shape (B, ) number of valid frames + """ + assert stride_length <= win_length + stride_length = int(stride_length * sr) + win_length = int(win_length * sr) + + num_frames = (num_samples - win_length) // stride_length + padding = (0, 0) + if not clip: + num_frames += 1 + need_samples = num_frames * stride_length + win_length + padding = (0, need_samples - num_samples - 1) + + weight = paddle.eye(win_length).unsqueeze(1) #[win_length, 1, win_length] + + frames = F.conv1d(x.unsqueeze(-1), + weight, + padding=padding, + stride=(stride_length, ), + data_format='NLC') + return frames, num_frames + + +def dither(signal:Tensor, dither_value=1.0)->Tensor: + """dither frames for log compute. + + Args: + signal (Tensor): [B, T, D] + dither_value (float, optional): [scalar]. Defaults to 1.0. + + Returns: + Tensor: [B, T, D] + """ + D = paddle.shape(signal)[-1] + signal += paddle.normal(shape=[1, 1, D]) * dither_value + return signal + + +def remove_dc_offset(signal:Tensor)->Tensor: + """remove dc. + + Args: + signal (Tensor): [B, T, D] + + Returns: + Tensor: [B, T, D] + """ + signal -= paddle.mean(signal, axis=-1, keepdim=True) + return signal + +def preemphasis(signal:Tensor, coeff=0.97)->Tensor: + """perform preemphasis on the input signal. + + Args: + signal (Tensor): [B, T, D], The signal to filter. + coeff (float, optional): [scalar].The preemphasis coefficient. 0 is no filter, Defaults to 0.97. + + Returns: + Tensor: [B, T, D] + """ + return paddle.concat([ + (1-coeff)*signal[:, :, 0:1], + signal[:, :, 1:] - coeff * signal[:, :, :-1] + ], axis=-1) + + +class STFT(nn.Layer): + """A module for computing stft transformation in a differentiable way. + + http://practicalcryptography.com/miscellaneous/machine-learning/intuitive-guide-discrete-fourier-transform/ + + Parameters + ------------ + n_fft : int + Number of samples in a frame. + + sr: int + Number of Samplilng rate. + + stride_length : float + Number of samples shifted between adjacent frames. + + win_length : float + Length of the window. + + clip: bool + Whether to clip audio is necesaary. + """ + def __init__(self, + n_fft: int, + sr: int, + win_length: float, + stride_length: float, + dither:float=0.0, + preemph_coeff:float=0.97, + remove_dc_offset:bool=True, + window_type: str = 'povey', + clip: bool = False): + super().__init__() + self.sr = sr + self.win_length = win_length + self.stride_length = stride_length + self.dither = dither + self.preemph_coeff = preemph_coeff + self.remove_dc_offset = remove_dc_offset + self.window_type = window_type + self.clip = clip + + self.n_fft = n_fft + self.n_bin = 1 + n_fft // 2 + + w_real, w_imag, kernel_size = dft_matrix( + self.n_fft, int(self.win_length * self.sr), self.n_bin + ) + + # calculate window + window = get_window(window_type, kernel_size) + + # (2 * n_bins, kernel_size) + w = np.concatenate([w_real, w_imag], axis=0) + w = w * window + # (kernel_size, 2 * n_bins) + w = np.transpose(w) + weight = paddle.cast(paddle.to_tensor(w), paddle.get_default_dtype()) + self.register_buffer("weight", weight) + + def forward(self, x: Tensor, num_samples: Tensor) -> Tuple[Tensor, Tensor]: + """Compute the stft transform. + Parameters + ------------ + x : Tensor [shape=(B, T)] + The input waveform. + num_samples : Tensor [shape=(B,)] + Number of samples of each waveform. + Returns + ------------ + C : Tensor + Shape(B, T', n_bins, 2) Spectrogram. + + num_frames: Tensor + Shape (B,) number of samples of each spectrogram + """ + batch_size = paddle.shape(num_samples) + F, nframe = frames(x, num_samples, self.sr, self.win_length, self.stride_length, clip=self.clip) + if self.dither: + F = dither(F, self.dither) + if self.remove_dc_offset: + F = remove_dc_offset(F) + if self.preemph_coeff: + F = preemphasis(F) + C = paddle.matmul(F, self.weight) # [B, T, K] [K, 2 * n_bins] + C = paddle.reshape(C, [batch_size, -1, 2, self.n_bin]) + C = C.transpose([0, 1, 3, 2]) + return C, nframe + + +def powspec(C:Tensor) -> Tensor: + """Compute the power spectrum |X_k|^2. + + Args: + C (Tensor): [B, T, C, 2] + + Returns: + Tensor: [B, T, C] + """ + real, imag = paddle.chunk(C, 2, axis=-1) + return paddle.square(real.squeeze(-1)) + paddle.square(imag.squeeze(-1)) + + +def magspec(C: Tensor, eps=1e-10) -> Tensor: + """Compute the magnitude spectrum |X_k|. + + Args: + C (Tensor): [B, T, C, 2] + eps (float): epsilon. + + Returns: + Tensor: [B, T, C] + """ + pspec = powspec(C) + return paddle.sqrt(pspec + eps) + + +def logspec(C: Tensor, eps=1e-10) -> Tensor: + """Compute log-spectrum 20log10∣X_k∣. + + Args: + C (Tensor): [description] + eps ([type], optional): [description]. Defaults to 1e-10. + + Returns: + Tensor: [description] + """ + spec = magspec(C) + return 20 * paddle.log10(spec + eps) + diff --git a/third_party/paddle_audio/frontend/kaldi_test.py b/third_party/paddle_audio/frontend/kaldi_test.py new file mode 100644 index 00000000..34ff413c --- /dev/null +++ b/third_party/paddle_audio/frontend/kaldi_test.py @@ -0,0 +1,533 @@ +from typing import Tuple +import numpy as np +import paddle +import unittest + +import decimal +import numpy +import math +import logging +from pathlib import Path + +from scipy.fftpack import dct + +from third_party.paddle_audio.frontend import kaldi + +def round_half_up(number): + return int(decimal.Decimal(number).quantize(decimal.Decimal('1'), rounding=decimal.ROUND_HALF_UP)) + +def rolling_window(a, window, step=1): + # http://ellisvalentiner.com/post/2017-03-21-np-strides-trick + shape = a.shape[:-1] + (a.shape[-1] - window + 1, window) + strides = a.strides + (a.strides[-1],) + return numpy.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)[::step] + + +def do_dither(signal, dither_value=1.0): + signal += numpy.random.normal(size=signal.shape) * dither_value + return signal + +def do_remove_dc_offset(signal): + signal -= numpy.mean(signal) + return signal + +def do_preemphasis(signal, coeff=0.97): + """perform preemphasis on the input signal. + + :param signal: The signal to filter. + :param coeff: The preemphasis coefficient. 0 is no filter, default is 0.95. + :returns: the filtered signal. + """ + return numpy.append((1-coeff)*signal[0], signal[1:] - coeff * signal[:-1]) + + +def framesig(sig, frame_len, frame_step, dither=1.0, preemph=0.97, remove_dc_offset=True, wintype='hamming', stride_trick=True): + """Frame a signal into overlapping frames. + + :param sig: the audio signal to frame. + :param frame_len: length of each frame measured in samples. + :param frame_step: number of samples after the start of the previous frame that the next frame should begin. + :param winfunc: the analysis window to apply to each frame. By default no window is applied. + :param stride_trick: use stride trick to compute the rolling window and window multiplication faster + :returns: an array of frames. Size is NUMFRAMES by frame_len. + """ + slen = len(sig) + frame_len = int(round_half_up(frame_len)) + frame_step = int(round_half_up(frame_step)) + if slen <= frame_len: + numframes = 1 + else: + numframes = 1 + (( slen - frame_len) // frame_step) + + # check kaldi/src/feat/feature-window.h + padsignal = sig[:(numframes-1)*frame_step+frame_len] + if wintype is 'povey': + win = numpy.empty(frame_len) + for i in range(frame_len): + win[i] = (0.5-0.5*numpy.cos(2*numpy.pi/(frame_len-1)*i))**0.85 + else: # the hamming window + win = numpy.hamming(frame_len) + + if stride_trick: + frames = rolling_window(padsignal, window=frame_len, step=frame_step) + else: + indices = numpy.tile(numpy.arange(0, frame_len), (numframes, 1)) + numpy.tile( + numpy.arange(0, numframes * frame_step, frame_step), (frame_len, 1)).T + indices = numpy.array(indices, dtype=numpy.int32) + frames = padsignal[indices] + win = numpy.tile(win, (numframes, 1)) + + frames = frames.astype(numpy.float32) + raw_frames = numpy.zeros(frames.shape) + for frm in range(frames.shape[0]): + frames[frm,:] = do_dither(frames[frm,:], dither) # dither + frames[frm,:] = do_remove_dc_offset(frames[frm,:]) # remove dc offset + raw_frames[frm,:] = frames[frm,:] + frames[frm,:] = do_preemphasis(frames[frm,:], preemph) # preemphasize + + return frames * win, raw_frames + + +def magspec(frames, NFFT): + """Compute the magnitude spectrum of each frame in frames. If frames is an NxD matrix, output will be Nx(NFFT/2+1). + + :param frames: the array of frames. Each row is a frame. + :param NFFT: the FFT length to use. If NFFT > frame_len, the frames are zero-padded. + :returns: If frames is an NxD matrix, output will be Nx(NFFT/2+1). Each row will be the magnitude spectrum of the corresponding frame. + """ + if numpy.shape(frames)[1] > NFFT: + logging.warn( + 'frame length (%d) is greater than FFT size (%d), frame will be truncated. Increase NFFT to avoid.', + numpy.shape(frames)[1], NFFT) + complex_spec = numpy.fft.rfft(frames, NFFT) + return numpy.absolute(complex_spec) + + +def powspec(frames, NFFT): + """Compute the power spectrum of each frame in frames. If frames is an NxD matrix, output will be Nx(NFFT/2+1). + + :param frames: the array of frames. Each row is a frame. + :param NFFT: the FFT length to use. If NFFT > frame_len, the frames are zero-padded. + :returns: If frames is an NxD matrix, output will be Nx(NFFT/2+1). Each row will be the power spectrum of the corresponding frame. + """ + return numpy.square(magspec(frames, NFFT)) + + + +def mfcc(signal,samplerate=16000,winlen=0.025,winstep=0.01,numcep=13, + nfilt=23,nfft=512,lowfreq=20,highfreq=None,dither=1.0,remove_dc_offset=True,preemph=0.97, + ceplifter=22,useEnergy=True,wintype='povey'): + """Compute MFCC features from an audio signal. + + :param signal: the audio signal from which to compute features. Should be an N*1 array + :param samplerate: the samplerate of the signal we are working with. + :param winlen: the length of the analysis window in seconds. Default is 0.025s (25 milliseconds) + :param winstep: the step between successive windows in seconds. Default is 0.01s (10 milliseconds) + :param numcep: the number of cepstrum to return, default 13 + :param nfilt: the number of filters in the filterbank, default 26. + :param nfft: the FFT size. Default is 512. + :param lowfreq: lowest band edge of mel filters. In Hz, default is 0. + :param highfreq: highest band edge of mel filters. In Hz, default is samplerate/2 + :param preemph: apply preemphasis filter with preemph as coefficient. 0 is no filter. Default is 0.97. + :param ceplifter: apply a lifter to final cepstral coefficients. 0 is no lifter. Default is 22. + :param appendEnergy: if this is true, the zeroth cepstral coefficient is replaced with the log of the total frame energy. + :param winfunc: the analysis window to apply to each frame. By default no window is applied. You can use numpy window functions here e.g. winfunc=numpy.hamming + :returns: A numpy array of size (NUMFRAMES by numcep) containing features. Each row holds 1 feature vector. + """ + feat,energy = fbank(signal,samplerate,winlen,winstep,nfilt,nfft,lowfreq,highfreq,dither,remove_dc_offset,preemph,wintype) + feat = numpy.log(feat) + feat = dct(feat, type=2, axis=1, norm='ortho')[:,:numcep] + feat = lifter(feat,ceplifter) + if useEnergy: feat[:,0] = numpy.log(energy) # replace first cepstral coefficient with log of frame energy + return feat + +def fbank(signal,samplerate=16000,winlen=0.025,winstep=0.01, + nfilt=40,nfft=512,lowfreq=0,highfreq=None,dither=1.0,remove_dc_offset=True, preemph=0.97, + wintype='hamming'): + """Compute Mel-filterbank energy features from an audio signal. + + :param signal: the audio signal from which to compute features. Should be an N*1 array + :param samplerate: the samplerate of the signal we are working with. + :param winlen: the length of the analysis window in seconds. Default is 0.025s (25 milliseconds) + :param winstep: the step between successive windows in seconds. Default is 0.01s (10 milliseconds) + :param nfilt: the number of filters in the filterbank, default 26. + :param nfft: the FFT size. Default is 512. + :param lowfreq: lowest band edge of mel filters. In Hz, default is 0. + :param highfreq: highest band edge of mel filters. In Hz, default is samplerate/2 + :param preemph: apply preemphasis filter with preemph as coefficient. 0 is no filter. Default is 0.97. + :param winfunc: the analysis window to apply to each frame. By default no window is applied. You can use numpy window functions here e.g. winfunc=numpy.hamming + winfunc=lambda x:numpy.ones((x,)) + :returns: 2 values. The first is a numpy array of size (NUMFRAMES by nfilt) containing features. Each row holds 1 feature vector. The + second return value is the energy in each frame (total energy, unwindowed) + """ + highfreq= highfreq or samplerate/2 + frames,raw_frames = sigproc.framesig(signal, winlen*samplerate, winstep*samplerate, dither, preemph, remove_dc_offset, wintype) + pspec = sigproc.powspec(frames,nfft) # nearly the same until this part + energy = numpy.sum(raw_frames**2,1) # this stores the raw energy in each frame + energy = numpy.where(energy == 0,numpy.finfo(float).eps,energy) # if energy is zero, we get problems with log + + fb = get_filterbanks(nfilt,nfft,samplerate,lowfreq,highfreq) + feat = numpy.dot(pspec,fb.T) # compute the filterbank energies + feat = numpy.where(feat == 0,numpy.finfo(float).eps,feat) # if feat is zero, we get problems with log + + return feat,energy + +def logfbank(signal,samplerate=16000,winlen=0.025,winstep=0.01, + nfilt=40,nfft=512,lowfreq=64,highfreq=None,dither=1.0,remove_dc_offset=True,preemph=0.97,wintype='hamming'): + """Compute log Mel-filterbank energy features from an audio signal. + + :param signal: the audio signal from which to compute features. Should be an N*1 array + :param samplerate: the samplerate of the signal we are working with. + :param winlen: the length of the analysis window in seconds. Default is 0.025s (25 milliseconds) + :param winstep: the step between successive windows in seconds. Default is 0.01s (10 milliseconds) + :param nfilt: the number of filters in the filterbank, default 26. + :param nfft: the FFT size. Default is 512. + :param lowfreq: lowest band edge of mel filters. In Hz, default is 0. + :param highfreq: highest band edge of mel filters. In Hz, default is samplerate/2 + :param preemph: apply preemphasis filter with preemph as coefficient. 0 is no filter. Default is 0.97. + :returns: A numpy array of size (NUMFRAMES by nfilt) containing features. Each row holds 1 feature vector. + """ + feat,energy = fbank(signal,samplerate,winlen,winstep,nfilt,nfft,lowfreq,highfreq,dither, remove_dc_offset,preemph,wintype) + return numpy.log(feat) + +def hz2mel(hz): + """Convert a value in Hertz to Mels + + :param hz: a value in Hz. This can also be a numpy array, conversion proceeds element-wise. + :returns: a value in Mels. If an array was passed in, an identical sized array is returned. + """ + return 1127 * numpy.log(1+hz/700.0) + +def mel2hz(mel): + """Convert a value in Mels to Hertz + + :param mel: a value in Mels. This can also be a numpy array, conversion proceeds element-wise. + :returns: a value in Hertz. If an array was passed in, an identical sized array is returned. + """ + return 700 * (numpy.exp(mel/1127.0)-1) + +def get_filterbanks(nfilt=26,nfft=512,samplerate=16000,lowfreq=0,highfreq=None): + """Compute a Mel-filterbank. The filters are stored in the rows, the columns correspond + to fft bins. The filters are returned as an array of size nfilt * (nfft/2 + 1) + + :param nfilt: the number of filters in the filterbank, default 20. + :param nfft: the FFT size. Default is 512. + :param samplerate: the samplerate of the signal we are working with. Affects mel spacing. + :param lowfreq: lowest band edge of mel filters, default 0 Hz + :param highfreq: highest band edge of mel filters, default samplerate/2 + :returns: A numpy array of size nfilt * (nfft/2 + 1) containing filterbank. Each row holds 1 filter. + """ + highfreq= highfreq or samplerate/2 + assert highfreq <= samplerate/2, "highfreq is greater than samplerate/2" + + # compute points evenly spaced in mels + lowmel = hz2mel(lowfreq) + highmel = hz2mel(highfreq) + + # check kaldi/src/feat/Mel-computations.h + fbank = numpy.zeros([nfilt,nfft//2+1]) + mel_freq_delta = (highmel-lowmel)/(nfilt+1) + for j in range(0,nfilt): + leftmel = lowmel+j*mel_freq_delta + centermel = lowmel+(j+1)*mel_freq_delta + rightmel = lowmel+(j+2)*mel_freq_delta + for i in range(0,nfft//2): + mel=hz2mel(i*samplerate/nfft) + if mel>leftmel and mel 0: + nframes,ncoeff = numpy.shape(cepstra) + n = numpy.arange(ncoeff) + lift = 1 + (L/2.)*numpy.sin(numpy.pi*n/L) + return lift*cepstra + else: + # values of L <= 0, do nothing + return cepstra + +def delta(feat, N): + """Compute delta features from a feature vector sequence. + + :param feat: A numpy array of size (NUMFRAMES by number of features) containing features. Each row holds 1 feature vector. + :param N: For each frame, calculate delta features based on preceding and following N frames + :returns: A numpy array of size (NUMFRAMES by number of features) containing delta features. Each row holds 1 delta feature vector. + """ + if N < 1: + raise ValueError('N must be an integer >= 1') + NUMFRAMES = len(feat) + denominator = 2 * sum([i**2 for i in range(1, N+1)]) + delta_feat = numpy.empty_like(feat) + padded = numpy.pad(feat, ((N, N), (0, 0)), mode='edge') # padded version of feat + for t in range(NUMFRAMES): + delta_feat[t] = numpy.dot(numpy.arange(-N, N+1), padded[t : t+2*N+1]) / denominator # [t : t+2*N+1] == [(N+t)-N : (N+t)+N+1] + return delta_feat + +##### modify for test ###### + +def framesig_without_dither_dc_preemphasize(sig, frame_len, frame_step, wintype='hamming', stride_trick=True): + """Frame a signal into overlapping frames. + + :param sig: the audio signal to frame. + :param frame_len: length of each frame measured in samples. + :param frame_step: number of samples after the start of the previous frame that the next frame should begin. + :param winfunc: the analysis window to apply to each frame. By default no window is applied. + :param stride_trick: use stride trick to compute the rolling window and window multiplication faster + :returns: an array of frames. Size is NUMFRAMES by frame_len. + """ + slen = len(sig) + frame_len = int(round_half_up(frame_len)) + frame_step = int(round_half_up(frame_step)) + if slen <= frame_len: + numframes = 1 + else: + numframes = 1 + (( slen - frame_len) // frame_step) + + # check kaldi/src/feat/feature-window.h + padsignal = sig[:(numframes-1)*frame_step+frame_len] + + if wintype is 'povey': + win = numpy.empty(frame_len) + for i in range(frame_len): + win[i] = (0.5-0.5*numpy.cos(2*numpy.pi/(frame_len-1)*i))**0.85 + elif wintype == '': + win = numpy.ones(frame_len) + elif wintype == 'hann': + win = numpy.hanning(frame_len) + else: # the hamming window + win = numpy.hamming(frame_len) + + if stride_trick: + frames = rolling_window(padsignal, window=frame_len, step=frame_step) + else: + indices = numpy.tile(numpy.arange(0, frame_len), (numframes, 1)) + numpy.tile( + numpy.arange(0, numframes * frame_step, frame_step), (frame_len, 1)).T + indices = numpy.array(indices, dtype=numpy.int32) + frames = padsignal[indices] + win = numpy.tile(win, (numframes, 1)) + + frames = frames.astype(numpy.float32) + raw_frames = frames + return frames * win, raw_frames + + +def frames(signal,samplerate=16000,winlen=0.025,winstep=0.01, + nfilt=40,nfft=512,lowfreq=0,highfreq=None, wintype='hamming'): + frames_with_win, raw_frames = framesig_without_dither_dc_preemphasize(signal, winlen*samplerate, winstep*samplerate, wintype) + return frames_with_win, raw_frames + + +def complexspec(frames, NFFT): + """Compute the magnitude spectrum of each frame in frames. If frames is an NxD matrix, output will be Nx(NFFT/2+1). + + :param frames: the array of frames. Each row is a frame. + :param NFFT: the FFT length to use. If NFFT > frame_len, the frames are zero-padded. + :returns: If frames is an NxD matrix, output will be Nx(NFFT/2+1). Each row will be the magnitude spectrum of the corresponding frame. + """ + if numpy.shape(frames)[1] > NFFT: + logging.warn( + 'frame length (%d) is greater than FFT size (%d), frame will be truncated. Increase NFFT to avoid.', + numpy.shape(frames)[1], NFFT) + complex_spec = numpy.fft.rfft(frames, NFFT) + return complex_spec + + +def stft_with_window(signal,samplerate=16000,winlen=0.025,winstep=0.01, + nfilt=40,nfft=512,lowfreq=0,highfreq=None,dither=1.0,remove_dc_offset=True, preemph=0.97, + wintype='hamming'): + frames_with_win, raw_frames = framesig_without_dither_dc_preemphasize(signal, winlen*samplerate, winstep*samplerate, wintype) + + spec = magspec(frames_with_win, nfft) # nearly the same until this part + scomplex = complexspec(frames_with_win, nfft) + + rspec = magspec(raw_frames, nfft) + rcomplex = complexspec(raw_frames, nfft) + return spec, scomplex, rspec, rcomplex + + +class TestKaldiFE(unittest.TestCase): + def setUp(self): + self. this_dir = Path(__file__).parent + + self.wavpath = str(self.this_dir / 'english.wav') + self.winlen=0.025 # ms + self.winstep=0.01 # ms + self.nfft=512 + self.lowfreq = 0 + self.highfreq = None + self.wintype='hamm' + self.nfilt=40 + + paddle.set_device('cpu') + + + def test_read(self): + import scipy.io.wavfile as wav + rate, sig = wav.read(self.wavpath) + sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] + self.assertTrue(np.all(sig == wav)) + self.assertEqual(rate, sr) + + def test_frames(self): + sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] + _, fs = frames(wav, samplerate=sr, + winlen=self.winlen, winstep=self.winstep, + nfilt=self.nfilt, nfft=self.nfft, + lowfreq=self.lowfreq, highfreq=self.highfreq, + wintype=self.wintype) + + t_wav = paddle.to_tensor([wav], dtype='float32') + t_wavlen = paddle.to_tensor([len(wav)]) + t_fs, t_nframe = kaldi.frames(t_wav, t_wavlen, sr, self.winlen, self.winstep, clip=False) + t_fs = t_fs.astype(fs.dtype)[0] + + self.assertEqual(t_nframe.item(), fs.shape[0]) + self.assertTrue(np.allclose(t_fs.numpy(), fs)) + + + def test_stft(self): + sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] + + for wintype in ['', 'hamm', 'hann', 'povey']: + self.wintype=wintype + _, stft_c_win, _, _ = stft_with_window(wav, samplerate=sr, + winlen=self.winlen, winstep=self.winstep, + nfilt=self.nfilt, nfft=self.nfft, + lowfreq=self.lowfreq, highfreq=self.highfreq, + wintype=self.wintype) + + t_wav = paddle.to_tensor([wav], dtype='float32') + t_wavlen = paddle.to_tensor([len(wav)]) + + stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, dither=0.0, preemph_coeff=0.0, remove_dc_offset=False, clip=False) + t_stft, t_nframe = stft_class(t_wav, t_wavlen) + t_stft = t_stft.astype(stft_c_win.real.dtype)[0] + t_real = t_stft[:, :, 0] + t_imag = t_stft[:, :, 1] + + self.assertEqual(t_nframe.item(), stft_c_win.real.shape[0]) + + self.assertLess(np.sum(t_real.numpy()) - np.sum(stft_c_win.real), 1) + self.assertTrue(np.allclose(t_real.numpy(), stft_c_win.real, atol=1e-1)) + + self.assertLess(np.sum(t_imag.numpy()) - np.sum(stft_c_win.imag), 1) + self.assertTrue(np.allclose(t_imag.numpy(), stft_c_win.imag, atol=1e-1)) + + + def test_magspec(self): + sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] + for wintype in ['', 'hamm', 'hann', 'povey']: + self.wintype=wintype + stft_win, _, _, _ = stft_with_window(wav, samplerate=sr, + winlen=self.winlen, winstep=self.winstep, + nfilt=self.nfilt, nfft=self.nfft, + lowfreq=self.lowfreq, highfreq=self.highfreq, + wintype=self.wintype) + + t_wav = paddle.to_tensor([wav], dtype='float32') + t_wavlen = paddle.to_tensor([len(wav)]) + + stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, dither=0.0, preemph_coeff=0.0, remove_dc_offset=False, clip=False) + t_stft, t_nframe = stft_class(t_wav, t_wavlen) + t_stft = t_stft.astype(stft_win.dtype) + t_spec = kaldi.magspec(t_stft)[0] + + self.assertEqual(t_nframe.item(), stft_win.shape[0]) + + self.assertLess(np.sum(t_spec.numpy()) - np.sum(stft_win), 1) + self.assertTrue(np.allclose(t_spec.numpy(), stft_win, atol=1e-1)) + + + def test_magsepc_winprocess(self): + sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] + fs, _= framesig(wav, self.winlen*sr, self.winstep*sr, + dither=0.0, preemph=0.97, remove_dc_offset=True, wintype='povey', stride_trick=True) + spec = magspec(fs, self.nfft) # nearly the same until this part + + t_wav = paddle.to_tensor([wav], dtype='float32') + t_wavlen = paddle.to_tensor([len(wav)]) + stft_class = kaldi.STFT( + self.nfft, sr, self.winlen, self.winstep, + window_type='povey', dither=0.0, preemph_coeff=0.97, remove_dc_offset=True, clip=False) + t_stft, t_nframe = stft_class(t_wav, t_wavlen) + t_stft = t_stft.astype(spec.dtype) + t_spec = kaldi.magspec(t_stft)[0] + + self.assertEqual(t_nframe.item(), fs.shape[0]) + + self.assertLess(np.sum(t_spec.numpy()) - np.sum(spec), 1) + self.assertTrue(np.allclose(t_spec.numpy(), spec, atol=1e-1)) + + + def test_powspec(self): + sr, wav = kaldi.read(self.wavpath) + wav = wav[:, 0] + for wintype in ['', 'hamm', 'hann', 'povey']: + self.wintype=wintype + stft_win, _, _, _ = stft_with_window(wav, samplerate=sr, + winlen=self.winlen, winstep=self.winstep, + nfilt=self.nfilt, nfft=self.nfft, + lowfreq=self.lowfreq, highfreq=self.highfreq, + wintype=self.wintype) + stft_win = np.square(stft_win) + + t_wav = paddle.to_tensor([wav], dtype='float32') + t_wavlen = paddle.to_tensor([len(wav)]) + + stft_class = kaldi.STFT(self.nfft, sr, self.winlen, self.winstep, window_type=self.wintype, dither=0.0, preemph_coeff=0.0, remove_dc_offset=False, clip=False) + t_stft, t_nframe = stft_class(t_wav, t_wavlen) + t_stft = t_stft.astype(stft_win.dtype) + t_spec = kaldi.powspec(t_stft)[0] + + self.assertEqual(t_nframe.item(), stft_win.shape[0]) + + self.assertLess(np.sum(t_spec.numpy() - stft_win), 5e4) + self.assertTrue(np.allclose(t_spec.numpy(), stft_win, atol=1e2)) + + +# from python_speech_features import mfcc +# from python_speech_features import delta +# from python_speech_features import logfbank +# import scipy.io.wavfile as wav + +# (rate,sig) = wav.read("english.wav") + +# # note that generally nfilt=40 is used for speech recognition +# fbank_feat = logfbank(sig,nfilt=23,lowfreq=20,dither=0,wintype='povey') + +# # the computed fbank coefficents of english.wav with dimension [110,23] +# # [ 12.2865 12.6906 13.1765 15.714 16.064 15.7553 16.5746 16.9205 16.6472 16.1302 16.4576 16.7326 16.8864 17.7215 18.88 19.1377 19.1495 18.6683 18.3886 20.3506 20.2772 18.8248 18.1899 +# # 11.9198 13.146 14.7215 15.8642 17.4288 16.394 16.8238 16.1095 16.4297 16.6331 16.3163 16.5093 17.4981 18.3429 19.6555 19.6263 19.8435 19.0534 19.001 20.0287 19.7707 19.5852 19.1112 +# # ... +# # ... +# # the same with that using kaldi commands: compute-fbank-feats --dither=0.0 + + +# mfcc_feat = mfcc(sig,dither=0,useEnergy=True,wintype='povey') + +# # the computed mfcc coefficents of english.wav with dimension [110,13] +# # [ 17.1337 -23.3651 -7.41751 -7.73686 -21.3682 -8.93884 -3.70843 4.68346 -16.0676 12.782 -7.24054 8.25089 10.7292 +# # 17.1692 -23.3028 -5.61872 -4.0075 -23.287 -20.6101 -5.51584 -6.15273 -14.4333 8.13052 -0.0345329 2.06274 -0.564298 +# # ... +# # ... +# # the same with that using kaldi commands: compute-mfcc-feats --dither=0.0 + + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file