Merge pull request #1494 from PaddlePaddle/audio

[audio] refactor audio arch
Hui Zhang 3 years ago committed by GitHub
commit d0bca1982e
No known key found for this signature in database

.gitignore vendored

@ -14,6 +14,7 @@
*.whl *.whl
*.egg-info *.egg-info
build build
docs/build/ docs/build/
docs/topic/ctc/warp-ctc/ docs/topic/ctc/warp-ctc/
@ -34,5 +35,3 @@ tools/
tools/CRF++-0.58/ tools/CRF++-0.58/
speechx/fc_patch/ speechx/fc_patch/

@ -1 +1,5 @@
# Changelog # Changelog
Date: 2022-2-25, Author: Hui Zhang.
- Refactor architecture.
- dtw distance and mcd style dtw

@ -1,170 +0,0 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import numpy as np
from numpy import ndarray as array
from ..backends import depth_convert
from ..utils import ParameterError
__all__ = [
def randint(high: int) -> int:
"""Generate one random integer in range [0 high)
This is a helper function for random data augmentaiton
return int(np.random.randint(0, high=high))
def rand() -> float:
"""Generate one floating-point number in range [0 1)
This is a helper function for random data augmentaiton
return float(np.random.rand(1))
def depth_augment(y: array,
choices: List=['int8', 'int16'],
probs: List[float]=[0.5, 0.5]) -> array:
""" Audio depth augmentation
Do audio depth augmentation to simulate the distortion brought by quantization.
assert len(probs) == len(
), 'number of choices {} must be equal to size of probs {}'.format(
len(choices), len(probs))
depth = np.random.choice(choices, p=probs)
src_depth = y.dtype
y1 = depth_convert(y, depth)
y2 = depth_convert(y1, src_depth)
return y2
def adaptive_spect_augment(spect: array, tempo_axis: int=0,
level: float=0.1) -> array:
"""Do adpative spectrogram augmentation
The level of the augmentation is gowern by the paramter level,
ranging from 0 to 1, with 0 represents no augmentation
assert spect.ndim == 2., 'only supports 2d tensor or numpy array'
if tempo_axis == 0:
nt, nf = spect.shape
nf, nt = spect.shape
time_mask_width = int(nt * level * 0.5)
freq_mask_width = int(nf * level * 0.5)
num_time_mask = int(10 * level)
num_freq_mask = int(10 * level)
if tempo_axis == 0:
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[start:start + time_mask_width, :] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[:, start:start + freq_mask_width] = 0
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[:, start:start + time_mask_width] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[start:start + freq_mask_width, :] = 0
return spect
def spect_augment(spect: array,
tempo_axis: int=0,
max_time_mask: int=3,
max_freq_mask: int=3,
max_time_mask_width: int=30,
max_freq_mask_width: int=20) -> array:
"""Do spectrogram augmentation in both time and freq axis
assert spect.ndim == 2., 'only supports 2d tensor or numpy array'
if tempo_axis == 0:
nt, nf = spect.shape
nf, nt = spect.shape
num_time_mask = randint(max_time_mask)
num_freq_mask = randint(max_freq_mask)
time_mask_width = randint(max_time_mask_width)
freq_mask_width = randint(max_freq_mask_width)
if tempo_axis == 0:
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[start:start + time_mask_width, :] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[:, start:start + freq_mask_width] = 0
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[:, start:start + time_mask_width] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[start:start + freq_mask_width, :] = 0
return spect
def random_crop1d(y: array, crop_len: int) -> array:
""" Do random cropping on 1d input signal
The input is a 1d signal, typically a sound waveform
if y.ndim != 1:
'only accept 1d tensor or numpy array'
n = len(y)
idx = randint(n - crop_len)
return y[idx:idx + crop_len]
def random_crop2d(s: array, crop_len: int, tempo_axis: int=0) -> array:
""" Do random cropping for 2D array, typically a spectrogram.
The cropping is done in temporal direction on the time-freq input signal.
if tempo_axis >= s.ndim:
raise ParameterError('axis out of range')
n = s.shape[tempo_axis]
idx = randint(high=n - crop_len)
sli = [slice(None) for i in range(s.ndim)]
sli[tempo_axis] = slice(idx, idx + crop_len)
out = s[tuple(sli)]
return out

@ -1,461 +0,0 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from functools import partial
from typing import Optional
from typing import Union
import paddle
import paddle.nn as nn
from .window import get_window
__all__ = [
def hz_to_mel(freq: Union[paddle.Tensor, float],
htk: bool=False) -> Union[paddle.Tensor, float]:
"""Convert Hz to Mels.
freq: the input tensor of arbitrary shape, or a single floating point number.
htk: use HTK formula to do the conversion.
The default value is False.
The frequencies represented in Mel-scale.
if htk:
if isinstance(freq, paddle.Tensor):
return 2595.0 * paddle.log10(1.0 + freq / 700.0)
return 2595.0 * math.log10(1.0 + freq / 700.0)
# Fill in the linear part
f_min = 0.0
f_sp = 200.0 / 3
mels = (freq - f_min) / f_sp
# Fill in the log-scale part
min_log_hz = 1000.0 # beginning of log region (Hz)
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = math.log(6.4) / 27.0 # step size for log region
if isinstance(freq, paddle.Tensor):
target = min_log_mel + paddle.log(
freq / min_log_hz + 1e-10) / logstep # prevent nan with 1e-10
mask = (freq > min_log_hz).astype(freq.dtype)
mels = target * mask + mels * (
1 - mask) # will replace by masked_fill OP in future
if freq >= min_log_hz:
mels = min_log_mel + math.log(freq / min_log_hz + 1e-10) / logstep
return mels
def mel_to_hz(mel: Union[float, paddle.Tensor],
htk: bool=False) -> Union[float, paddle.Tensor]:
"""Convert mel bin numbers to frequencies.
mel: the mel frequency represented as a tensor of arbitrary shape, or a floating point number.
htk: use HTK formula to do the conversion.
The frequencies represented in hz.
if htk:
return 700.0 * (10.0**(mel / 2595.0) - 1.0)
f_min = 0.0
f_sp = 200.0 / 3
freqs = f_min + f_sp * mel
# And now the nonlinear scale
min_log_hz = 1000.0 # beginning of log region (Hz)
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = math.log(6.4) / 27.0 # step size for log region
if isinstance(mel, paddle.Tensor):
target = min_log_hz * paddle.exp(logstep * (mel - min_log_mel))
mask = (mel > min_log_mel).astype(mel.dtype)
freqs = target * mask + freqs * (
1 - mask) # will replace by masked_fill OP in future
if mel >= min_log_mel:
freqs = min_log_hz * math.exp(logstep * (mel - min_log_mel))
return freqs
def mel_frequencies(n_mels: int=64,
f_min: float=0.0,
f_max: float=11025.0,
htk: bool=False,
dtype: str=paddle.float32):
"""Compute mel frequencies.
n_mels(int): number of Mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zero.
htk(bool): whether to use htk formula.
dtype(str): the datatype of the return frequencies.
The frequencies represented in Mel-scale
# 'Center freqs' of mel bands - uniformly spaced between limits
min_mel = hz_to_mel(f_min, htk=htk)
max_mel = hz_to_mel(f_max, htk=htk)
mels = paddle.linspace(min_mel, max_mel, n_mels, dtype=dtype)
freqs = mel_to_hz(mels, htk=htk)
return freqs
def fft_frequencies(sr: int, n_fft: int, dtype: str=paddle.float32):
"""Compute fourier frequencies.
sr(int): the audio sample rate.
n_fft(float): the number of fft bins.
dtype(str): the datatype of the return frequencies.
The frequencies represented in hz.
return paddle.linspace(0, float(sr) / 2, int(1 + n_fft // 2), dtype=dtype)
def compute_fbank_matrix(sr: int,
n_fft: int,
n_mels: int=64,
f_min: float=0.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
dtype: str=paddle.float32):
"""Compute fbank matrix.
sr(int): the audio sample rate.
n_fft(int): the number of fft bins.
n_mels(int): the number of Mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zero.
htk: whether to use htk formula.
return_complex(bool): whether to return complex matrix. If True, the matrix will
be complex type. Otherwise, the real and image part will be stored in the last
axis of returned tensor.
dtype(str): the datatype of the returned fbank matrix.
The fbank matrix of shape (n_mels, int(1+n_fft//2)).
output: (n_mels, int(1+n_fft//2))
if f_max is None:
f_max = float(sr) / 2
# Initialize the weights
weights = paddle.zeros((n_mels, int(1 + n_fft // 2)), dtype=dtype)
# Center freqs of each FFT bin
fftfreqs = fft_frequencies(sr=sr, n_fft=n_fft, dtype=dtype)
# 'Center freqs' of mel bands - uniformly spaced between limits
mel_f = mel_frequencies(
n_mels + 2, f_min=f_min, f_max=f_max, htk=htk, dtype=dtype)
fdiff = mel_f[1:] - mel_f[:-1] #np.diff(mel_f)
ramps = mel_f.unsqueeze(1) - fftfreqs.unsqueeze(0)
#ramps = np.subtract.outer(mel_f, fftfreqs)
for i in range(n_mels):
# lower and upper slopes for all bins
lower = -ramps[i] / fdiff[i]
upper = ramps[i + 2] / fdiff[i + 1]
# .. then intersect them with each other and zero
weights[i] = paddle.maximum(
paddle.zeros_like(lower), paddle.minimum(lower, upper))
# Slaney-style mel is scaled to be approx constant energy per channel
if norm == 'slaney':
enorm = 2.0 / (mel_f[2:n_mels + 2] - mel_f[:n_mels])
weights *= enorm.unsqueeze(1)
elif isinstance(norm, int) or isinstance(norm, float):
weights = paddle.nn.functional.normalize(weights, p=norm, axis=-1)
return weights
def power_to_db(magnitude: paddle.Tensor,
ref_value: float=1.0,
amin: float=1e-10,
top_db: Optional[float]=None) -> paddle.Tensor:
"""Convert a power spectrogram (amplitude squared) to decibel (dB) units.
The function computes the scaling ``10 * log10(x / ref)`` in a numerically
stable way.
magnitude(Tensor): the input magnitude tensor of any shape.
ref_value(float): the reference value. If smaller than 1.0, the db level
of the signal will be pulled up accordingly. Otherwise, the db level
is pushed down.
amin(float): the minimum value of input magnitude, below which the input
magnitude is clipped(to amin).
top_db(float): the maximum db value of resulting spectrum, above which the
spectrum is clipped(to top_db).
The spectrogram in log-scale.
input: any shape
output: same as input
if amin <= 0:
raise Exception("amin must be strictly positive")
if ref_value <= 0:
raise Exception("ref_value must be strictly positive")
ones = paddle.ones_like(magnitude)
log_spec = 10.0 * paddle.log10(paddle.maximum(ones * amin, magnitude))
log_spec -= 10.0 * math.log10(max(ref_value, amin))
if top_db is not None:
if top_db < 0:
raise Exception("top_db must be non-negative")
log_spec = paddle.maximum(log_spec, ones * (log_spec.max() - top_db))
return log_spec
class Spectrogram(nn.Layer):
def __init__(self,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
dtype: str=paddle.float32):
"""Compute spectrogram of a given signal, typically an audio waveform.
The spectorgram is defined as the complex norm of the short-time
Fourier transformation.
n_fft(int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length(int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window(str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center(bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode(str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'. The default value is 'reflect'.
dtype(str): the data type of input and window.
The Spectrogram transform relies on STFT transform to compute the spectrogram.
By default, the weights are not learnable. To fine-tune the Fourier coefficients,
set stop_gradient=False before training.
For more information, see STFT().
super(Spectrogram, self).__init__()
if win_length is None:
win_length = n_fft
fft_window = get_window(window, win_length, fftbins=True, dtype=dtype)
self._stft = partial(
def forward(self, x):
stft = self._stft(x)
spectrogram = paddle.square(paddle.abs(stft))
return spectrogram
class MelSpectrogram(nn.Layer):
def __init__(self,
sr: int=22050,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
n_mels: int=64,
f_min: float=50.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
dtype: str=paddle.float32):
"""Compute the melspectrogram of a given signal, typically an audio waveform.
The melspectrogram is also known as filterbank or fbank feature in audio community.
It is computed by multiplying spectrogram with Mel filter bank matrix.
sr(int): the audio sample rate.
The default value is 22050.
n_fft(int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length(int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window(str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center(bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode(str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'.
The default value is 'reflect'.
n_mels(int): the mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zeros.
htk(bool): whether to use HTK formula in computing fbank matrix.
norm(str|float): the normalization type in computing fbank matrix. Slaney-style is used by default.
You can specify norm=1.0/2.0 to use customized p-norm normalization.
dtype(str): the datatype of fbank matrix used in the transform. Use float64 to increase numerical
accuracy. Note that the final transform will be conducted in float32 regardless of dtype of fbank matrix.
super(MelSpectrogram, self).__init__()
self._spectrogram = Spectrogram(
self.n_mels = n_mels
self.f_min = f_min
self.f_max = f_max
self.htk = htk
self.norm = norm
if f_max is None:
f_max = sr // 2
self.fbank_matrix = compute_fbank_matrix(
dtype=dtype) # float64 for better numerical results
self.register_buffer('fbank_matrix', self.fbank_matrix)
def forward(self, x):
spect_feature = self._spectrogram(x)
mel_feature = paddle.matmul(self.fbank_matrix, spect_feature)
return mel_feature
class LogMelSpectrogram(nn.Layer):
def __init__(self,
sr: int=22050,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
n_mels: int=64,
f_min: float=50.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
ref_value: float=1.0,
amin: float=1e-10,
top_db: Optional[float]=None,
dtype: str=paddle.float32):
"""Compute log-mel-spectrogram(also known as LogFBank) feature of a given signal,
typically an audio waveform.
sr(int): the audio sample rate.
The default value is 22050.
n_fft(int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length(int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window(str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center(bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode(str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'.
The default value is 'reflect'.
n_mels(int): the mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zeros.
ref_value(float): the reference value. If smaller than 1.0, the db level
htk(bool): whether to use HTK formula in computing fbank matrix.
norm(str|float): the normalization type in computing fbank matrix. Slaney-style is used by default.
You can specify norm=1.0/2.0 to use customized p-norm normalization.
dtype(str): the datatype of fbank matrix used in the transform. Use float64 to increase numerical
accuracy. Note that the final transform will be conducted in float32 regardless of dtype of fbank matrix.
amin(float): the minimum value of input magnitude, below which the input of the signal will be pulled up accordingly.
Otherwise, the db level is pushed down.
magnitude is clipped(to amin). For numerical stability, set amin to a larger value,
e.g., 1e-3.
top_db(float): the maximum db value of resulting spectrum, above which the
spectrum is clipped(to top_db).
super(LogMelSpectrogram, self).__init__()
self._melspectrogram = MelSpectrogram(
self.ref_value = ref_value
self.amin = amin
self.top_db = top_db
def forward(self, x):
# import ipdb; ipdb.set_trace()
mel_feature = self._melspectrogram(x)
log_mel_feature = power_to_db(
return log_mel_feature

@ -0,0 +1,22 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from . import compliance
from . import datasets
from . import features
from . import functional
from . import io
from . import metric
from . import sox_effects
from .backends import load
from .backends import save

@ -0,0 +1,19 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from .soundfile_backend import depth_convert
from .soundfile_backend import load
from .soundfile_backend import normalize
from .soundfile_backend import resample
from .soundfile_backend import save
from .soundfile_backend import to_mono

@ -1,4 +1,4 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -29,7 +29,7 @@ __all__ = [
'to_mono', 'to_mono',
'depth_convert', 'depth_convert',
'normalize', 'normalize',
'save_wav', 'save',
'load', 'load',
] ]
NORMALMIZE_TYPES = ['linear', 'gaussian'] NORMALMIZE_TYPES = ['linear', 'gaussian']
@ -41,12 +41,9 @@ EPS = 1e-8
def resample(y: array, src_sr: int, target_sr: int, def resample(y: array, src_sr: int, target_sr: int,
mode: str='kaiser_fast') -> array: mode: str='kaiser_fast') -> array:
""" Audio resampling """ Audio resampling
This function is the same as using resampy.resample(). This function is the same as using resampy.resample().
Notes: Notes:
The default mode is kaiser_fast. For better audio quality, use mode = 'kaiser_fast' The default mode is kaiser_fast. For better audio quality, use mode = 'kaiser_fast'
""" """
if mode == 'kaiser_best': if mode == 'kaiser_best':
@ -106,7 +103,6 @@ def to_mono(y: array, merge_type: str='average') -> array:
def _safe_cast(y: array, dtype: Union[type, str]) -> array: def _safe_cast(y: array, dtype: Union[type, str]) -> array:
""" data type casting in a safe way, i.e., prevent overflow or underflow """ data type casting in a safe way, i.e., prevent overflow or underflow
This function is used internally. This function is used internally.
""" """
return np.clip(y, np.iinfo(dtype).min, np.iinfo(dtype).max).astype(dtype) return np.clip(y, np.iinfo(dtype).min, np.iinfo(dtype).max).astype(dtype)
@ -115,10 +111,8 @@ def _safe_cast(y: array, dtype: Union[type, str]) -> array:
def depth_convert(y: array, dtype: Union[type, str], def depth_convert(y: array, dtype: Union[type, str],
dithering: bool=True) -> array: dithering: bool=True) -> array:
"""Convert audio array to target dtype safely """Convert audio array to target dtype safely
This function convert audio waveform to a target dtype, with addition steps of This function convert audio waveform to a target dtype, with addition steps of
preventing overflow/underflow and preserving audio range. preventing overflow/underflow and preserving audio range.
""" """
SUPPORT_DTYPE = ['int16', 'int8', 'float32', 'float64'] SUPPORT_DTYPE = ['int16', 'int8', 'float32', 'float64']
@ -168,12 +162,9 @@ def sound_file_load(file: str,
dtype: str='int16', dtype: str='int16',
duration: Optional[int]=None) -> Tuple[array, int]: duration: Optional[int]=None) -> Tuple[array, int]:
"""Load audio using soundfile library """Load audio using soundfile library
This function load audio file using libsndfile. This function load audio file using libsndfile.
Reference: Reference:
""" """
with sf.SoundFile(file) as sf_desc: with sf.SoundFile(file) as sf_desc:
sr_native = sf_desc.samplerate sr_native = sf_desc.samplerate
@ -188,33 +179,9 @@ def sound_file_load(file: str,
return y, sf_desc.samplerate return y, sf_desc.samplerate
def audio_file_load():
"""Load audio using audiofile library
This function load audio file using audiofile.
raise NotImplementedError()
def sox_file_load():
"""Load audio using sox library
This function load audio file using sox.
raise NotImplementedError()
def normalize(y: array, norm_type: str='linear', def normalize(y: array, norm_type: str='linear',
mul_factor: float=1.0) -> array: mul_factor: float=1.0) -> array:
""" normalize an input audio with additional multiplier. """ normalize an input audio with additional multiplier.
""" """
if norm_type == 'linear': if norm_type == 'linear':
@ -232,14 +199,12 @@ def normalize(y: array, norm_type: str='linear',
return y return y
def save_wav(y: array, sr: int, file: str) -> None: def save(y: array, sr: int, file: str) -> None:
"""Save audio file to disk. """Save audio file to disk.
This function saves audio to disk using, with additional step This function saves audio to disk using, with additional step
to convert input waveform to int16 unless it already is int16 to convert input waveform to int16 unless it already is int16
Notes: Notes:
It only support raw wav format. It only support raw wav format.
""" """
if not file.endswith('.wav'): if not file.endswith('.wav'):
raise ParameterError( raise ParameterError(
@ -274,11 +239,8 @@ def load(
resample_mode: str='kaiser_fast') -> Tuple[array, int]: resample_mode: str='kaiser_fast') -> Tuple[array, int]:
"""Load audio file from disk. """Load audio file from disk.
This function loads audio from disk using using audio beackend. This function loads audio from disk using using audio beackend.
Parameters: Parameters:
Notes: Notes:
""" """
y, r = sound_file_load(file, offset=offset, dtype=dtype, duration=duration) y, r = sound_file_load(file, offset=offset, dtype=dtype, duration=duration)

@ -0,0 +1,13 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

@ -1,4 +1,4 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -11,5 +11,3 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from .backends import *
from .features import *

@ -0,0 +1,638 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# Modified from torchaudio(
import math
from typing import Tuple
import paddle
from paddle import Tensor
from ..functional import create_dct
from ..functional.window import get_window
__all__ = [
# window types
HANNING = 'hann'
HAMMING = 'hamming'
POVEY = 'povey'
BLACKMAN = 'blackman'
def _get_epsilon(dtype):
return paddle.to_tensor(1e-07, dtype=dtype)
def _next_power_of_2(x: int) -> int:
return 1 if x == 0 else 2**(x - 1).bit_length()
def _get_strided(waveform: Tensor,
window_size: int,
window_shift: int,
snip_edges: bool) -> Tensor:
assert waveform.dim() == 1
num_samples = waveform.shape[0]
if snip_edges:
if num_samples < window_size:
return paddle.empty((0, 0), dtype=waveform.dtype)
m = 1 + (num_samples - window_size) // window_shift
reversed_waveform = paddle.flip(waveform, [0])
m = (num_samples + (window_shift // 2)) // window_shift
pad = window_size // 2 - window_shift // 2
pad_right = reversed_waveform
if pad > 0:
pad_left = reversed_waveform[-pad:]
waveform = paddle.concat((pad_left, waveform, pad_right), axis=0)
waveform = paddle.concat((waveform[-pad:], pad_right), axis=0)
return paddle.signal.frame(waveform, window_size, window_shift)[:, :m].T
def _feature_window_function(
window_type: str,
window_size: int,
blackman_coeff: float,
dtype: int, ) -> Tensor:
if window_type == HANNING:
return get_window('hann', window_size, fftbins=False, dtype=dtype)
elif window_type == HAMMING:
return get_window('hamming', window_size, fftbins=False, dtype=dtype)
elif window_type == POVEY:
return get_window(
'hann', window_size, fftbins=False, dtype=dtype).pow(0.85)
elif window_type == RECTANGULAR:
return paddle.ones([window_size], dtype=dtype)
elif window_type == BLACKMAN:
a = 2 * math.pi / (window_size - 1)
window_function = paddle.arange(window_size, dtype=dtype)
return (blackman_coeff - 0.5 * paddle.cos(a * window_function) +
(0.5 - blackman_coeff) * paddle.cos(2 * a * window_function)
raise Exception('Invalid window type ' + window_type)
def _get_log_energy(strided_input: Tensor, epsilon: Tensor,
energy_floor: float) -> Tensor:
log_energy = paddle.maximum(strided_input.pow(2).sum(1), epsilon).log()
if energy_floor == 0.0:
return log_energy
return paddle.maximum(
paddle.to_tensor(math.log(energy_floor), dtype=strided_input.dtype))
def _get_waveform_and_window_properties(
waveform: Tensor,
channel: int,
sr: int,
frame_shift: float,
frame_length: float,
round_to_power_of_two: bool,
preemphasis_coefficient: float) -> Tuple[Tensor, int, int, int]:
channel = max(channel, 0)
assert channel < waveform.shape[0], (
'Invalid channel {} for size {}'.format(channel, waveform.shape[0]))
waveform = waveform[channel, :] # size (n)
window_shift = int(
sr * frame_shift *
0.001) # pass frame_shift and frame_length in milliseconds
window_size = int(sr * frame_length * 0.001)
padded_window_size = _next_power_of_2(
window_size) if round_to_power_of_two else window_size
assert 2 <= window_size <= len(waveform), (
'choose a window size {} that is [2, {}]'.format(window_size,
assert 0 < window_shift, '`window_shift` must be greater than 0'
assert padded_window_size % 2 == 0, 'the padded `window_size` must be divisible by two.' \
' use `round_to_power_of_two` or change `frame_length`'
assert 0. <= preemphasis_coefficient <= 1.0, '`preemphasis_coefficient` must be between [0,1]'
assert sr > 0, '`sr` must be greater than zero'
return waveform, window_shift, window_size, padded_window_size
def _get_window(waveform: Tensor,
padded_window_size: int,
window_size: int,
window_shift: int,
window_type: str,
blackman_coeff: float,
snip_edges: bool,
raw_energy: bool,
energy_floor: float,
dither: float,
remove_dc_offset: bool,
preemphasis_coefficient: float) -> Tuple[Tensor, Tensor]:
dtype = waveform.dtype
epsilon = _get_epsilon(dtype)
# (m, window_size)
strided_input = _get_strided(waveform, window_size, window_shift,
if dither != 0.0:
x = paddle.maximum(epsilon,
paddle.rand(strided_input.shape, dtype=dtype))
rand_gauss = paddle.sqrt(-2 * x.log()) * paddle.cos(2 * math.pi * x)
strided_input = strided_input + rand_gauss * dither
if remove_dc_offset:
row_means = paddle.mean(strided_input, axis=1).unsqueeze(1) # (m, 1)
strided_input = strided_input - row_means
if raw_energy:
signal_log_energy = _get_log_energy(strided_input, epsilon,
energy_floor) # (m)
if preemphasis_coefficient != 0.0:
offset_strided_input = paddle.nn.functional.pad(
strided_input.unsqueeze(0), (1, 0),
mode='replicate').squeeze(0) # (m, window_size + 1)
strided_input = strided_input - preemphasis_coefficient * offset_strided_input[:, :
window_function = _feature_window_function(
window_type, window_size, blackman_coeff,
dtype).unsqueeze(0) # (1, window_size)
strided_input = strided_input * window_function # (m, window_size)
# (m, padded_window_size)
if padded_window_size != window_size:
padding_right = padded_window_size - window_size
strided_input = paddle.nn.functional.pad(
strided_input.unsqueeze(0), (0, padding_right),
if not raw_energy:
signal_log_energy = _get_log_energy(strided_input, epsilon,
energy_floor) # size (m)
return strided_input, signal_log_energy
def _subtract_column_mean(tensor: Tensor, subtract_mean: bool) -> Tensor:
if subtract_mean:
col_means = paddle.mean(tensor, axis=0).unsqueeze(0)
tensor = tensor - col_means
return tensor
def spectrogram(waveform: Tensor,
blackman_coeff: float=0.42,
channel: int=-1,
dither: float=0.0,
energy_floor: float=1.0,
frame_length: float=25.0,
frame_shift: float=10.0,
preemphasis_coefficient: float=0.97,
raw_energy: bool=True,
remove_dc_offset: bool=True,
round_to_power_of_two: bool=True,
sr: int=16000,
snip_edges: bool=True,
subtract_mean: bool=False,
window_type: str=POVEY) -> Tensor:
"""Compute and return a spectrogram from a waveform. The output is identical to Kaldi's.
waveform (Tensor): A waveform tensor with shape [C, T].
blackman_coeff (float, optional): Coefficient for Blackman window.. Defaults to 0.42.
channel (int, optional): Select the channel of waveform. Defaults to -1.
dither (float, optional): Dithering constant . Defaults to 0.0.
energy_floor (float, optional): Floor on energy of the output Spectrogram. Defaults to 1.0.
frame_length (float, optional): Frame length in milliseconds. Defaults to 25.0.
frame_shift (float, optional): Shift between adjacent frames in milliseconds. Defaults to 10.0.
preemphasis_coefficient (float, optional): Preemphasis coefficient for input waveform. Defaults to 0.97.
raw_energy (bool, optional): Whether to compute before preemphasis and windowing. Defaults to True.
remove_dc_offset (bool, optional): Whether to subtract mean from waveform on frames. Defaults to True.
round_to_power_of_two (bool, optional): If True, round window size to power of two by zero-padding input
to FFT. Defaults to True.
sr (int, optional): Sample rate of input waveform. Defaults to 16000.
snip_edges (bool, optional): Drop samples in the end of waveform that cann't fit a singal frame when it
is set True. Otherwise performs reflect padding to the end of waveform. Defaults to True.
subtract_mean (bool, optional): Whether to subtract mean of feature files. Defaults to False.
window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY.
Tensor: A spectrogram tensor with shape (m, padded_window_size // 2 + 1) where m is the number of frames
depends on frame_length and frame_shift.
dtype = waveform.dtype
epsilon = _get_epsilon(dtype)
waveform, window_shift, window_size, padded_window_size = _get_waveform_and_window_properties(
waveform, channel, sr, frame_shift, frame_length, round_to_power_of_two,
strided_input, signal_log_energy = _get_window(
waveform, padded_window_size, window_size, window_shift, window_type,
blackman_coeff, snip_edges, raw_energy, energy_floor, dither,
remove_dc_offset, preemphasis_coefficient)
# (m, padded_window_size // 2 + 1, 2)
fft = paddle.fft.rfft(strided_input)
power_spectrum = paddle.maximum(
fft.abs().pow(2.), epsilon).log() # (m, padded_window_size // 2 + 1)
power_spectrum[:, 0] = signal_log_energy
power_spectrum = _subtract_column_mean(power_spectrum, subtract_mean)
return power_spectrum
def _inverse_mel_scale_scalar(mel_freq: float) -> float:
return 700.0 * (math.exp(mel_freq / 1127.0) - 1.0)
def _inverse_mel_scale(mel_freq: Tensor) -> Tensor:
return 700.0 * ((mel_freq / 1127.0).exp() - 1.0)
def _mel_scale_scalar(freq: float) -> float:
return 1127.0 * math.log(1.0 + freq / 700.0)
def _mel_scale(freq: Tensor) -> Tensor:
return 1127.0 * (1.0 + freq / 700.0).log()
def _vtln_warp_freq(vtln_low_cutoff: float,
vtln_high_cutoff: float,
low_freq: float,
high_freq: float,
vtln_warp_factor: float,
freq: Tensor) -> Tensor:
assert vtln_low_cutoff > low_freq, 'be sure to set the vtln_low option higher than low_freq'
assert vtln_high_cutoff < high_freq, 'be sure to set the vtln_high option lower than high_freq [or negative]'
l = vtln_low_cutoff * max(1.0, vtln_warp_factor)
h = vtln_high_cutoff * min(1.0, vtln_warp_factor)
scale = 1.0 / vtln_warp_factor
Fl = scale * l
Fh = scale * h
assert l > low_freq and h < high_freq
scale_left = (Fl - low_freq) / (l - low_freq)
scale_right = (high_freq - Fh) / (high_freq - h)
res = paddle.empty_like(freq)
outside_low_high_freq = paddle.less_than(freq, paddle.to_tensor(low_freq)) \
| paddle.greater_than(freq, paddle.to_tensor(high_freq))
before_l = paddle.less_than(freq, paddle.to_tensor(l))
before_h = paddle.less_than(freq, paddle.to_tensor(h))
after_h = paddle.greater_equal(freq, paddle.to_tensor(h))
res[after_h] = high_freq + scale_right * (freq[after_h] - high_freq)
res[before_h] = scale * freq[before_h]
res[before_l] = low_freq + scale_left * (freq[before_l] - low_freq)
res[outside_low_high_freq] = freq[outside_low_high_freq]
return res
def _vtln_warp_mel_freq(vtln_low_cutoff: float,
vtln_high_cutoff: float,
high_freq: float,
vtln_warp_factor: float,
mel_freq: Tensor) -> Tensor:
return _mel_scale(
_vtln_warp_freq(vtln_low_cutoff, vtln_high_cutoff, low_freq, high_freq,
vtln_warp_factor, _inverse_mel_scale(mel_freq)))
def _get_mel_banks(num_bins: int,
window_length_padded: int,
sample_freq: float,
low_freq: float,
high_freq: float,
vtln_low: float,
vtln_high: float,
vtln_warp_factor: float) -> Tuple[Tensor, Tensor]:
assert num_bins > 3, 'Must have at least 3 mel bins'
assert window_length_padded % 2 == 0
num_fft_bins = window_length_padded / 2
nyquist = 0.5 * sample_freq
if high_freq <= 0.0:
high_freq += nyquist
assert (0.0 <= low_freq < nyquist) and (0.0 < high_freq <= nyquist) and (low_freq < high_freq), \
('Bad values in options: low-freq {} and high-freq {} vs. nyquist {}'.format(low_freq, high_freq, nyquist))
fft_bin_width = sample_freq / window_length_padded
mel_low_freq = _mel_scale_scalar(low_freq)
mel_high_freq = _mel_scale_scalar(high_freq)
mel_freq_delta = (mel_high_freq - mel_low_freq) / (num_bins + 1)
if vtln_high < 0.0:
vtln_high += nyquist
assert vtln_warp_factor == 1.0 or ((low_freq < vtln_low < high_freq) and
(0.0 < vtln_high < high_freq) and (vtln_low < vtln_high)), \
('Bad values in options: vtln-low {} and vtln-high {}, versus '
'low-freq {} and high-freq {}'.format(vtln_low, vtln_high, low_freq, high_freq))
bin = paddle.arange(num_bins).unsqueeze(1)
left_mel = mel_low_freq + bin * mel_freq_delta # (num_bins, 1)
center_mel = mel_low_freq + (bin + 1.0) * mel_freq_delta # (num_bins, 1)
right_mel = mel_low_freq + (bin + 2.0) * mel_freq_delta # (num_bins, 1)
if vtln_warp_factor != 1.0:
left_mel = _vtln_warp_mel_freq(vtln_low, vtln_high, low_freq, high_freq,
vtln_warp_factor, left_mel)
center_mel = _vtln_warp_mel_freq(vtln_low, vtln_high, low_freq,
high_freq, vtln_warp_factor,
right_mel = _vtln_warp_mel_freq(vtln_low, vtln_high, low_freq,
high_freq, vtln_warp_factor, right_mel)
center_freqs = _inverse_mel_scale(center_mel) # (num_bins)
# (1, num_fft_bins)
mel = _mel_scale(fft_bin_width * paddle.arange(num_fft_bins)).unsqueeze(0)
# (num_bins, num_fft_bins)
up_slope = (mel - left_mel) / (center_mel - left_mel)
down_slope = (right_mel - mel) / (right_mel - center_mel)
if vtln_warp_factor == 1.0:
bins = paddle.maximum(
paddle.zeros([1]), paddle.minimum(up_slope, down_slope))
bins = paddle.zeros_like(up_slope)
up_idx = paddle.greater_than(mel, left_mel) & paddle.less_than(
mel, center_mel)
down_idx = paddle.greater_than(mel, center_mel) & paddle.less_than(
mel, right_mel)
bins[up_idx] = up_slope[up_idx]
bins[down_idx] = down_slope[down_idx]
return bins, center_freqs
def fbank(waveform: Tensor,
blackman_coeff: float=0.42,
channel: int=-1,
dither: float=0.0,
energy_floor: float=1.0,
frame_length: float=25.0,
frame_shift: float=10.0,
high_freq: float=0.0,
htk_compat: bool=False,
low_freq: float=20.0,
n_mels: int=23,
preemphasis_coefficient: float=0.97,
raw_energy: bool=True,
remove_dc_offset: bool=True,
round_to_power_of_two: bool=True,
sr: int=16000,
snip_edges: bool=True,
subtract_mean: bool=False,
use_energy: bool=False,
use_log_fbank: bool=True,
use_power: bool=True,
vtln_high: float=-500.0,
vtln_low: float=100.0,
vtln_warp: float=1.0,
window_type: str=POVEY) -> Tensor:
"""Compute and return filter banks from a waveform. The output is identical to Kaldi's.
waveform (Tensor): A waveform tensor with shape [C, T].
blackman_coeff (float, optional): Coefficient for Blackman window.. Defaults to 0.42.
channel (int, optional): Select the channel of waveform. Defaults to -1.
dither (float, optional): Dithering constant . Defaults to 0.0.
energy_floor (float, optional): Floor on energy of the output Spectrogram. Defaults to 1.0.
frame_length (float, optional): Frame length in milliseconds. Defaults to 25.0.
frame_shift (float, optional): Shift between adjacent frames in milliseconds. Defaults to 10.0.
high_freq (float, optional): The upper cut-off frequency. Defaults to 0.0.
htk_compat (bool, optional): Put energy to the last when it is set True. Defaults to False.
low_freq (float, optional): The lower cut-off frequency. Defaults to 20.0.
n_mels (int, optional): Number of output mel bins. Defaults to 23.
preemphasis_coefficient (float, optional): Preemphasis coefficient for input waveform. Defaults to 0.97.
raw_energy (bool, optional): Whether to compute before preemphasis and windowing. Defaults to True.
remove_dc_offset (bool, optional): Whether to subtract mean from waveform on frames. Defaults to True.
round_to_power_of_two (bool, optional): If True, round window size to power of two by zero-padding input
to FFT. Defaults to True.
sr (int, optional): Sample rate of input waveform. Defaults to 16000.
snip_edges (bool, optional): Drop samples in the end of waveform that cann't fit a singal frame when it
is set True. Otherwise performs reflect padding to the end of waveform. Defaults to True.
subtract_mean (bool, optional): Whether to subtract mean of feature files. Defaults to False.
use_energy (bool, optional): Add an dimension with energy of spectrogram to the output. Defaults to False.
use_log_fbank (bool, optional): Return log fbank when it is set True. Defaults to True.
use_power (bool, optional): Whether to use power instead of magnitude. Defaults to True.
vtln_high (float, optional): High inflection point in piecewise linear VTLN warping function. Defaults to -500.0.
vtln_low (float, optional): Low inflection point in piecewise linear VTLN warping function. Defaults to 100.0.
vtln_warp (float, optional): Vtln warp factor. Defaults to 1.0.
window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY.
Tensor: A filter banks tensor with shape (m, n_mels).
dtype = waveform.dtype
waveform, window_shift, window_size, padded_window_size = _get_waveform_and_window_properties(
waveform, channel, sr, frame_shift, frame_length, round_to_power_of_two,
strided_input, signal_log_energy = _get_window(
waveform, padded_window_size, window_size, window_shift, window_type,
blackman_coeff, snip_edges, raw_energy, energy_floor, dither,
remove_dc_offset, preemphasis_coefficient)
# (m, padded_window_size // 2 + 1)
spectrum = paddle.fft.rfft(strided_input).abs()
if use_power:
spectrum = spectrum.pow(2.)
# (n_mels, padded_window_size // 2)
mel_energies, _ = _get_mel_banks(n_mels, padded_window_size, sr, low_freq,
high_freq, vtln_low, vtln_high, vtln_warp)
mel_energies = mel_energies.astype(dtype)
# (n_mels, padded_window_size // 2 + 1)
mel_energies = paddle.nn.functional.pad(
mel_energies.unsqueeze(0), (0, 1),
# (m, n_mels)
mel_energies =, mel_energies.T)
if use_log_fbank:
mel_energies = paddle.maximum(mel_energies, _get_epsilon(dtype)).log()
if use_energy:
signal_log_energy = signal_log_energy.unsqueeze(1)
if htk_compat:
mel_energies = paddle.concat(
(mel_energies, signal_log_energy), axis=1)
mel_energies = paddle.concat(
(signal_log_energy, mel_energies), axis=1)
# (m, n_mels + 1)
mel_energies = _subtract_column_mean(mel_energies, subtract_mean)
return mel_energies
def _get_dct_matrix(n_mfcc: int, n_mels: int) -> Tensor:
dct_matrix = create_dct(n_mels, n_mels, 'ortho')
dct_matrix[:, 0] = math.sqrt(1 / float(n_mels))
dct_matrix = dct_matrix[:, :n_mfcc] # (n_mels, n_mfcc)
return dct_matrix
def _get_lifter_coeffs(n_mfcc: int, cepstral_lifter: float) -> Tensor:
i = paddle.arange(n_mfcc)
return 1.0 + 0.5 * cepstral_lifter * paddle.sin(math.pi * i /
def mfcc(waveform: Tensor,
blackman_coeff: float=0.42,
cepstral_lifter: float=22.0,
channel: int=-1,
dither: float=0.0,
energy_floor: float=1.0,
frame_length: float=25.0,
frame_shift: float=10.0,
high_freq: float=0.0,
htk_compat: bool=False,
low_freq: float=20.0,
n_mfcc: int=13,
n_mels: int=23,
preemphasis_coefficient: float=0.97,
raw_energy: bool=True,
remove_dc_offset: bool=True,
round_to_power_of_two: bool=True,
sr: int=16000,
snip_edges: bool=True,
subtract_mean: bool=False,
use_energy: bool=False,
vtln_high: float=-500.0,
vtln_low: float=100.0,
vtln_warp: float=1.0,
window_type: str=POVEY) -> Tensor:
"""Compute and return mel frequency cepstral coefficients from a waveform. The output is
identical to Kaldi's.
waveform (Tensor): A waveform tensor with shape [C, T].
blackman_coeff (float, optional): Coefficient for Blackman window.. Defaults to 0.42.
cepstral_lifter (float, optional): Scaling of output mfccs. Defaults to 22.0.
channel (int, optional): Select the channel of waveform. Defaults to -1.
dither (float, optional): Dithering constant . Defaults to 0.0.
energy_floor (float, optional): Floor on energy of the output Spectrogram. Defaults to 1.0.
frame_length (float, optional): Frame length in milliseconds. Defaults to 25.0.
frame_shift (float, optional): Shift between adjacent frames in milliseconds. Defaults to 10.0.
high_freq (float, optional): The upper cut-off frequency. Defaults to 0.0.
htk_compat (bool, optional): Put energy to the last when it is set True. Defaults to False.
low_freq (float, optional): The lower cut-off frequency. Defaults to 20.0.
n_mfcc (int, optional): Number of cepstra in MFCC. Defaults to 13.
n_mels (int, optional): Number of output mel bins. Defaults to 23.
preemphasis_coefficient (float, optional): Preemphasis coefficient for input waveform. Defaults to 0.97.
raw_energy (bool, optional): Whether to compute before preemphasis and windowing. Defaults to True.
remove_dc_offset (bool, optional): Whether to subtract mean from waveform on frames. Defaults to True.
round_to_power_of_two (bool, optional): If True, round window size to power of two by zero-padding input
to FFT. Defaults to True.
sr (int, optional): Sample rate of input waveform. Defaults to 16000.
snip_edges (bool, optional): Drop samples in the end of waveform that cann't fit a singal frame when it
is set True. Otherwise performs reflect padding to the end of waveform. Defaults to True.
subtract_mean (bool, optional): Whether to subtract mean of feature files. Defaults to False.
use_energy (bool, optional): Add an dimension with energy of spectrogram to the output. Defaults to False.
vtln_high (float, optional): High inflection point in piecewise linear VTLN warping function. Defaults to -500.0.
vtln_low (float, optional): Low inflection point in piecewise linear VTLN warping function. Defaults to 100.0.
vtln_warp (float, optional): Vtln warp factor. Defaults to 1.0.
window_type (str, optional): Choose type of window for FFT computation. Defaults to POVEY.
Tensor: A mel frequency cepstral coefficients tensor with shape (m, n_mfcc).
assert n_mfcc <= n_mels, 'n_mfcc cannot be larger than n_mels: %d vs %d' % (
n_mfcc, n_mels)
dtype = waveform.dtype
# (m, n_mels + use_energy)
feature = fbank(
if use_energy:
# (m)
signal_log_energy = feature[:, n_mels if htk_compat else 0]
mel_offset = int(not htk_compat)
feature = feature[:, mel_offset:(n_mels + mel_offset)]
# (n_mels, n_mfcc)
dct_matrix = _get_dct_matrix(n_mfcc, n_mels).astype(dtype=dtype)
# (m, n_mfcc)
feature = feature.matmul(dct_matrix)
if cepstral_lifter != 0.0:
# (1, n_mfcc)
lifter_coeffs = _get_lifter_coeffs(n_mfcc, cepstral_lifter).unsqueeze(0)
feature *= lifter_coeffs.astype(dtype=dtype)
if use_energy:
feature[:, 0] = signal_log_energy
if htk_compat:
energy = feature[:, 0].unsqueeze(1) # (m, 1)
feature = feature[:, 1:] # (m, n_mfcc - 1)
if not use_energy:
energy *= math.sqrt(2)
feature = paddle.concat((feature, energy), axis=1)
feature = _subtract_column_mean(feature, subtract_mean)
return feature

@ -21,11 +21,13 @@ import numpy as np
import scipy import scipy
from numpy import ndarray as array from numpy import ndarray as array
from numpy.lib.stride_tricks import as_strided from numpy.lib.stride_tricks import as_strided
from scipy.signal import get_window from scipy import signal
from ..backends import depth_convert
from ..utils import ParameterError from ..utils import ParameterError
__all__ = [ __all__ = [
# dsp
'stft', 'stft',
'mfcc', 'mfcc',
'hz_to_mel', 'hz_to_mel',
@ -38,6 +40,12 @@ __all__ = [
'spectrogram', 'spectrogram',
'mu_encode', 'mu_encode',
'mu_decode', 'mu_decode',
# augmentation
] ]
@ -303,7 +311,7 @@ def stft(x: array,
if hop_length is None: if hop_length is None:
hop_length = int(win_length // 4) hop_length = int(win_length // 4)
fft_window = get_window(window, win_length, fftbins=True) fft_window = signal.get_window(window, win_length, fftbins=True)
# Pad the window out to n_fft size # Pad the window out to n_fft size
fft_window = pad_center(fft_window, n_fft) fft_window = pad_center(fft_window, n_fft)
@ -576,3 +584,145 @@ def mu_decode(y: array, mu: int=255, quantized: bool=True) -> array:
y = y * 2 / mu - 1 y = y * 2 / mu - 1
x = np.sign(y) / mu * ((1 + mu)**np.abs(y) - 1) x = np.sign(y) / mu * ((1 + mu)**np.abs(y) - 1)
return x return x
def randint(high: int) -> int:
"""Generate one random integer in range [0 high)
This is a helper function for random data augmentaiton
return int(np.random.randint(0, high=high))
def rand() -> float:
"""Generate one floating-point number in range [0 1)
This is a helper function for random data augmentaiton
return float(np.random.rand(1))
def depth_augment(y: array,
choices: List=['int8', 'int16'],
probs: List[float]=[0.5, 0.5]) -> array:
""" Audio depth augmentation
Do audio depth augmentation to simulate the distortion brought by quantization.
assert len(probs) == len(
), 'number of choices {} must be equal to size of probs {}'.format(
len(choices), len(probs))
depth = np.random.choice(choices, p=probs)
src_depth = y.dtype
y1 = depth_convert(y, depth)
y2 = depth_convert(y1, src_depth)
return y2
def adaptive_spect_augment(spect: array, tempo_axis: int=0,
level: float=0.1) -> array:
"""Do adpative spectrogram augmentation
The level of the augmentation is gowern by the paramter level,
ranging from 0 to 1, with 0 represents no augmentation
assert spect.ndim == 2., 'only supports 2d tensor or numpy array'
if tempo_axis == 0:
nt, nf = spect.shape
nf, nt = spect.shape
time_mask_width = int(nt * level * 0.5)
freq_mask_width = int(nf * level * 0.5)
num_time_mask = int(10 * level)
num_freq_mask = int(10 * level)
if tempo_axis == 0:
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[start:start + time_mask_width, :] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[:, start:start + freq_mask_width] = 0
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[:, start:start + time_mask_width] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[start:start + freq_mask_width, :] = 0
return spect
def spect_augment(spect: array,
tempo_axis: int=0,
max_time_mask: int=3,
max_freq_mask: int=3,
max_time_mask_width: int=30,
max_freq_mask_width: int=20) -> array:
"""Do spectrogram augmentation in both time and freq axis
assert spect.ndim == 2., 'only supports 2d tensor or numpy array'
if tempo_axis == 0:
nt, nf = spect.shape
nf, nt = spect.shape
num_time_mask = randint(max_time_mask)
num_freq_mask = randint(max_freq_mask)
time_mask_width = randint(max_time_mask_width)
freq_mask_width = randint(max_freq_mask_width)
if tempo_axis == 0:
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[start:start + time_mask_width, :] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[:, start:start + freq_mask_width] = 0
for _ in range(num_time_mask):
start = randint(nt - time_mask_width)
spect[:, start:start + time_mask_width] = 0
for _ in range(num_freq_mask):
start = randint(nf - freq_mask_width)
spect[start:start + freq_mask_width, :] = 0
return spect
def random_crop1d(y: array, crop_len: int) -> array:
""" Do random cropping on 1d input signal
The input is a 1d signal, typically a sound waveform
if y.ndim != 1:
'only accept 1d tensor or numpy array'
n = len(y)
idx = randint(n - crop_len)
return y[idx:idx + crop_len]
def random_crop2d(s: array, crop_len: int, tempo_axis: int=0) -> array:
""" Do random cropping for 2D array, typically a spectrogram.
The cropping is done in temporal direction on the time-freq input signal.
if tempo_axis >= s.ndim:
raise ParameterError('axis out of range')
n = s.shape[tempo_axis]
idx = randint(high=n - crop_len)
sli = [slice(None) for i in range(s.ndim)]
sli[tempo_axis] = slice(idx, idx + crop_len)
out = s[tuple(sli)]
return out

@ -15,10 +15,3 @@ from .esc50 import ESC50
from .gtzan import GTZAN from .gtzan import GTZAN
from .tess import TESS from .tess import TESS
from .urban_sound import UrbanSound8K from .urban_sound import UrbanSound8K
__all__ = [

@ -17,8 +17,8 @@ import numpy as np
import paddle import paddle
from ..backends import load as load_audio from ..backends import load as load_audio
from ..features import melspectrogram from ..compliance.librosa import melspectrogram
from ..features import mfcc from ..compliance.librosa import mfcc
feat_funcs = { feat_funcs = {
'raw': None, 'raw': None,

@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from .augment import * from .layers import LogMelSpectrogram
from .core import * from .layers import MelSpectrogram
from .spectrum import * from .layers import MFCC
from .layers import Spectrogram

@ -0,0 +1,344 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
from typing import Optional
from typing import Union
import paddle
import paddle.nn as nn
from ..functional import compute_fbank_matrix
from ..functional import create_dct
from ..functional import power_to_db
from ..functional.window import get_window
__all__ = [
class Spectrogram(nn.Layer):
def __init__(self,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
dtype: str=paddle.float32):
"""Compute spectrogram of a given signal, typically an audio waveform.
The spectorgram is defined as the complex norm of the short-time
Fourier transformation.
n_fft (int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length (int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window (str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center (bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode (str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'. The default value is 'reflect'.
dtype (str): the data type of input and window.
The Spectrogram transform relies on STFT transform to compute the spectrogram.
By default, the weights are not learnable. To fine-tune the Fourier coefficients,
set stop_gradient=False before training.
For more information, see STFT().
super(Spectrogram, self).__init__()
if win_length is None:
win_length = n_fft
self.fft_window = get_window(
window, win_length, fftbins=True, dtype=dtype)
self._stft = partial(
self.register_buffer('fft_window', self.fft_window)
def forward(self, x):
stft = self._stft(x)
spectrogram = paddle.square(paddle.abs(stft))
return spectrogram
class MelSpectrogram(nn.Layer):
def __init__(self,
sr: int=22050,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
n_mels: int=64,
f_min: float=50.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
dtype: str=paddle.float32):
"""Compute the melspectrogram of a given signal, typically an audio waveform.
The melspectrogram is also known as filterbank or fbank feature in audio community.
It is computed by multiplying spectrogram with Mel filter bank matrix.
sr(int): the audio sample rate.
The default value is 22050.
n_fft(int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length(int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window(str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center(bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode(str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'.
The default value is 'reflect'.
n_mels(int): the mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zeros.
htk(bool): whether to use HTK formula in computing fbank matrix.
norm(str|float): the normalization type in computing fbank matrix. Slaney-style is used by default.
You can specify norm=1.0/2.0 to use customized p-norm normalization.
dtype(str): the datatype of fbank matrix used in the transform. Use float64 to increase numerical
accuracy. Note that the final transform will be conducted in float32 regardless of dtype of fbank matrix.
super(MelSpectrogram, self).__init__()
self._spectrogram = Spectrogram(
self.n_mels = n_mels
self.f_min = f_min
self.f_max = f_max
self.htk = htk
self.norm = norm
if f_max is None:
f_max = sr // 2
self.fbank_matrix = compute_fbank_matrix(
dtype=dtype) # float64 for better numerical results
self.register_buffer('fbank_matrix', self.fbank_matrix)
def forward(self, x):
spect_feature = self._spectrogram(x)
mel_feature = paddle.matmul(self.fbank_matrix, spect_feature)
return mel_feature
class LogMelSpectrogram(nn.Layer):
def __init__(self,
sr: int=22050,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
n_mels: int=64,
f_min: float=50.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
ref_value: float=1.0,
amin: float=1e-10,
top_db: Optional[float]=None,
dtype: str=paddle.float32):
"""Compute log-mel-spectrogram(also known as LogFBank) feature of a given signal,
typically an audio waveform.
sr (int): the audio sample rate.
The default value is 22050.
n_fft (int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length (int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window (str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center (bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode (str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'.
The default value is 'reflect'.
n_mels (int): the mel bins.
f_min (float): the lower cut-off frequency, below which the filter response is zero.
f_max (float): the upper cut-off frequency, above which the filter response is zeros.
htk (bool): whether to use HTK formula in computing fbank matrix.
norm (str|float): the normalization type in computing fbank matrix. Slaney-style is used by default.
You can specify norm=1.0/2.0 to use customized p-norm normalization.
ref_value (float): the reference value. If smaller than 1.0, the db level
amin (float): the minimum value of input magnitude, below which the input of the signal will be pulled up accordingly.
Otherwise, the db level is pushed down.
magnitude is clipped(to amin). For numerical stability, set amin to a larger value,
e.g., 1e-3.
top_db (float): the maximum db value of resulting spectrum, above which the
spectrum is clipped(to top_db).
dtype (str): the datatype of fbank matrix used in the transform. Use float64 to increase numerical
accuracy. Note that the final transform will be conducted in float32 regardless of dtype of fbank matrix.
super(LogMelSpectrogram, self).__init__()
self._melspectrogram = MelSpectrogram(
self.ref_value = ref_value
self.amin = amin
self.top_db = top_db
def forward(self, x):
# import ipdb; ipdb.set_trace()
mel_feature = self._melspectrogram(x)
log_mel_feature = power_to_db(
return log_mel_feature
class MFCC(nn.Layer):
def __init__(self,
sr: int=22050,
n_mfcc: int=40,
n_fft: int=512,
hop_length: Optional[int]=None,
win_length: Optional[int]=None,
window: str='hann',
center: bool=True,
pad_mode: str='reflect',
n_mels: int=64,
f_min: float=50.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
ref_value: float=1.0,
amin: float=1e-10,
top_db: Optional[float]=None,
dtype: str=paddle.float32):
"""Compute mel frequency cepstral coefficients(MFCCs) feature of given waveforms.
sr(int): the audio sample rate.
The default value is 22050.
n_mfcc (int, optional): Number of cepstra in MFCC. Defaults to 40.
n_fft (int): the number of frequency components of the discrete Fourier transform.
The default value is 2048,
hop_length (int|None): the hop length of the short time FFT. If None, it is set to win_length//4.
The default value is None.
win_length: the window length of the short time FFt. If None, it is set to same as n_fft.
The default value is None.
window (str): the name of the window function applied to the single before the Fourier transform.
The folllowing window names are supported: 'hamming','hann','kaiser','gaussian',
The default value is 'hann'
center (bool): if True, the signal is padded so that frame t is centered at x[t * hop_length].
If False, frame t begins at x[t * hop_length]
The default value is True
pad_mode (str): the mode to pad the signal if necessary. The supported modes are 'reflect'
and 'constant'.
The default value is 'reflect'.
n_mels (int): the mel bins.
f_min (float): the lower cut-off frequency, below which the filter response is zero.
f_max (float): the upper cut-off frequency, above which the filter response is zeros.
htk (bool): whether to use HTK formula in computing fbank matrix.
norm (str|float): the normalization type in computing fbank matrix. Slaney-style is used by default.
You can specify norm=1.0/2.0 to use customized p-norm normalization.
ref_value (float): the reference value. If smaller than 1.0, the db level
amin (float): the minimum value of input magnitude, below which the input of the signal will be pulled up accordingly.
Otherwise, the db level is pushed down.
magnitude is clipped(to amin). For numerical stability, set amin to a larger value,
e.g., 1e-3.
top_db (float): the maximum db value of resulting spectrum, above which the
spectrum is clipped(to top_db).
dtype (str): the datatype of fbank matrix used in the transform. Use float64 to increase numerical
accuracy. Note that the final transform will be conducted in float32 regardless of dtype of fbank matrix.
super(MFCC, self).__init__()
assert n_mfcc <= n_mels, 'n_mfcc cannot be larger than n_mels: %d vs %d' % (
n_mfcc, n_mels)
self._log_melspectrogram = LogMelSpectrogram(
self.dct_matrix = create_dct(n_mfcc=n_mfcc, n_mels=n_mels, dtype=dtype)
self.register_buffer('dct_matrix', self.dct_matrix)
def forward(self, x):
log_mel_feature = self._log_melspectrogram(x)
mfcc = paddle.matmul(
log_mel_feature.transpose((0, 2, 1)), self.dct_matrix).transpose(
(0, 2, 1)) # (B, n_mels, L)
return mfcc

@ -0,0 +1,20 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from .functional import compute_fbank_matrix
from .functional import create_dct
from .functional import fft_frequencies
from .functional import hz_to_mel
from .functional import mel_frequencies
from .functional import mel_to_hz
from .functional import power_to_db

@ -0,0 +1,265 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# Modified from librosa(
import math
from typing import Optional
from typing import Union
import paddle
__all__ = [
def hz_to_mel(freq: Union[paddle.Tensor, float],
htk: bool=False) -> Union[paddle.Tensor, float]:
"""Convert Hz to Mels.
freq: the input tensor of arbitrary shape, or a single floating point number.
htk: use HTK formula to do the conversion.
The default value is False.
The frequencies represented in Mel-scale.
if htk:
if isinstance(freq, paddle.Tensor):
return 2595.0 * paddle.log10(1.0 + freq / 700.0)
return 2595.0 * math.log10(1.0 + freq / 700.0)
# Fill in the linear part
f_min = 0.0
f_sp = 200.0 / 3
mels = (freq - f_min) / f_sp
# Fill in the log-scale part
min_log_hz = 1000.0 # beginning of log region (Hz)
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = math.log(6.4) / 27.0 # step size for log region
if isinstance(freq, paddle.Tensor):
target = min_log_mel + paddle.log(
freq / min_log_hz + 1e-10) / logstep # prevent nan with 1e-10
mask = (freq > min_log_hz).astype(freq.dtype)
mels = target * mask + mels * (
1 - mask) # will replace by masked_fill OP in future
if freq >= min_log_hz:
mels = min_log_mel + math.log(freq / min_log_hz + 1e-10) / logstep
return mels
def mel_to_hz(mel: Union[float, paddle.Tensor],
htk: bool=False) -> Union[float, paddle.Tensor]:
"""Convert mel bin numbers to frequencies.
mel: the mel frequency represented as a tensor of arbitrary shape, or a floating point number.
htk: use HTK formula to do the conversion.
The frequencies represented in hz.
if htk:
return 700.0 * (10.0**(mel / 2595.0) - 1.0)
f_min = 0.0
f_sp = 200.0 / 3
freqs = f_min + f_sp * mel
# And now the nonlinear scale
min_log_hz = 1000.0 # beginning of log region (Hz)
min_log_mel = (min_log_hz - f_min) / f_sp # same (Mels)
logstep = math.log(6.4) / 27.0 # step size for log region
if isinstance(mel, paddle.Tensor):
target = min_log_hz * paddle.exp(logstep * (mel - min_log_mel))
mask = (mel > min_log_mel).astype(mel.dtype)
freqs = target * mask + freqs * (
1 - mask) # will replace by masked_fill OP in future
if mel >= min_log_mel:
freqs = min_log_hz * math.exp(logstep * (mel - min_log_mel))
return freqs
def mel_frequencies(n_mels: int=64,
f_min: float=0.0,
f_max: float=11025.0,
htk: bool=False,
dtype: str=paddle.float32):
"""Compute mel frequencies.
n_mels(int): number of Mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zero.
htk(bool): whether to use htk formula.
dtype(str): the datatype of the return frequencies.
The frequencies represented in Mel-scale
# 'Center freqs' of mel bands - uniformly spaced between limits
min_mel = hz_to_mel(f_min, htk=htk)
max_mel = hz_to_mel(f_max, htk=htk)
mels = paddle.linspace(min_mel, max_mel, n_mels, dtype=dtype)
freqs = mel_to_hz(mels, htk=htk)
return freqs
def fft_frequencies(sr: int, n_fft: int, dtype: str=paddle.float32):
"""Compute fourier frequencies.
sr(int): the audio sample rate.
n_fft(float): the number of fft bins.
dtype(str): the datatype of the return frequencies.
The frequencies represented in hz.
return paddle.linspace(0, float(sr) / 2, int(1 + n_fft // 2), dtype=dtype)
def compute_fbank_matrix(sr: int,
n_fft: int,
n_mels: int=64,
f_min: float=0.0,
f_max: Optional[float]=None,
htk: bool=False,
norm: Union[str, float]='slaney',
dtype: str=paddle.float32):
"""Compute fbank matrix.
sr(int): the audio sample rate.
n_fft(int): the number of fft bins.
n_mels(int): the number of Mel bins.
f_min(float): the lower cut-off frequency, below which the filter response is zero.
f_max(float): the upper cut-off frequency, above which the filter response is zero.
htk: whether to use htk formula.
return_complex(bool): whether to return complex matrix. If True, the matrix will
be complex type. Otherwise, the real and image part will be stored in the last
axis of returned tensor.
dtype(str): the datatype of the returned fbank matrix.
The fbank matrix of shape (n_mels, int(1+n_fft//2)).
output: (n_mels, int(1+n_fft//2))
if f_max is None:
f_max = float(sr) / 2
# Initialize the weights
weights = paddle.zeros((n_mels, int(1 + n_fft // 2)), dtype=dtype)
# Center freqs of each FFT bin
fftfreqs = fft_frequencies(sr=sr, n_fft=n_fft, dtype=dtype)
# 'Center freqs' of mel bands - uniformly spaced between limits
mel_f = mel_frequencies(
n_mels + 2, f_min=f_min, f_max=f_max, htk=htk, dtype=dtype)
fdiff = mel_f[1:] - mel_f[:-1] #np.diff(mel_f)
ramps = mel_f.unsqueeze(1) - fftfreqs.unsqueeze(0)
#ramps = np.subtract.outer(mel_f, fftfreqs)
for i in range(n_mels):
# lower and upper slopes for all bins
lower = -ramps[i] / fdiff[i]
upper = ramps[i + 2] / fdiff[i + 1]
# .. then intersect them with each other and zero
weights[i] = paddle.maximum(
paddle.zeros_like(lower), paddle.minimum(lower, upper))
# Slaney-style mel is scaled to be approx constant energy per channel
if norm == 'slaney':
enorm = 2.0 / (mel_f[2:n_mels + 2] - mel_f[:n_mels])
weights *= enorm.unsqueeze(1)
elif isinstance(norm, int) or isinstance(norm, float):
weights = paddle.nn.functional.normalize(weights, p=norm, axis=-1)
return weights
def power_to_db(magnitude: paddle.Tensor,
ref_value: float=1.0,
amin: float=1e-10,
top_db: Optional[float]=None) -> paddle.Tensor:
"""Convert a power spectrogram (amplitude squared) to decibel (dB) units.
The function computes the scaling ``10 * log10(x / ref)`` in a numerically
stable way.
magnitude(Tensor): the input magnitude tensor of any shape.
ref_value(float): the reference value. If smaller than 1.0, the db level
of the signal will be pulled up accordingly. Otherwise, the db level
is pushed down.
amin(float): the minimum value of input magnitude, below which the input
magnitude is clipped(to amin).
top_db(float): the maximum db value of resulting spectrum, above which the
spectrum is clipped(to top_db).
The spectrogram in log-scale.
input: any shape
output: same as input
if amin <= 0:
raise Exception("amin must be strictly positive")
if ref_value <= 0:
raise Exception("ref_value must be strictly positive")
ones = paddle.ones_like(magnitude)
log_spec = 10.0 * paddle.log10(paddle.maximum(ones * amin, magnitude))
log_spec -= 10.0 * math.log10(max(ref_value, amin))
if top_db is not None:
if top_db < 0:
raise Exception("top_db must be non-negative")
log_spec = paddle.maximum(log_spec, ones * (log_spec.max() - top_db))
return log_spec
def create_dct(n_mfcc: int,
n_mels: int,
norm: Optional[str]='ortho',
dtype: Optional[str]=paddle.float32) -> paddle.Tensor:
"""Create a discrete cosine transform(DCT) matrix.
n_mfcc (int): Number of mel frequency cepstral coefficients.
n_mels (int): Number of mel filterbanks.
norm (str, optional): Normalizaiton type. Defaults to 'ortho'.
Tensor: The DCT matrix with shape (n_mels, n_mfcc).
n = paddle.arange(n_mels, dtype=dtype)
k = paddle.arange(n_mfcc, dtype=dtype).unsqueeze(1)
dct = paddle.cos(math.pi / float(n_mels) * (n + 0.5) *
k) # size (n_mfcc, n_mels)
if norm is None:
dct *= 2.0
assert norm == "ortho"
dct[0] *= 1.0 / math.sqrt(2.0)
dct *= math.sqrt(2.0 / float(n_mels))
return dct.T

@ -20,6 +20,19 @@ from paddle import Tensor
__all__ = [ __all__ = [
'get_window', 'get_window',
# windows
] ]
@ -73,6 +86,21 @@ def general_gaussian(M: int, p, sig, sym: bool=True,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def general_cosine(M: int, a: float, sym: bool=True,
dtype: str='float64') -> Tensor:
"""Compute a generic weighted sum of cosine terms window.
This function is consistent with
if _len_guards(M):
return paddle.ones((M, ), dtype=dtype)
M, needs_trunc = _extend(M, sym)
fac = paddle.linspace(-math.pi, math.pi, M, dtype=dtype)
w = paddle.zeros((M, ), dtype=dtype)
for k in range(len(a)):
w += a[k] * paddle.cos(k * fac)
return _truncate(w, needs_trunc)
def general_hamming(M: int, alpha: float, sym: bool=True, def general_hamming(M: int, alpha: float, sym: bool=True,
dtype: str='float64') -> Tensor: dtype: str='float64') -> Tensor:
"""Compute a generalized Hamming window. """Compute a generalized Hamming window.
@ -143,21 +171,6 @@ def taylor(M: int,
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
def general_cosine(M: int, a: float, sym: bool=True,
dtype: str='float64') -> Tensor:
"""Compute a generic weighted sum of cosine terms window.
This function is consistent with
if _len_guards(M):
return paddle.ones((M, ), dtype=dtype)
M, needs_trunc = _extend(M, sym)
fac = paddle.linspace(-math.pi, math.pi, M, dtype=dtype)
w = paddle.zeros((M, ), dtype=dtype)
for k in range(len(a)):
w += a[k] * paddle.cos(k * fac)
return _truncate(w, needs_trunc)
def hamming(M: int, sym: bool=True, dtype: str='float64') -> Tensor: def hamming(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
"""Compute a Hamming window. """Compute a Hamming window.
The Hamming window is a taper formed by using a raised cosine with The Hamming window is a taper formed by using a raised cosine with
@ -375,6 +388,7 @@ def cosine(M: int, sym: bool=True, dtype: str='float64') -> Tensor:
return _truncate(w, needs_trunc) return _truncate(w, needs_trunc)
## factory function
def get_window(window: Union[str, Tuple[str, float]], def get_window(window: Union[str, Tuple[str, float]],
win_length: int, win_length: int,
fftbins: bool=True, fftbins: bool=True,

@ -11,4 +11,3 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from .audio import *

@ -1,6 +1,6 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License" # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
# You may obtain a copy of the License at # You may obtain a copy of the License at
# #
@ -11,8 +11,5 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from .download import * from .dtw import dtw_distance
from .env import * from .mcd import mcd_distance
from .error import *
from .log import *
from .time import *

@ -0,0 +1,42 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from dtaidistance import dtw_ndim
__all__ = [
def dtw_distance(xs: np.ndarray, ys: np.ndarray) -> float:
"""dtw distance
Dynamic Time Warping.
This function keeps a compact matrix, not the full warping paths matrix.
Uses dynamic programming to compute:
wps[i, j] = (s1[i]-s2[j])**2 + min(
wps[i-1, j ] + penalty, // vertical / insertion / expansion
wps[i , j-1] + penalty, // horizontal / deletion / compression
wps[i-1, j-1]) // diagonal / match
dtw = sqrt(wps[-1, -1])
xs (np.ndarray): ref sequence, [T,D]
ys (np.ndarray): hyp sequence, [T,D]
float: dtw distance
return dtw_ndim.distance(xs, ys)

@ -0,0 +1,48 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import mcd.metrics_fast as mt
import numpy as np
from mcd import dtw
__all__ = [
def mcd_distance(xs: np.ndarray, ys: np.ndarray, cost_fn=mt.logSpecDbDist):
"""Mel cepstral distortion (MCD), dtw distance.
Dynamic Time Warping.
Uses dynamic programming to compute:
wps[i, j] = cost_fn(xs[i], ys[j]) + min(
wps[i-1, j ], // vertical / insertion / expansion
wps[i , j-1], // horizontal / deletion / compression
wps[i-1, j-1]) // diagonal / match
dtw = sqrt(wps[-1, -1])
Cost Function:
logSpecDbConst = 10.0 / math.log(10.0) * math.sqrt(2.0)
def logSpecDbDist(x, y):
diff = x - y
return logSpecDbConst * math.sqrt(np.inner(diff, diff))
xs (np.ndarray): ref sequence, [T,D]
ys (np.ndarray): hyp sequence, [T,D]
float: dtw distance
min_cost, path = dtw.dtw(xs, ys, cost_fn)
return min_cost

@ -0,0 +1,13 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,25 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
from .download import decompress
from .download import download_and_decompress
from .download import load_state_dict_from_url
from .env import DATA_HOME
from .env import MODEL_HOME
from .env import PPAUDIO_HOME
from .env import USER_HOME
from .error import ParameterError
from .log import Logger
from .log import logger
from .time import seconds_to_hms
from .time import Timer

@ -22,6 +22,12 @@ from .log import logger
download.logger = logger download.logger = logger
__all__ = [
def decompress(file: str): def decompress(file: str):
""" """

@ -20,6 +20,13 @@ PPAUDIO_HOME --> the root directory for storing PaddleAudio related data. D
''' '''
import os import os
__all__ = [
def _get_user_home(): def _get_user_home():
return os.path.expanduser('~') return os.path.expanduser('~')

@ -19,7 +19,10 @@ import time
import colorlog import colorlog
loggers = {} __all__ = [
log_config = { log_config = {
'DEBUG': { 'DEBUG': {

@ -14,6 +14,11 @@
import math import math
import time import time
__all__ = [
class Timer(object): class Timer(object):
'''Calculate runing speed and estimated time of arrival(ETA)''' '''Calculate runing speed and estimated time of arrival(ETA)'''

@ -14,7 +14,7 @@
import setuptools import setuptools
# set the version here # set the version here
VERSION = '0.1.0' VERSION = '0.2.0'
def write_version_py(filename='paddleaudio/'): def write_version_py(filename='paddleaudio/'):
@ -59,6 +59,8 @@ setuptools.setup(
'resampy >= 0.2.2', 'resampy >= 0.2.2',
'soundfile >= 0.9.0', 'soundfile >= 0.9.0',
'colorlog', 'colorlog',
'dtaidistance >= 2.3.6',
'mcd >= 0.4',
], ) ], )
remove_version_py() remove_version_py()