from typing import Tuple

import numpy as np
import paddle
from paddle import Tensor
from paddle import nn
from paddle.nn import functional as F
import soundfile as sf

from .common import get_window
from .common import dft_matrix


def read(wavpath: str, sr: int=None, start=0, stop=None, dtype='int16', always_2d=True) -> Tuple[int, np.ndarray]:
    """Load a wav file.

    Args:
        wavpath (str): wav path.
        sr (int, optional): expected sample rate; if given, the file's sample rate must match it. Defaults to None.
        start (int, optional): first frame to read. Defaults to 0.
        stop (int, optional): stop reading before this frame. Defaults to None.
        dtype (str, optional): wav data type. Defaults to 'int16'.
        always_2d (bool, optional): always return a 2-D array. Defaults to True.

    Returns:
        Tuple[int, np.ndarray]: sr (int), wav (dtype) [T, C].
    """
    wav, r_sr = sf.read(wavpath, start=start, stop=stop, dtype=dtype, always_2d=always_2d)
    if sr:
        assert sr == r_sr
    return r_sr, wav
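

# A minimal usage sketch for `read` (the wav path below is illustrative, not an
# asset shipped with this module):
#
#     sr, wav = read("example.wav", sr=16000)
#     print(sr, wav.shape)  # e.g. 16000, (num_samples, num_channels)
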
def write(wavpath: str, wav: np.ndarray, sr: int, dtype='PCM_16'):
    """Write a wav file.

    Args:
        wavpath (str): file path to save.
        wav (np.ndarray): wav data.
        sr (int): sample rate of the data.
        dtype (str, optional): wav bit depth (soundfile subtype). Defaults to 'PCM_16'.
    """
    sf.write(wavpath, wav, sr, subtype=dtype)
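

# A round-trip sketch for `read`/`write` (paths are illustrative):
#
#     sr, wav = read("in.wav")
#     write("out.wav", wav, sr, dtype='PCM_16')
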
def frames(x: Tensor,
           num_samples: Tensor,
           sr: int,
           win_length: float,
           stride_length: float,
           clip: bool = False) -> Tuple[Tensor, Tensor]:
    """Extract frames from audio.

    Parameters
    ----------
    x : Tensor
        Shape (B, T), batched waveform.
    num_samples : Tensor
        Shape (B, ), number of samples of each waveform.
    sr : int
        Sampling rate.
    win_length : float
        Window length in seconds.
    stride_length : float
        Stride length in seconds.
    clip : bool, optional
        Whether to clip audio that does not fit into the last frame, by
        default False.

    Returns
    -------
    frames : Tensor
        Shape (B, T', win_length).
    num_frames : Tensor
        Shape (B, ), number of valid frames.
    """
    assert stride_length <= win_length
    # convert window and stride from seconds to samples
    stride_length = int(stride_length * sr)
    win_length = int(win_length * sr)

    num_frames = (num_samples - win_length) // stride_length
    padding = (0, 0)
    if not clip:
        # keep the tail by padding enough samples for one extra frame
        num_frames += 1
        need_samples = num_frames * stride_length + win_length
        padding = (0, need_samples - num_samples - 1)

    weight = paddle.eye(win_length).unsqueeze(1)  # [win_length, 1, win_length]

    frames = F.conv1d(x.unsqueeze(-1),
                      weight,
                      padding=padding,
                      stride=(stride_length, ),
                      data_format='NLC')
    return frames, num_frames
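

# Worked example of the framing arithmetic (values are illustrative): with
# sr=16000, win_length=0.025 s and stride_length=0.01 s, a frame spans 400
# samples and adjacent frames start 160 samples apart, so a 1-second utterance
# (16000 samples) yields (16000 - 400) // 160 = 97 full frames, or 98 with
# clip=False, since the padded tail frame is kept.
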
def dither(signal: Tensor, dither_value=1.0) -> Tensor:
    """Dither frames with gaussian noise prior to log computation.

    Args:
        signal (Tensor): [B, T, D]
        dither_value (float, optional): noise scale (scalar). Defaults to 1.0.

    Returns:
        Tensor: [B, T, D]
    """
    D = paddle.shape(signal)[-1]
    signal += paddle.normal(shape=[1, 1, D]) * dither_value
    return signal

def remove_dc_offset(signal: Tensor) -> Tensor:
    """Remove the DC offset by subtracting the per-frame mean.

    Args:
        signal (Tensor): [B, T, D]

    Returns:
        Tensor: [B, T, D]
    """
    signal -= paddle.mean(signal, axis=-1, keepdim=True)
    return signal

def preemphasis(signal: Tensor, coeff=0.97) -> Tensor:
    """Perform preemphasis on the input signal.

    Args:
        signal (Tensor): [B, T, D], the signal to filter.
        coeff (float, optional): the preemphasis coefficient (scalar); 0 means no filtering. Defaults to 0.97.

    Returns:
        Tensor: [B, T, D]
    """
    return paddle.concat([
        (1 - coeff) * signal[:, :, 0:1],
        signal[:, :, 1:] - coeff * signal[:, :, :-1]
    ], axis=-1)
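

# Within each frame the filter above computes y[0] = (1 - coeff) * x[0] and
# y[n] = x[n] - coeff * x[n-1] for n >= 1, i.e. a first-order filter that
# emphasizes high frequencies before the spectral analysis below.
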
class STFT(nn.Layer):
    """A module for computing the STFT in a differentiable way.

    http://practicalcryptography.com/miscellaneous/machine-learning/intuitive-guide-discrete-fourier-transform/

    Parameters
    ------------
    n_fft : int
        Number of FFT points.

    sr : int
        Sampling rate.

    win_length : float
        Window length in seconds.

    stride_length : float
        Stride length in seconds.

    dither : float
        Dither noise scale; 0.0 disables dithering.

    preemph_coeff : float
        Preemphasis coefficient; 0.0 disables preemphasis.

    remove_dc_offset : bool
        Whether to remove the DC offset of each frame.

    window_type : str
        Window type, passed to ``get_window``. Defaults to 'povey'.

    clip : bool
        Whether to clip audio that does not fit into the last frame.
    """
    def __init__(self,
                 n_fft: int,
                 sr: int,
                 win_length: float,
                 stride_length: float,
                 dither: float=0.0,
                 preemph_coeff: float=0.97,
                 remove_dc_offset: bool=True,
                 window_type: str='povey',
                 clip: bool=False):
        super().__init__()
        self.sr = sr
        self.win_length = win_length
        self.stride_length = stride_length
        self.dither = dither
        self.preemph_coeff = preemph_coeff
        self.remove_dc_offset = remove_dc_offset
        self.window_type = window_type
        self.clip = clip

        self.n_fft = n_fft
        self.n_bin = 1 + n_fft // 2

        # real and imaginary parts of the DFT basis, truncated to the window size
        w_real, w_imag, kernel_size = dft_matrix(
            self.n_fft, int(self.win_length * self.sr), self.n_bin
        )

        # calculate window
        window = get_window(window_type, kernel_size)

        # (2 * n_bins, kernel_size)
        w = np.concatenate([w_real, w_imag], axis=0)
        w = w * window
        # (kernel_size, 2 * n_bins)
        w = np.transpose(w)
        weight = paddle.cast(paddle.to_tensor(w), paddle.get_default_dtype())
        self.register_buffer("weight", weight)

    def forward(self, x: Tensor, num_samples: Tensor) -> Tuple[Tensor, Tensor]:
        """Compute the stft transform.

        Parameters
        ------------
        x : Tensor [shape=(B, T)]
            The input waveform.
        num_samples : Tensor [shape=(B,)]
            Number of samples of each waveform.

        Returns
        ------------
        C : Tensor
            Shape (B, T', n_bins, 2), spectrogram.

        num_frames : Tensor
            Shape (B,), number of frames of each spectrogram.
        """
        batch_size = paddle.shape(num_samples)
        # avoid shadowing paddle.nn.functional (imported as F) with the frames
        feats, nframe = frames(x, num_samples, self.sr, self.win_length,
                               self.stride_length, clip=self.clip)
        if self.dither:
            feats = dither(feats, self.dither)
        if self.remove_dc_offset:
            feats = remove_dc_offset(feats)
        if self.preemph_coeff:
            feats = preemphasis(feats, self.preemph_coeff)
        C = paddle.matmul(feats, self.weight)  # [B, T', K] x [K, 2 * n_bins]
        C = paddle.reshape(C, [batch_size, -1, 2, self.n_bin])
        C = C.transpose([0, 1, 3, 2])
        return C, nframe
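

# A minimal end-to-end sketch of the pipeline in this module (values are
# illustrative; `wav` is assumed to be a mono int16 waveform loaded with
# `read` above):
#
#     sr, wav = read("example.wav")                                   # (T, 1) int16
#     x = paddle.to_tensor(wav[:, 0], dtype='float32').unsqueeze(0)   # (1, T)
#     num_samples = paddle.to_tensor([x.shape[1]], dtype='int64')
#     stft = STFT(n_fft=512, sr=sr, win_length=0.025, stride_length=0.01, clip=True)
#     C, nframe = stft(x, num_samples)                                # (1, T', 257, 2)
#     logmag = logspec(C)                                             # (1, T', 257)
#
# Note that the transform is a plain matmul against a fixed real/imaginary DFT
# matrix, so it stays differentiable with respect to the input waveform.
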
def powspec(C: Tensor) -> Tensor:
    """Compute the power spectrum |X_k|^2.

    Args:
        C (Tensor): [B, T, C, 2]

    Returns:
        Tensor: [B, T, C]
    """
    real, imag = paddle.chunk(C, 2, axis=-1)
    return paddle.square(real.squeeze(-1)) + paddle.square(imag.squeeze(-1))

def magspec(C: Tensor, eps=1e-10) -> Tensor:
    """Compute the magnitude spectrum |X_k|.

    Args:
        C (Tensor): [B, T, C, 2]
        eps (float): epsilon added before the square root for numerical stability.

    Returns:
        Tensor: [B, T, C]
    """
    pspec = powspec(C)
    return paddle.sqrt(pspec + eps)

def logspec(C: Tensor, eps=1e-10) -> Tensor:
    """Compute the log-magnitude spectrum 20*log10|X_k|.

    Args:
        C (Tensor): [B, T, C, 2]
        eps (float, optional): epsilon added before the log for numerical stability. Defaults to 1e-10.

    Returns:
        Tensor: [B, T, C]
    """
    spec = magspec(C)
    return 20 * paddle.log10(spec + eps)
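

# Relationship between the three spectra above, for a single complex bin
# X_k = re + j*im:
#
#     powspec : |X_k|^2         = re**2 + im**2
#     magspec : |X_k|           = sqrt(powspec + eps)
#     logspec : 20 * log10|X_k| (in dB)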