@@ -1,11 +1,12 @@
 import math
 
 import paddle
 import paddle.nn as nn
 import paddle.nn.functional as F
 
-from paddlespeech.s2t.models.wav2vec2.processing.signal_processing import (
-    compute_amplitude,
-    convolve1d,
-    notch_filter)
+from paddlespeech.s2t.models.wav2vec2.processing.signal_processing import compute_amplitude
+from paddlespeech.s2t.models.wav2vec2.processing.signal_processing import convolve1d
+from paddlespeech.s2t.models.wav2vec2.processing.signal_processing import notch_filter
@@ -36,8 +37,10 @@ class SpeedPerturb(nn.Layer):
     """
 
     def __init__(
-        self, orig_freq, speeds=[90, 100, 110], perturb_prob=1.0,
-    ):
+            self,
+            orig_freq,
+            speeds=[90, 100, 110],
+            perturb_prob=1.0, ):
         super().__init__()
         self.orig_freq = orig_freq
         self.speeds = speeds
@@ -70,14 +73,15 @@ class SpeedPerturb(nn.Layer):
         # Don't perturb (return early) 1-`perturb_prob` portion of the batches
         if paddle.rand([1]) > self.perturb_prob:
             return waveform.clone()
 
         # Perform a random perturbation
-        self.samp_index = paddle.randint(len(self.speeds), shape=(1,))[0]
+        self.samp_index = paddle.randint(len(self.speeds), shape=(1, ))[0]
         perturbed_waveform = self.resamplers[self.samp_index](waveform)
 
         return perturbed_waveform
 
 
 class Resample(nn.Layer):
     """This class resamples an audio signal using sinc-based interpolation.
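
The SpeedPerturb hunks above only touch formatting; the behavior is unchanged. A minimal usage sketch (not part of the patch) follows. The module path and the [batch, time] input layout are assumptions inferred from the imports and indexing shown in the diff.

# Hedged usage sketch, not part of the patch.
import paddle
from paddlespeech.s2t.models.wav2vec2.processing.speech_augmentation import SpeedPerturb  # assumed path

wav = paddle.randn([4, 16000])  # 4 utterances, 1 s at 16 kHz (assumed [batch, time] layout)
perturb = SpeedPerturb(orig_freq=16000, speeds=[90, 100, 110], perturb_prob=1.0)
out = perturb(wav)  # time axis stretched or compressed by roughly 10%, or left unchanged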
@@ -94,9 +98,12 @@ class Resample(nn.Layer):
         Controls the sharpness of the filter, larger numbers result in a
         sharper filter, but they are less efficient. Values from 4 to 10 are allowed.
     """
     def __init__(
-        self, orig_freq=16000, new_freq=16000, lowpass_filter_width=6,
-    ):
+            self,
+            orig_freq=16000,
+            new_freq=16000,
+            lowpass_filter_width=6, ):
         super().__init__()
         self.orig_freq = orig_freq
         self.new_freq = new_freq
@@ -193,8 +200,7 @@ class Resample(nn.Layer):
         window_size = self.weights.shape[1]
         tot_output_samp = self._output_samples(wave_len)
         resampled_waveform = paddle.zeros(
-            (batch_size, num_channels, tot_output_samp)
-        )
+            (batch_size, num_channels, tot_output_samp))
         # self.weights = self.weights.to(waveforms.device)
 
         # Check weights are on correct device
@@ -222,28 +228,25 @@ class Resample(nn.Layer):
             right_padding = max(0, end_index + 1 - current_wave_len)
             left_padding = max(0, -first_index)
             wave_to_conv = paddle.nn.functional.pad(
-                wave_to_conv, (left_padding, right_padding), data_format='NCL'
-            )
+                wave_to_conv, (left_padding, right_padding), data_format='NCL')
             conv_wave = paddle.nn.functional.conv1d(
                 x=wave_to_conv,
                 weight=self.weights[i].repeat(num_channels, 1, 1),
                 stride=self.conv_stride,
-                groups=num_channels,
-            )
+                groups=num_channels, )
 
             # we want conv_wave[:, i] to be at
             # output[:, i + n*conv_transpose_stride]
             dilated_conv_wave = paddle.nn.functional.conv1d_transpose(
-                conv_wave, eye, stride=self.conv_transpose_stride
-            )
+                conv_wave, eye, stride=self.conv_transpose_stride)
 
             # pad dilated_conv_wave so it reaches the output length if needed.
             left_padding = i
             previous_padding = left_padding + dilated_conv_wave.shape[-1]
             right_padding = max(0, tot_output_samp - previous_padding)
             dilated_conv_wave = paddle.nn.functional.pad(
-                dilated_conv_wave, (left_padding, right_padding), data_format='NCL'
-            )
+                dilated_conv_wave, (left_padding, right_padding),
+                data_format='NCL')
             dilated_conv_wave = dilated_conv_wave[..., :tot_output_samp]
 
             resampled_waveform += dilated_conv_wave
@@ -326,9 +329,7 @@ class Resample(nn.Layer):
         window_width = self.lowpass_filter_width / (2.0 * lowpass_cutoff)
 
         assert lowpass_cutoff < min(self.orig_freq, self.new_freq) / 2
-        output_t = paddle.arange(
-            start=0.0, end=self.output_samples
-        )
+        output_t = paddle.arange(start=0.0, end=self.output_samples)
         output_t /= self.new_freq
         min_t = output_t - window_width
         max_t = output_t + window_width
@@ -346,23 +347,16 @@ class Resample(nn.Layer):
         inside_window_indices = delta_t.abs() < (window_width)
         # raised-cosine (Hanning) window with width `window_width`
-        weights[inside_window_indices] = 0.5 * (
-            1
-            + paddle.cos(
-                2
-                * math.pi
-                * lowpass_cutoff
-                / self.lowpass_filter_width
-                * delta_t[inside_window_indices]
-            )
-        )
+        weights[inside_window_indices] = 0.5 * (1 + paddle.cos(
+            2 * math.pi * lowpass_cutoff / self.lowpass_filter_width *
+            delta_t[inside_window_indices]))
 
         t_eq_zero_indices = delta_t == 0.0
         t_not_eq_zero_indices = ~t_eq_zero_indices
 
         # sinc filter function
         weights[t_not_eq_zero_indices] *= paddle.sin(
-            2 * math.pi * lowpass_cutoff * delta_t[t_not_eq_zero_indices]
-        ) / (math.pi * delta_t[t_not_eq_zero_indices])
+            2 * math.pi * lowpass_cutoff * delta_t[t_not_eq_zero_indices]) / (
+                math.pi * delta_t[t_not_eq_zero_indices])
 
         # limit of the function at t = 0
         weights[t_eq_zero_indices] *= 2 * lowpass_cutoff
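
Both layouts of the weight computation above evaluate the same windowed-sinc low-pass kernel: the sinc term sin(2*pi*f_c*dt) / (pi*dt) scaled by the raised-cosine window 0.5 * (1 + cos(2*pi*f_c*dt / lowpass_filter_width)), with the dt = 0 limit handled as a separate case. A minimal usage sketch (not part of the patch) follows; the module path and the [batch, time] input layout are assumptions.

# Hedged usage sketch, not part of the patch.
import paddle
from paddlespeech.s2t.models.wav2vec2.processing.speech_augmentation import Resample  # assumed path

wav_16k = paddle.randn([1, 32000])  # 2 s at 16 kHz (assumed [batch, time] layout)
downsample = Resample(orig_freq=16000, new_freq=8000, lowpass_filter_width=6)
wav_8k = downsample(wav_16k)  # roughly 16000 output samples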
@@ -405,14 +399,13 @@ class DropFreq(nn.Layer):
     """
 
     def __init__(
-        self,
-        drop_freq_low=1e-14,
-        drop_freq_high=1,
-        drop_count_low=1,
-        drop_count_high=2,
-        drop_width=0.05,
-        drop_prob=1,
-    ):
+            self,
+            drop_freq_low=1e-14,
+            drop_freq_high=1,
+            drop_count_low=1,
+            drop_count_high=2,
+            drop_width=0.05,
+            drop_prob=1, ):
         super().__init__()
         self.drop_freq_low = drop_freq_low
         self.drop_freq_high = drop_freq_high
@@ -443,14 +436,14 @@ class DropFreq(nn.Layer):
         # Pick number of frequencies to drop
         drop_count = paddle.randint(
-            low=self.drop_count_low, high=self.drop_count_high + 1, shape=(1,),
-        )
+            low=self.drop_count_low,
+            high=self.drop_count_high + 1,
+            shape=(1, ), )
 
         # Pick a frequency to drop
         drop_range = self.drop_freq_high - self.drop_freq_low
         drop_frequency = (
-            paddle.rand(drop_count) * drop_range + self.drop_freq_low
-        )
+            paddle.rand(drop_count) * drop_range + self.drop_freq_low)
 
         # Filter parameters
         filter_length = 101
         pad = filter_length // 2
@@ -461,8 +454,9 @@ class DropFreq(nn.Layer):
         # Subtract each frequency
         for frequency in drop_frequency:
             notch_kernel = notch_filter(
-                frequency, filter_length, self.drop_width,
-            )
+                frequency,
+                filter_length,
+                self.drop_width, )
             drop_filter = convolve1d(drop_filter, notch_kernel, pad)
 
         # Apply filter
@@ -471,6 +465,7 @@ class DropFreq(nn.Layer):
         # Remove channels dimension if added
         return dropped_waveform.squeeze(-1)
 
+
 class DropChunk(nn.Layer):
     """This class drops portions of the input signal.
     Using `DropChunk` as an augmentation strategy helps a models learn to rely
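
The DropFreq hunks above are likewise formatting-only: the layer still picks between drop_count_low and drop_count_high random frequencies and convolves the signal with the matching notch filters. A minimal usage sketch (not part of the patch) follows; the module path, the [batch, time] input layout, and the reading of drop_freq_low/drop_freq_high as fractions of the Nyquist band (suggested by the 1e-14 and 1 defaults) are assumptions.

# Hedged usage sketch, not part of the patch.
import paddle
from paddlespeech.s2t.models.wav2vec2.processing.speech_augmentation import DropFreq  # assumed path

wav = paddle.randn([4, 16000])  # assumed [batch, time] layout
drop_freq = DropFreq(drop_count_low=1, drop_count_high=2, drop_width=0.05)
notched = drop_freq(wav)  # same shape, with one or two narrow frequency bands attenuated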
@@ -515,16 +510,15 @@ class DropChunk(nn.Layer):
     """
 
    def __init__(
-        self,
-        drop_length_low=100,
-        drop_length_high=1000,
-        drop_count_low=1,
-        drop_count_high=10,
-        drop_start=0,
-        drop_end=None,
-        drop_prob=1,
-        noise_factor=0.0,
-    ):
+            self,
+            drop_length_low=100,
+            drop_length_high=1000,
+            drop_count_low=1,
+            drop_count_high=10,
+            drop_start=0,
+            drop_end=None,
+            drop_prob=1,
+            noise_factor=0.0, ):
         super().__init__()
         self.drop_length_low = drop_length_low
         self.drop_length_high = drop_length_high
@@ -580,8 +574,7 @@ class DropChunk(nn.Layer):
         drop_times = paddle.randint(
             low=self.drop_count_low,
             high=self.drop_count_high + 1,
-            shape=(batch_size,),
-        )
+            shape=(batch_size, ), )
 
         # Iterate batch to set mask
         for i in range(batch_size):
@@ -592,8 +585,7 @@ class DropChunk(nn.Layer):
             length = paddle.randint(
                 low=self.drop_length_low,
                 high=self.drop_length_high + 1,
-                shape=(drop_times[i],),
-            )
+                shape=(drop_times[i], ), )
 
             # Compute range of starting locations
             start_min = self.drop_start
@@ -608,15 +600,16 @@ class DropChunk(nn.Layer):
 
             # Pick starting locations
             start = paddle.randint(
-                low=start_min, high=start_max + 1, shape=(drop_times[i],),
-            )
+                low=start_min,
+                high=start_max + 1,
+                shape=(drop_times[i], ), )
 
             end = start + length
 
             # Update waveform
             if not self.noise_factor:
                 for j in range(drop_times[i]):
-                    dropped_waveform[i, start[j] : end[j]] = 0.0
+                    dropped_waveform[i, start[j]:end[j]] = 0.0
             else:
                 # Uniform distribution of -2 to +2 * avg amplitude should
                 # preserve the average for normalization
@@ -625,7 +618,7 @@ class DropChunk(nn.Layer):
                     # zero-center the noise distribution
                     noise_vec = paddle.rand([length[j]])
                     noise_vec = 2 * noise_max * noise_vec - noise_max
-                    dropped_waveform[i, start[j] : end[j]] = noise_vec
+                    dropped_waveform[i, start[j]:end[j]] = noise_vec
 
         return dropped_waveform
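
The DropChunk hunks above again change only the layout of the randint calls and the slice syntax. A minimal usage sketch (not part of the patch) follows; the module path, the [batch, time] input layout, and the convention that lengths holds relative utterance lengths in [0, 1] are assumptions.

# Hedged usage sketch, not part of the patch.
import paddle
from paddlespeech.s2t.models.wav2vec2.processing.speech_augmentation import DropChunk  # assumed path

wav = paddle.randn([4, 16000])     # assumed [batch, time] layout
lengths = paddle.ones([4])         # assumed relative lengths: every utterance is full length
drop_chunk = DropChunk(drop_length_low=100, drop_length_high=1000, noise_factor=0.0)
masked = drop_chunk(wav, lengths)  # with noise_factor=0.0 the dropped spans are zeroed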
@@ -679,37 +672,33 @@ class TimeDomainSpecAugment(nn.Layer):
     """
 
     def __init__(
-        self,
-        perturb_prob=1.0,
-        drop_freq_prob=1.0,
-        drop_chunk_prob=1.0,
-        speeds=[95, 100, 105],
-        sample_rate=16000,
-        drop_freq_count_low=0,
-        drop_freq_count_high=3,
-        drop_chunk_count_low=0,
-        drop_chunk_count_high=5,
-        drop_chunk_length_low=1000,
-        drop_chunk_length_high=2000,
-        drop_chunk_noise_factor=0,
-    ):
+            self,
+            perturb_prob=1.0,
+            drop_freq_prob=1.0,
+            drop_chunk_prob=1.0,
+            speeds=[95, 100, 105],
+            sample_rate=16000,
+            drop_freq_count_low=0,
+            drop_freq_count_high=3,
+            drop_chunk_count_low=0,
+            drop_chunk_count_high=5,
+            drop_chunk_length_low=1000,
+            drop_chunk_length_high=2000,
+            drop_chunk_noise_factor=0, ):
         super().__init__()
         self.speed_perturb = SpeedPerturb(
-            perturb_prob=perturb_prob, orig_freq=sample_rate, speeds=speeds
-        )
+            perturb_prob=perturb_prob, orig_freq=sample_rate, speeds=speeds)
         self.drop_freq = DropFreq(
             drop_prob=drop_freq_prob,
             drop_count_low=drop_freq_count_low,
-            drop_count_high=drop_freq_count_high,
-        )
+            drop_count_high=drop_freq_count_high, )
         self.drop_chunk = DropChunk(
             drop_prob=drop_chunk_prob,
             drop_count_low=drop_chunk_count_low,
             drop_count_high=drop_chunk_count_high,
             drop_length_low=drop_chunk_length_low,
             drop_length_high=drop_chunk_length_high,
-            noise_factor=drop_chunk_noise_factor,
-        )
+            noise_factor=drop_chunk_noise_factor, )
 
     def forward(self, waveforms, lengths):
         """Returns the distorted waveforms.
@@ -724,4 +713,4 @@ class TimeDomainSpecAugment(nn.Layer):
         waveforms = self.speed_perturb(waveforms)
         waveforms = self.drop_freq(waveforms)
         waveforms = self.drop_chunk(waveforms, lengths)
-        return waveforms
+        return waveforms
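
With the final hunk the patch is complete: TimeDomainSpecAugment still chains SpeedPerturb, DropFreq, and DropChunk in that order, as its forward shows. An end-to-end usage sketch (not part of the patch) follows, under the same assumptions as the earlier sketches (module path, [batch, time] waveforms, relative lengths).

# Hedged end-to-end sketch, not part of the patch.
import paddle
from paddlespeech.s2t.models.wav2vec2.processing.speech_augmentation import TimeDomainSpecAugment  # assumed path

wav = paddle.randn([8, 16000])  # assumed [batch, time] layout
lengths = paddle.ones([8])      # assumed relative lengths
augment = TimeDomainSpecAugment(
    sample_rate=16000,
    speeds=[95, 100, 105],
    drop_freq_count_high=3,
    drop_chunk_count_high=5, )
augmented = augment(wav, lengths)  # speed perturb -> drop_freq -> drop_chunk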