parent
bc0dd51149
commit
469329221b
@@ -1,125 +0,0 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modified from espnet(https://github.com/espnet/espnet)
"""Adversarial loss modules."""
import paddle
import paddle.nn.functional as F
from paddle import nn


class GeneratorAdversarialLoss(nn.Layer):
    """Generator adversarial loss module."""

    def __init__(
            self,
            average_by_discriminators=True,
            loss_type="mse", ):
        """Initialize GeneratorAdversarialLoss module."""
        super().__init__()
        self.average_by_discriminators = average_by_discriminators
        assert loss_type in ["mse", "hinge"], f"{loss_type} is not supported."
        if loss_type == "mse":
            self.criterion = self._mse_loss
        else:
            self.criterion = self._hinge_loss

    def forward(self, outputs):
        """Calculate generator adversarial loss.
        Parameters
        ----------
        outputs: Tensor or List
            Discriminator outputs or list of discriminator outputs.
        Returns
        ----------
        Tensor
            Generator adversarial loss value.
        """
        if isinstance(outputs, (tuple, list)):
            adv_loss = 0.0
            for i, outputs_ in enumerate(outputs):
                if isinstance(outputs_, (tuple, list)):
                    # case including feature maps
                    outputs_ = outputs_[-1]
                adv_loss += self.criterion(outputs_)
            if self.average_by_discriminators:
                adv_loss /= i + 1
        else:
            adv_loss = self.criterion(outputs)

        return adv_loss

    def _mse_loss(self, x):
        return F.mse_loss(x, paddle.ones_like(x))

    def _hinge_loss(self, x):
        return -x.mean()


class DiscriminatorAdversarialLoss(nn.Layer):
    """Discriminator adversarial loss module."""

    def __init__(
            self,
            average_by_discriminators=True,
            loss_type="mse", ):
        """Initialize DiscriminatorAdversarialLoss module."""
        super().__init__()
        self.average_by_discriminators = average_by_discriminators
        assert loss_type in ["mse"], f"{loss_type} is not supported."
        if loss_type == "mse":
            self.fake_criterion = self._mse_fake_loss
            self.real_criterion = self._mse_real_loss

    def forward(self, outputs_hat, outputs):
        """Calculate discriminator adversarial loss.
        Parameters
        ----------
        outputs_hat : Tensor or list
            Discriminator outputs or list of
            discriminator outputs calculated from generator outputs.
        outputs : Tensor or list
            Discriminator outputs or list of
            discriminator outputs calculated from groundtruth.
        Returns
        ----------
        Tensor
            Discriminator real loss value.
        Tensor
            Discriminator fake loss value.
        """
        if isinstance(outputs, (tuple, list)):
            real_loss = 0.0
            fake_loss = 0.0
            for i, (outputs_hat_,
                    outputs_) in enumerate(zip(outputs_hat, outputs)):
                if isinstance(outputs_hat_, (tuple, list)):
                    # case including feature maps
                    outputs_hat_ = outputs_hat_[-1]
                    outputs_ = outputs_[-1]
                real_loss += self.real_criterion(outputs_)
                fake_loss += self.fake_criterion(outputs_hat_)
            if self.average_by_discriminators:
                fake_loss /= i + 1
                real_loss /= i + 1
        else:
            real_loss = self.real_criterion(outputs)
            fake_loss = self.fake_criterion(outputs_hat)

        return real_loss, fake_loss

    def _mse_real_loss(self, x):
        return F.mse_loss(x, paddle.ones_like(x))

    def _mse_fake_loss(self, x):
        return F.mse_loss(x, paddle.zeros_like(x))
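Below is a minimal usage sketch, not part of the deleted file: it assumes a multi-scale discriminator that returns one [feature_map, ..., final_score] list per scale, and all shapes and the 3-scale setup are made up for illustration.

import paddle

# Dummy outputs of a 3-scale discriminator; only the last element of each
# inner list (the score map) is used by the loss modules above.
scores_fake = [[paddle.randn([4, 64, 32]), paddle.randn([4, 1, 32])]
               for _ in range(3)]
scores_real = [[paddle.randn([4, 64, 32]), paddle.randn([4, 1, 32])]
               for _ in range(3)]

gen_criterion = GeneratorAdversarialLoss(loss_type="mse")
dis_criterion = DiscriminatorAdversarialLoss(loss_type="mse")

adv_loss = gen_criterion(scores_fake)                           # generator term
real_loss, fake_loss = dis_criterion(scores_fake, scores_real)
dis_loss = real_loss + fake_loss                                # discriminator term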
@@ -1,348 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

import numpy as np
import paddle
from paddle import nn
from paddle.nn import functional as F


def scaled_dot_product_attention(q, k, v, mask=None, dropout=0.0,
                                 training=True):
    r"""Scaled dot product attention with masking.

    Assume that q, k, v all have the same leading dimensions (denoted as * in
    descriptions below). Dropout is applied to attention weights before
    weighted sum of values.

    Parameters
    -----------
    q : Tensor [shape=(\*, T_q, d)]
        the query tensor.
    k : Tensor [shape=(\*, T_k, d)]
        the key tensor.
    v : Tensor [shape=(\*, T_k, d_v)]
        the value tensor.
    mask : Tensor, [shape=(\*, T_q, T_k) or broadcastable shape], optional
        the mask tensor, zeros correspond to paddings. Defaults to None.

    Returns
    ----------
    out : Tensor [shape=(\*, T_q, d_v)]
        the context vector.
    attn_weights : Tensor [shape=(\*, T_q, T_k)]
        the attention weights.
    """
    d = q.shape[-1]  # we only support imperative execution
    qk = paddle.matmul(q, k, transpose_y=True)
    scaled_logit = paddle.scale(qk, 1.0 / math.sqrt(d))

    if mask is not None:
        scaled_logit += paddle.scale((1.0 - mask), -1e9)  # hard coded here

    attn_weights = F.softmax(scaled_logit, axis=-1)
    attn_weights = F.dropout(attn_weights, dropout, training=training)
    out = paddle.matmul(attn_weights, v)
    return out, attn_weights


def drop_head(x, drop_n_heads, training=True):
    """Drop n context vectors from multiple ones.

    Parameters
    ----------
    x : Tensor [shape=(batch_size, num_heads, time_steps, channels)]
        The input, multiple context vectors.
    drop_n_heads : int [0<= drop_n_heads <= num_heads]
        Number of vectors to drop.
    training : bool
        A flag indicating whether it is in training. If `False`, no dropout is
        applied.

    Returns
    -------
    Tensor
        The output.
    """
    if not training or (drop_n_heads == 0):
        return x

    batch_size, num_heads, _, _ = x.shape
    # drop all heads
    if num_heads == drop_n_heads:
        return paddle.zeros_like(x)

    mask = np.ones([batch_size, num_heads])
    mask[:, :drop_n_heads] = 0
    for subarray in mask:
        np.random.shuffle(subarray)
    scale = float(num_heads) / (num_heads - drop_n_heads)
    mask = scale * np.reshape(mask, [batch_size, num_heads, 1, 1])
    out = x * paddle.to_tensor(mask)
    return out


def _split_heads(x, num_heads):
    batch_size, time_steps, _ = x.shape
    x = paddle.reshape(x, [batch_size, time_steps, num_heads, -1])
    x = paddle.transpose(x, [0, 2, 1, 3])
    return x


def _concat_heads(x):
    batch_size, _, time_steps, _ = x.shape
    x = paddle.transpose(x, [0, 2, 1, 3])
    x = paddle.reshape(x, [batch_size, time_steps, -1])
    return x


# Standard implementations of Monohead Attention & Multihead Attention
class MonoheadAttention(nn.Layer):
    """Monohead Attention module.

    Parameters
    ----------
    model_dim : int
        Feature size of the query.
    dropout : float, optional
        Dropout probability of scaled dot product attention and final context
        vector. Defaults to 0.0.
    k_dim : int, optional
        Feature size of the key of the scaled dot product attention. If not
        provided, it is set to `model_dim`. Defaults to None.
    v_dim : int, optional
        Feature size of the value of the scaled dot product attention. If not
        provided, it is set to `model_dim`. Defaults to None.
    """

    def __init__(self,
                 model_dim: int,
                 dropout: float=0.0,
                 k_dim: int=None,
                 v_dim: int=None):
        super(MonoheadAttention, self).__init__()
        k_dim = k_dim or model_dim
        v_dim = v_dim or model_dim
        self.affine_q = nn.Linear(model_dim, k_dim)
        self.affine_k = nn.Linear(model_dim, k_dim)
        self.affine_v = nn.Linear(model_dim, v_dim)
        self.affine_o = nn.Linear(v_dim, model_dim)

        self.model_dim = model_dim
        self.dropout = dropout

    def forward(self, q, k, v, mask):
        """Compute context vector and attention weights.

        Parameters
        -----------
        q : Tensor [shape=(batch_size, time_steps_q, model_dim)]
            The queries.
        k : Tensor [shape=(batch_size, time_steps_k, model_dim)]
            The keys.
        v : Tensor [shape=(batch_size, time_steps_k, model_dim)]
            The values.
        mask : Tensor [shape=(batch_size, time_steps_q, time_steps_k)] or broadcastable shape
            The mask.

        Returns
        ----------
        out : Tensor [shape=(batch_size, time_steps_q, model_dim)]
            The context vector.
        attention_weights : Tensor [shape=(batch_size, time_steps_q, time_steps_k)]
            The attention weights.
        """
        q = self.affine_q(q)  # (B, T, C)
        k = self.affine_k(k)
        v = self.affine_v(v)

        context_vectors, attention_weights = scaled_dot_product_attention(
            q, k, v, mask, self.dropout, self.training)

        out = self.affine_o(context_vectors)
        return out, attention_weights


class MultiheadAttention(nn.Layer):
    """Multihead Attention module.

    Parameters
    -----------
    model_dim: int
        The feature size of query.
    num_heads : int
        The number of attention heads.
    dropout : float, optional
        Dropout probability of scaled dot product attention and final context
        vector. Defaults to 0.0.
    k_dim : int, optional
        Feature size of the key of each scaled dot product attention. If not
        provided, it is set to ``model_dim / num_heads``. Defaults to None.
    v_dim : int, optional
        Feature size of the value of each scaled dot product attention. If not
        provided, it is set to ``model_dim / num_heads``. Defaults to None.

    Raises
    ---------
    ValueError
        If ``model_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self,
                 model_dim: int,
                 num_heads: int,
                 dropout: float=0.0,
                 k_dim: int=None,
                 v_dim: int=None):
        super(MultiheadAttention, self).__init__()
        if model_dim % num_heads != 0:
            raise ValueError("model_dim must be divisible by num_heads")
        depth = model_dim // num_heads
        k_dim = k_dim or depth
        v_dim = v_dim or depth
        self.affine_q = nn.Linear(model_dim, num_heads * k_dim)
        self.affine_k = nn.Linear(model_dim, num_heads * k_dim)
        self.affine_v = nn.Linear(model_dim, num_heads * v_dim)
        self.affine_o = nn.Linear(num_heads * v_dim, model_dim)

        self.num_heads = num_heads
        self.model_dim = model_dim
        self.dropout = dropout

    def forward(self, q, k, v, mask):
        """Compute context vector and attention weights.

        Parameters
        -----------
        q : Tensor [shape=(batch_size, time_steps_q, model_dim)]
            The queries.
        k : Tensor [shape=(batch_size, time_steps_k, model_dim)]
            The keys.
        v : Tensor [shape=(batch_size, time_steps_k, model_dim)]
            The values.
        mask : Tensor [shape=(batch_size, time_steps_q, time_steps_k)] or broadcastable shape
            The mask.

        Returns
        ----------
        out : Tensor [shape=(batch_size, time_steps_q, model_dim)]
            The context vector.
        attention_weights : Tensor [shape=(batch_size, time_steps_q, time_steps_k)]
            The attention weights.
        """
        q = _split_heads(self.affine_q(q), self.num_heads)  # (B, h, T, C)
        k = _split_heads(self.affine_k(k), self.num_heads)
        v = _split_heads(self.affine_v(v), self.num_heads)
        mask = paddle.unsqueeze(mask, 1)  # unsqueeze for the h dim

        context_vectors, attention_weights = scaled_dot_product_attention(
            q, k, v, mask, self.dropout, self.training)
        # NOTE: there is more sophisticated implementation: Scheduled DropHead
        context_vectors = _concat_heads(context_vectors)  # (B, T, h*C)
        out = self.affine_o(context_vectors)
        return out, attention_weights


class LocationSensitiveAttention(nn.Layer):
    """Location Sensitive Attention module.

    Reference: `Attention-Based Models for Speech Recognition <https://arxiv.org/pdf/1506.07503.pdf>`_

    Parameters
    -----------
    d_query: int
        The feature size of query.
    d_key : int
        The feature size of key.
    d_attention : int
        The feature size of the attention hidden representation.
    location_filters : int
        Number of filters of the attention convolution.
    location_kernel_size : int
        Kernel size of the attention convolution.
    """

    def __init__(self,
                 d_query: int,
                 d_key: int,
                 d_attention: int,
                 location_filters: int,
                 location_kernel_size: int):
        super().__init__()

        self.query_layer = nn.Linear(d_query, d_attention, bias_attr=False)
        self.key_layer = nn.Linear(d_key, d_attention, bias_attr=False)
        self.value = nn.Linear(d_attention, 1, bias_attr=False)

        # Location Layer
        self.location_conv = nn.Conv1D(
            2,
            location_filters,
            kernel_size=location_kernel_size,
            padding=int((location_kernel_size - 1) / 2),
            bias_attr=False,
            data_format='NLC')
        self.location_layer = nn.Linear(
            location_filters, d_attention, bias_attr=False)

    def forward(self,
                query,
                processed_key,
                value,
                attention_weights_cat,
                mask=None):
        """Compute context vector and attention weights.

        Parameters
        -----------
        query : Tensor [shape=(batch_size, d_query)]
            The queries.
        processed_key : Tensor [shape=(batch_size, time_steps_k, d_attention)]
            The keys after linear layer.
        value : Tensor [shape=(batch_size, time_steps_k, d_key)]
            The values.
        attention_weights_cat : Tensor [shape=(batch_size, time_steps_k, 2)]
            Concatenated attention weights (current and cumulative).
        mask : Tensor, optional
            The mask. Shape should be (batch_size, time_steps_k, 1).
            Defaults to None.

        Returns
        ----------
        attention_context : Tensor [shape=(batch_size, d_key)]
            The context vector.
        attention_weights : Tensor [shape=(batch_size, time_steps_k)]
            The attention weights.
        """

        processed_query = self.query_layer(paddle.unsqueeze(query, axis=[1]))
        processed_attention_weights = self.location_layer(
            self.location_conv(attention_weights_cat))
        # (B, T_enc, 1)
        alignment = self.value(
            paddle.tanh(processed_attention_weights + processed_key +
                        processed_query))

        if mask is not None:
            alignment = alignment + (1.0 - mask) * -1e9

        attention_weights = F.softmax(alignment, axis=1)
        attention_context = paddle.matmul(
            attention_weights, value, transpose_x=True)

        attention_weights = paddle.squeeze(attention_weights, axis=-1)
        attention_context = paddle.squeeze(attention_context, axis=1)

        return attention_context, attention_weights
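A small sketch of how the multi-head module above might be called (not part of the commit); the dimensions and the all-ones padding mask are illustrative only.

import paddle

attn = MultiheadAttention(model_dim=256, num_heads=4, dropout=0.1)

q = paddle.randn([2, 10, 256])    # (batch, T_q, model_dim)
kv = paddle.randn([2, 20, 256])   # (batch, T_k, model_dim)
# 1.0 marks valid key positions, 0.0 marks padding; broadcast over queries.
mask = paddle.ones([2, 1, 20])

out, weights = attn(q, kv, kv, mask)
# out: [2, 10, 256]; weights: [2, 4, 10, 20], one attention map per head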
@@ -1,229 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import librosa
import numpy as np
import paddle
from librosa.util import pad_center
from paddle import nn
from paddle.nn import functional as F
from scipy import signal

__all__ = ["quantize", "dequantize", "STFT", "MelScale"]


def quantize(values, n_bands):
    """Linearly quantize a float Tensor in [-1, 1) to an integer Tensor in
    [0, n_bands).

    Parameters
    -----------
    values : Tensor [dtype: float32 or float64]
        The floating point value.

    n_bands : int
        The number of bands. The output integer Tensor's value is in the range
        [0, n_bands).

    Returns
    ----------
    Tensor [dtype: int64]
        The quantized tensor.
    """
    quantized = paddle.cast((values + 1.0) / 2.0 * n_bands, "int64")
    return quantized


def dequantize(quantized, n_bands, dtype=None):
    """Linearly dequantize an integer Tensor into a float Tensor in the range
    [-1, 1).

    Parameters
    -----------
    quantized : Tensor [dtype: int]
        The quantized value in the range [0, n_bands).

    n_bands : int
        Number of bands. The input integer Tensor's value is in the range
        [0, n_bands).

    dtype : str, optional
        Data type of the output.

    Returns
    -----------
    Tensor
        The dequantized tensor, dtype is specified by `dtype`. If `dtype` is
        not specified, the default float data type is used.
    """
    dtype = dtype or paddle.get_default_dtype()
    value = (paddle.cast(quantized, dtype) + 0.5) * (2.0 / n_bands) - 1.0
    return value


class STFT(nn.Layer):
    """A module for computing stft transformation in a differentiable way.

    Parameters
    ------------
    n_fft : int
        Number of samples in a frame.
    hop_length : int
        Number of samples shifted between adjacent frames.
    win_length : int
        Length of the window.
    window : str, optional
        Name of window function, see `scipy.signal.get_window` for more
        details. Defaults to "hanning".
    center : bool
        If True, the signal y is padded so that frame D[:, t] is centered
        at y[t * hop_length]. If False, then D[:, t] begins at y[t * hop_length].
        Defaults to True.
    pad_mode : string or function
        If center=True, this argument is passed to np.pad for padding the edges
        of the signal y. By default (pad_mode="reflect"), y is padded on both
        sides with its own reflection, mirrored around its first and last
        sample respectively. If center=False, this argument is ignored.

    Notes
    -----------
    It behaves like ``librosa.core.stft``. See ``librosa.core.stft`` for more
    details.

    Given an audio signal with ``T`` samples, the STFT transformation outputs a
    spectrum with shape (C, frames) and complex dtype, where ``C = 1 + n_fft / 2``
    and ``frames = 1 + T // hop_length``.

    Only ``center`` and ``reflect`` padding are supported now.

    """

    def __init__(self,
                 n_fft,
                 hop_length=None,
                 win_length=None,
                 window="hanning",
                 center=True,
                 pad_mode="reflect"):
        super().__init__()
        # By default, use the entire frame
        if win_length is None:
            win_length = n_fft

        # Set the default hop, if it's not already specified
        if hop_length is None:
            hop_length = int(win_length // 4)

        self.hop_length = hop_length
        self.n_bin = 1 + n_fft // 2
        self.n_fft = n_fft
        self.center = center
        self.pad_mode = pad_mode

        # calculate window
        window = signal.get_window(window, win_length, fftbins=True)

        # pad window to n_fft size
        if n_fft != win_length:
            window = pad_center(window, n_fft, mode="constant")
            # lpad = (n_fft - win_length) // 2
            # rpad = n_fft - win_length - lpad
            # window = np.pad(window, ((lpad, pad), ), 'constant')

        # calculate weights
        # r = np.arange(0, n_fft)
        # M = np.expand_dims(r, -1) * np.expand_dims(r, 0)
        # w_real = np.reshape(window *
        #                     np.cos(2 * np.pi * M / n_fft)[:self.n_bin],
        #                     (self.n_bin, 1, self.n_fft))
        # w_imag = np.reshape(window *
        #                     np.sin(-2 * np.pi * M / n_fft)[:self.n_bin],
        #                     (self.n_bin, 1, self.n_fft))
        weight = np.fft.fft(np.eye(n_fft))[:self.n_bin]
        w_real = weight.real
        w_imag = weight.imag
        w = np.concatenate([w_real, w_imag], axis=0)
        w = w * window
        w = np.expand_dims(w, 1)
        weight = paddle.cast(paddle.to_tensor(w), paddle.get_default_dtype())
        self.register_buffer("weight", weight)

    def forward(self, x):
        """Compute the stft transform.
        Parameters
        ------------
        x : Tensor [shape=(B, T)]
            The input waveform.
        Returns
        ------------
        real : Tensor [shape=(B, C, frames)]
            The real part of the spectrogram.

        imag : Tensor [shape=(B, C, frames)]
            The imaginary part of the spectrogram.
        """
        x = paddle.unsqueeze(x, axis=1)
        if self.center:
            x = F.pad(
                x, [self.n_fft // 2, self.n_fft // 2],
                data_format='NCL',
                mode=self.pad_mode)

        # to BCT, C=1
        out = F.conv1d(x, self.weight, stride=self.hop_length)
        real, imag = paddle.chunk(out, 2, axis=1)  # BCT
        return real, imag

    def power(self, x):
        """Compute the power spectrum.
        Parameters
        ------------
        x : Tensor [shape=(B, T)]
            The input waveform.
        Returns
        ------------
        Tensor [shape=(B, C, T)]
            The power spectrum.
        """
        real, imag = self.forward(x)
        power = real**2 + imag**2
        return power

    def magnitude(self, x):
        """Compute the magnitude of the spectrum.
        Parameters
        ------------
        x : Tensor [shape=(B, T)]
            The input waveform.
        Returns
        ------------
        Tensor [shape=(B, C, T)]
            The magnitude of the spectrum.
        """
        power = self.power(x)
        magnitude = paddle.sqrt(power)  # TODO(chenfeiyu): maybe clipping
        return magnitude


class MelScale(nn.Layer):
    def __init__(self, sr, n_fft, n_mels, fmin, fmax):
        super().__init__()
        mel_basis = librosa.filters.mel(sr, n_fft, n_mels, fmin, fmax)
        # self.weight = paddle.to_tensor(mel_basis)
        weight = paddle.to_tensor(mel_basis, dtype=paddle.get_default_dtype())
        self.register_buffer("weight", weight)

    def forward(self, spec):
        # (n_mels, n_freq) * (batch_size, n_freq, n_frames)
        mel = paddle.matmul(self.weight, spec)
        return mel
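A usage sketch for the modules above (not part of the commit). The sample rate, FFT parameters and mel settings are illustrative; note that `MelScale` passes its arguments positionally to `librosa.filters.mel`, which newer librosa releases expect as keyword arguments.

import paddle

wav = paddle.randn([2, 22050])     # a batch of 1-second waveforms at 22.05 kHz

stft = STFT(n_fft=1024, hop_length=256, win_length=1024, window="hann")
mag = stft.magnitude(wav)          # (B, 1 + n_fft // 2, frames)

mel_scale = MelScale(sr=22050, n_fft=1024, n_mels=80, fmin=0, fmax=8000)
mel = mel_scale(mag)               # (B, n_mels, frames)

# linear quantization round trip; clip so values stay strictly below 1.0
codes = quantize(paddle.clip(wav, -1.0, 0.999), n_bands=256)
approx = dequantize(codes, n_bands=256)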
@@ -1,37 +0,0 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import paddle
from paddle import Tensor


def expand(encodings: Tensor, durations: Tensor) -> Tensor:
    """
    encodings: (B, T, C)
    durations: (B, T)
    """
    batch_size, t_enc = durations.shape
    durations = durations.numpy()
    slens = np.sum(durations, -1)
    t_dec = np.max(slens)
    M = np.zeros([batch_size, t_dec, t_enc])
    for i in range(batch_size):
        k = 0
        for j in range(t_enc):
            d = durations[i, j]
            M[i, k:k + d, j] = 1
            k += d
    M = paddle.to_tensor(M, dtype=encodings.dtype)
    encodings = paddle.matmul(M, encodings)
    return encodings
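A worked example of `expand` with made-up shapes: each encoder frame is repeated according to its duration, so a (1, 3, C) encoding with durations [2, 0, 3] becomes a (1, 5, C) frame-rate sequence.

import paddle

encodings = paddle.randn([1, 3, 4])                        # (B, T_enc, C)
durations = paddle.to_tensor([[2, 0, 3]], dtype="int64")   # (B, T_enc)

expanded = expand(encodings, durations)
print(expanded.shape)  # [1, 5, 4]: frame 0 twice, frame 1 skipped, frame 2 three times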
@@ -1,120 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle

__all__ = [
    "id_mask",
    "feature_mask",
    "combine_mask",
    "future_mask",
]


def id_mask(input, padding_index=0, dtype="bool"):
    """Generate mask with input ids.

    Those positions where the value equals ``padding_index`` correspond to 0 or
    ``False``, otherwise, 1 or ``True``.

    Parameters
    ----------
    input : Tensor [dtype: int]
        The input tensor. It represents the ids.
    padding_index : int, optional
        The id which represents padding, by default 0.
    dtype : str, optional
        Data type of the returned mask, by default "bool".

    Returns
    -------
    Tensor
        The generated mask. It has the same shape as ``input`` does.
    """
    return paddle.cast(input != padding_index, dtype)


def feature_mask(input, axis, dtype="bool"):
    """Compute mask from input features.

    For input features, represented as batched feature vectors, those vectors
    which are all zeros are considered padding vectors.

    Parameters
    ----------
    input : Tensor [dtype: float]
        The input tensor which represents features.
    axis : int
        The index of the feature dimension in ``input``. Other dimensions are
        considered ``spatial`` dimensions.
    dtype : str, optional
        Data type of the generated mask, by default "bool"
    Returns
    -------
    Tensor
        The generated mask with ``spatial`` shape as mentioned above.

        It has one less dimension than ``input`` does.
    """
    feature_sum = paddle.sum(paddle.abs(input), axis)
    return paddle.cast(feature_sum != 0, dtype)


def combine_mask(mask1, mask2):
    """Combine two masks with multiplication or logical and.

    Parameters
    -----------
    mask1 : Tensor
        The first mask.
    mask2 : Tensor
        The second mask with broadcastable shape with ``mask1``.
    Returns
    --------
    Tensor
        Combined mask.

    Notes
    ------
    It is mainly used to combine the padding mask and the no-future mask for
    the transformer decoder.

    The padding mask is used to mask padding positions of the decoder inputs
    and the no-future mask is used to prevent the decoder from seeing future
    information.
    """
    if mask1.dtype == paddle.fluid.core.VarDesc.VarType.BOOL:
        return paddle.logical_and(mask1, mask2)
    else:
        return mask1 * mask2


def future_mask(time_steps, dtype="bool"):
    """Generate lower triangular mask.

    It is used at the transformer decoder to prevent the decoder from seeing
    future information.

    Parameters
    ----------
    time_steps : int
        Decoder time steps.
    dtype : str, optional
        The data type of the generated mask, by default "bool".

    Returns
    -------
    Tensor
        The generated mask.
    """
    mask = paddle.tril(paddle.ones([time_steps, time_steps]))
    return paddle.cast(mask, dtype)
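A short sketch of how the helpers above combine into a decoder mask (not part of the commit); the ids, shapes and the padding id of 0 are illustrative.

import paddle

ids = paddle.to_tensor([[5, 7, 2, 0, 0]])          # 0 is the padding id
padding_mask = id_mask(ids)                        # (1, 5), bool

feats = paddle.concat(
    [paddle.randn([1, 3, 80]), paddle.zeros([1, 2, 80])], axis=1)
frame_mask = feature_mask(feats, axis=-1)          # (1, 5), bool; zero vectors -> False

# Decoder-style mask: hide padding and future positions at the same time.
causal = future_mask(5)                            # (5, 5), lower triangular, bool
dec_mask = combine_mask(paddle.unsqueeze(padding_mask, 1), causal)  # (1, 5, 5)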
@@ -1,80 +0,0 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from math import exp

import paddle
import paddle.nn.functional as F
from paddle import nn


def gaussian(window_size, sigma):
    # 1D Gaussian kernel, normalized to sum to 1
    gauss = paddle.to_tensor([
        exp(-(x - window_size // 2)**2 / float(2 * sigma**2))
        for x in range(window_size)
    ])
    return gauss / gauss.sum()


def create_window(window_size, channel):
    # 2D Gaussian window expanded to (channel, 1, window_size, window_size)
    # for grouped (per-channel) convolution
    _1D_window = gaussian(window_size, 1.5).unsqueeze(1)
    _2D_window = paddle.matmul(_1D_window, paddle.transpose(
        _1D_window, [1, 0])).unsqueeze([0, 1])
    window = paddle.expand(_2D_window, [channel, 1, window_size, window_size])
    return window


def _ssim(img1, img2, window, window_size, channel, size_average=True):
    # local means, variances and covariance via Gaussian filtering
    mu1 = F.conv2d(img1, window, padding=window_size // 2, groups=channel)
    mu2 = F.conv2d(img2, window, padding=window_size // 2, groups=channel)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1 * mu2

    sigma1_sq = F.conv2d(
        img1 * img1, window, padding=window_size // 2, groups=channel) - mu1_sq
    sigma2_sq = F.conv2d(
        img2 * img2, window, padding=window_size // 2, groups=channel) - mu2_sq
    sigma12 = F.conv2d(
        img1 * img2, window, padding=window_size // 2, groups=channel) - mu1_mu2

    # stabilizing constants from the SSIM paper, assuming inputs in [0, 1]
    C1 = 0.01**2
    C2 = 0.03**2

    ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) \
        / ((mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2))

    if size_average:
        return ssim_map.mean()
    else:
        return ssim_map.mean(1).mean(1).mean(1)


class SSIM(nn.Layer):
    def __init__(self, window_size=11, size_average=True):
        super().__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = create_window(window_size, self.channel)

    def forward(self, img1, img2):
        return _ssim(img1, img2, self.window, self.window_size, self.channel,
                     self.size_average)


def ssim(img1, img2, window_size=11, size_average=True):
    (_, channel, _, _) = img1.shape
    window = create_window(window_size, channel)
    return _ssim(img1, img2, window, window_size, channel, size_average)
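An illustrative call (not from the commit), treating predicted and reference mel spectrograms as single-channel images scaled to [0, 1], which is what the C1/C2 constants assume.

import paddle

mel_pred = paddle.rand([4, 1, 80, 200])   # (B, 1, n_mels, frames), values in [0, 1]
mel_ref = paddle.rand([4, 1, 80, 200])

metric = SSIM(window_size=11)             # module form, fixed to one channel
similarity = metric(mel_pred, mel_ref)    # scalar; 1.0 when the inputs are identical
loss = 1.0 - similarity                   # a common way to use SSIM as a loss

score = ssim(mel_pred, mel_ref)           # functional form infers the channel count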
@@ -1,220 +0,0 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modified from espnet(https://github.com/espnet/espnet)
import paddle
from paddle import nn
from paddle.nn import functional as F
from scipy import signal


def stft(x,
         fft_size,
         hop_length=None,
         win_length=None,
         window='hann',
         center=True,
         pad_mode='reflect'):
    r"""Perform STFT and convert to magnitude spectrogram.
    Parameters
    ----------
    x : Tensor
        Input signal tensor (B, T).
    fft_size : int
        FFT size.
    hop_length : int
        Hop size (number of samples between adjacent frames).
    win_length : int
        Window length.
    window : str, optional
        Name of window function, see `scipy.signal.get_window` for more
        details. Defaults to "hann".
    center : bool, optional
        Whether to pad `x` so that the :math:`t`-th frame is centered at
        :math:`t \times hop\_length`. Defaults to `True`.
    pad_mode : str, optional
        Choose padding pattern when `center` is `True`.
    Returns
    ----------
    Tensor:
        Magnitude spectrogram (B, #frames, fft_size // 2 + 1).
    """
    # calculate window
    window = signal.get_window(window, win_length, fftbins=True)
    window = paddle.to_tensor(window)
    x_stft = paddle.signal.stft(
        x,
        fft_size,
        hop_length,
        win_length,
        window=window,
        center=center,
        pad_mode=pad_mode)

    real = x_stft.real()
    imag = x_stft.imag()

    return paddle.sqrt(paddle.clip(real**2 + imag**2, min=1e-7)).transpose(
        [0, 2, 1])


class SpectralConvergenceLoss(nn.Layer):
    """Spectral convergence loss module."""

    def __init__(self):
        """Initialize spectral convergence loss module."""
        super().__init__()

    def forward(self, x_mag, y_mag):
        """Calculate forward propagation.
        Parameters
        ----------
        x_mag : Tensor
            Magnitude spectrogram of predicted signal (B, #frames, #freq_bins).
        y_mag : Tensor
            Magnitude spectrogram of groundtruth signal (B, #frames, #freq_bins).
        Returns
        ----------
        Tensor
            Spectral convergence loss value.
        """
        return paddle.norm(
            y_mag - x_mag, p="fro") / paddle.clip(
                paddle.norm(y_mag, p="fro"), min=1e-10)


class LogSTFTMagnitudeLoss(nn.Layer):
    """Log STFT magnitude loss module."""

    def __init__(self, epsilon=1e-7):
        """Initialize log STFT magnitude loss module."""
        super().__init__()
        self.epsilon = epsilon

    def forward(self, x_mag, y_mag):
        """Calculate forward propagation.
        Parameters
        ----------
        x_mag : Tensor
            Magnitude spectrogram of predicted signal (B, #frames, #freq_bins).
        y_mag : Tensor
            Magnitude spectrogram of groundtruth signal (B, #frames, #freq_bins).
        Returns
        ----------
        Tensor
            Log STFT magnitude loss value.
        """
        return F.l1_loss(
            paddle.log(paddle.clip(y_mag, min=self.epsilon)),
            paddle.log(paddle.clip(x_mag, min=self.epsilon)))


class STFTLoss(nn.Layer):
    """STFT loss module."""

    def __init__(self,
                 fft_size=1024,
                 shift_size=120,
                 win_length=600,
                 window="hann"):
        """Initialize STFT loss module."""
        super().__init__()
        self.fft_size = fft_size
        self.shift_size = shift_size
        self.win_length = win_length
        self.window = window
        self.spectral_convergence_loss = SpectralConvergenceLoss()
        self.log_stft_magnitude_loss = LogSTFTMagnitudeLoss()

    def forward(self, x, y):
        """Calculate forward propagation.
        Parameters
        ----------
        x : Tensor
            Predicted signal (B, T).
        y : Tensor
            Groundtruth signal (B, T).
        Returns
        ----------
        Tensor
            Spectral convergence loss value.
        Tensor
            Log STFT magnitude loss value.
        """
        x_mag = stft(x, self.fft_size, self.shift_size, self.win_length,
                     self.window)
        y_mag = stft(y, self.fft_size, self.shift_size, self.win_length,
                     self.window)
        sc_loss = self.spectral_convergence_loss(x_mag, y_mag)
        mag_loss = self.log_stft_magnitude_loss(x_mag, y_mag)

        return sc_loss, mag_loss


class MultiResolutionSTFTLoss(nn.Layer):
    """Multi resolution STFT loss module."""

    def __init__(
            self,
            fft_sizes=[1024, 2048, 512],
            hop_sizes=[120, 240, 50],
            win_lengths=[600, 1200, 240],
            window="hann", ):
        """Initialize Multi resolution STFT loss module.
        Parameters
        ----------
        fft_sizes : list
            List of FFT sizes.
        hop_sizes : list
            List of hop sizes.
        win_lengths : list
            List of window lengths.
        window : str
            Window function type.
        """
        super().__init__()
        assert len(fft_sizes) == len(hop_sizes) == len(win_lengths)
        self.stft_losses = nn.LayerList()
        for fs, ss, wl in zip(fft_sizes, hop_sizes, win_lengths):
            self.stft_losses.append(STFTLoss(fs, ss, wl, window))

    def forward(self, x, y):
        """Calculate forward propagation.
        Parameters
        ----------
        x : Tensor
            Predicted signal (B, T) or (B, #subband, T).
        y : Tensor
            Groundtruth signal (B, T) or (B, #subband, T).
        Returns
        ----------
        Tensor
            Multi resolution spectral convergence loss value.
        Tensor
            Multi resolution log STFT magnitude loss value.
        """
        if len(x.shape) == 3:
            # (B, C, T) -> (B x C, T)
            x = x.reshape([-1, x.shape[2]])
            # (B, C, T) -> (B x C, T)
            y = y.reshape([-1, y.shape[2]])
        sc_loss = 0.0
        mag_loss = 0.0
        for f in self.stft_losses:
            sc_l, mag_l = f(x, y)
            sc_loss += sc_l
            mag_loss += mag_l
        sc_loss /= len(self.stft_losses)
        mag_loss /= len(self.stft_losses)

        return sc_loss, mag_loss
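A minimal training-loss sketch assuming the default resolutions above (not part of the commit); the waveforms are random placeholders and the equal weighting of the two terms is just one common choice.

import paddle

criterion = MultiResolutionSTFTLoss()     # three resolutions defined above

wav_pred = paddle.randn([4, 8000])        # generated waveform (B, T)
wav_ref = paddle.randn([4, 8000])         # reference waveform (B, T)

sc_loss, mag_loss = criterion(wav_pred, wav_ref)
total_loss = sc_loss + mag_loss           # combine the two terms for backprop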