You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
521 lines
18 KiB
521 lines
18 KiB
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import math
|
|
|
|
import paddle
|
|
import paddle.nn as nn
|
|
import paddle.nn.functional as F
|
|
|
|
|
|
def length_to_mask(length, max_len=None, dtype=None):
    """Build a mask marking the valid (non-padded) positions of each sequence.

    Args:
        length (paddle.Tensor): 1-D tensor holding the valid length of each
            sequence in the batch.
        max_len (int, optional): number of mask columns. Defaults to None,
            in which case the largest value in ``length`` is used.
        dtype (optional): dtype of the returned mask. Defaults to None,
            in which case ``length.dtype`` is used.

    Returns:
        paddle.Tensor: mask of shape (len(length), max_len); entry [i, j] is
            non-zero exactly when ``j < length[i]``.
    """
    assert len(length.shape) == 1

    if max_len is None:
        max_len = length.max().astype('int').item()
    if dtype is None:
        dtype = length.dtype

    # Compare a row of positions 0..max_len-1 against every sequence length.
    positions = paddle.arange(
        max_len, dtype=length.dtype).expand((len(length), max_len))
    mask = positions < length.unsqueeze(1)

    # Cast the existing tensor rather than passing it through
    # paddle.to_tensor, which copy-constructs a new tensor from a tensor.
    return mask.astype(dtype)
|
|
|
|
|
|
class Conv1d(nn.Layer):
    """1-D convolution that pads its input explicitly before convolving.

    The wrapped ``nn.Conv1D`` is created with ``padding=0``; the padding is
    applied in ``forward`` through ``F.pad`` using ``padding_mode``.
    """

    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            stride=1,
            padding="same",
            dilation=1,
            groups=1,
            bias=True,
            padding_mode="reflect"):
        """Create the convolution layer.

        Args:
            in_channels (int): input channel or input data dimensions
            out_channels (int): output channel or output data dimensions
            kernel_size (int): kernel size of 1-d convolution
            stride (int, optional): stride in 1-d convolution. Defaults to 1.
            padding (str, optional): padding type; only "same" is accepted
                by ``forward``. Defaults to "same".
            dilation (int, optional): dilation in 1-d convolution. Defaults to 1.
            groups (int, optional): groups in 1-d convolution. Defaults to 1.
            bias (bool, optional): forwarded as ``bias_attr`` to ``nn.Conv1D``.
                Defaults to True.
            padding_mode (str, optional): mode handed to ``F.pad``.
                Defaults to "reflect".
        """
        super().__init__()

        self.kernel_size = kernel_size
        self.stride = stride
        self.dilation = dilation
        self.padding = padding
        self.padding_mode = padding_mode

        # padding=0: all padding is done by _manage_padding in forward().
        self.conv = nn.Conv1D(
            in_channels,
            out_channels,
            self.kernel_size,
            stride=self.stride,
            padding=0,
            dilation=self.dilation,
            groups=groups,
            bias_attr=bias)

    def forward(self, x):
        """Do conv1d forward

        Args:
            x (paddle.Tensor): [N, C, L] input data,
                N is the batch,
                C is the data dimension,
                L is the time

        Raises:
            ValueError: only support the same padding type

        Returns:
            paddle.Tensor: the value of conv1d
        """
        if self.padding != "same":
            # f-string so the offending value is actually interpolated.
            raise ValueError(
                f"Padding must be 'same'. Got {self.padding}")

        x = self._manage_padding(x, self.kernel_size, self.dilation,
                                 self.stride)
        return self.conv(x)

    def _manage_padding(self, x, kernel_size: int, dilation: int,
                        stride: int):
        """Padding the input data

        Args:
            x (paddle.Tensor): [N, C, L] input data
                N is the batch,
                C is the data dimension,
                L is the time
            kernel_size (int): 1-d convolution kernel size
            dilation (int): 1-d convolution dilation
            stride (int): 1-d convolution stride

        Returns:
            paddle.Tensor: the padded input data
        """
        L_in = x.shape[-1]  # length of the time axis
        padding = self._get_padding_elem(L_in, stride, kernel_size, dilation)
        return F.pad(x, padding, mode=self.padding_mode, data_format="NCL")

    def _get_padding_elem(self,
                          L_in: int,
                          stride: int,
                          kernel_size: int,
                          dilation: int):
        """Calculate the padding value in same mode

        Args:
            L_in (int): the length (time steps) of the input data
            stride (int): 1-d convolution stride
            kernel_size (int): 1-d convolution kernel size
            dilation (int): 1-d convolution dilation

        Returns:
            list: ``[left, right]`` padding sizes for the time axis
        """
        if stride > 1:
            # Fixed half-kernel padding; dilation is not taken into account
            # in this branch.
            half_kernel = kernel_size // 2
            return [half_kernel, half_kernel]

        # Length the un-padded convolution would produce; the difference to
        # L_in is split evenly between both sides.
        L_out = (L_in - dilation * (kernel_size - 1) - 1) // stride + 1
        half_gap = (L_in - L_out) // 2
        return [half_gap, half_gap]
|
|
|
|
|
|
class BatchNorm1d(nn.Layer):
    """Thin wrapper that exposes ``nn.BatchNorm1D`` with an ``eps`` keyword."""

    def __init__(
            self,
            input_size,
            eps=1e-05,
            momentum=0.9,
            weight_attr=None,
            bias_attr=None,
            data_format='NCL',
            use_global_stats=None):
        """Create the wrapped batch-norm layer.

        Args:
            input_size (int): number of features, passed as the first
                argument of ``nn.BatchNorm1D``.
            eps (float, optional): forwarded as ``epsilon``. Defaults to 1e-05.
            momentum (float, optional): forwarded unchanged. Defaults to 0.9.
            weight_attr (optional): forwarded unchanged. Defaults to None.
            bias_attr (optional): forwarded unchanged. Defaults to None.
            data_format (str, optional): forwarded unchanged. Defaults to 'NCL'.
            use_global_stats (optional): forwarded unchanged. Defaults to None.
        """
        super().__init__()

        norm_options = dict(
            epsilon=eps,
            momentum=momentum,
            weight_attr=weight_attr,
            bias_attr=bias_attr,
            data_format=data_format,
            use_global_stats=use_global_stats)
        self.norm = nn.BatchNorm1D(input_size, **norm_options)

    def forward(self, x):
        """Return ``x`` passed through the wrapped batch-norm layer."""
        return self.norm(x)
|
|
|
|
|
|
class TDNNBlock(nn.Layer):
    """Convolution -> activation -> batch norm, applied in that order."""

    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            dilation,
            activation=nn.ReLU):
        """Implementation of TDNN network

        Args:
            in_channels (int): input channels or input embedding dimensions
            out_channels (int): output channels or output embedding dimensions
            kernel_size (int): the kernel size of the TDNN network block
            dilation (int): the dilation of the TDNN network block
            activation (paddle class, optional): activation layer class; it is
                instantiated without arguments. Defaults to nn.ReLU.
        """
        super().__init__()
        conv_layer = Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            dilation=dilation)
        self.conv = conv_layer
        self.activation = activation()
        self.norm = BatchNorm1d(input_size=out_channels)

    def forward(self, x):
        """Run the block on ``x`` and return the normalised activations."""
        convolved = self.conv(x)
        activated = self.activation(convolved)
        return self.norm(activated)
|
|
|
|
|
|
class Res2NetBlock(nn.Layer):
    """Res2Net block built from ``scale - 1`` TDNN blocks with kernel size 3."""

    def __init__(self, in_channels, out_channels, scale=8, dilation=1):
        """Implementation of Res2Net Block with dilation

        The paper is "Res2Net: A New Multi-scale Backbone Architecture",
        available at https://arxiv.org/abs/1904.01169

        Args:
            in_channels (int): input channels; must be divisible by ``scale``
            out_channels (int): output channels; must be divisible by ``scale``
            scale (int, optional): number of channel groups the input is split
                into. Defaults to 8.
            dilation (int, optional): dilation of 1-d convolution in TDNN block.
                Defaults to 1.
        """
        super().__init__()
        assert in_channels % scale == 0
        assert out_channels % scale == 0

        group_in = in_channels // scale
        group_out = out_channels // scale

        # One TDNN block for every channel group except the first, which is
        # passed through untouched in forward().
        tdnn_blocks = [
            TDNNBlock(group_in, group_out, kernel_size=3, dilation=dilation)
            for _ in range(scale - 1)
        ]
        self.blocks = nn.LayerList(tdnn_blocks)
        self.scale = scale

    def forward(self, x):
        """Split ``x`` along axis 1, process the groups, and re-concatenate.

        The first group is copied as is. The second group goes through its
        block directly. Every later group is added to the previous group's
        output before going through its block.
        """
        groups = paddle.chunk(x, self.scale, axis=1)
        outputs = [groups[0]]
        for position, (block, group) in enumerate(
                zip(self.blocks, groups[1:])):
            if position == 0:
                block_input = group
            else:
                block_input = group + outputs[-1]
            outputs.append(block(block_input))
        return paddle.concat(outputs, axis=1)
|
|
|
|
|
|
class SEBlock(nn.Layer):
    """Squeeze-and-excitation gate over the channel axis."""

    def __init__(self, in_channels, se_channels, out_channels):
        """Implementation of SEBlock

        The paper is "Squeeze-and-Excitation Networks",
        available at https://arxiv.org/abs/1709.01507

        Args:
            in_channels (int): input channels or input data dimensions
            se_channels (int): channels between the two kernel-size-1
                convolutions (output of the first, input of the second)
            out_channels (int): output channels or output data dimensions
        """
        super().__init__()

        self.conv1 = Conv1d(
            in_channels=in_channels,
            out_channels=se_channels,
            kernel_size=1)
        self.relu = paddle.nn.ReLU()
        self.conv2 = Conv1d(
            in_channels=se_channels,
            out_channels=out_channels,
            kernel_size=1)
        self.sigmoid = paddle.nn.Sigmoid()

    def forward(self, x, lengths=None):
        """Scale ``x`` by a per-channel gate computed from its time average.

        Args:
            x (paddle.Tensor): input whose last axis (axis 2) is time.
            lengths (paddle.Tensor, optional): per-sample values that are
                multiplied by the time length to obtain valid lengths.
                When None, a plain mean over time is used. Defaults to None.

        Returns:
            paddle.Tensor: the gate multiplied with ``x``.
        """
        n_frames = x.shape[-1]
        if lengths is None:
            squeezed = x.mean(axis=2, keepdim=True)
        else:
            # Average only over the frames marked valid by the mask.
            valid = length_to_mask(lengths * n_frames, max_len=n_frames)
            valid = valid.unsqueeze(1)
            n_valid = valid.sum(axis=2, keepdim=True)
            squeezed = (x * valid).sum(axis=2, keepdim=True) / n_valid

        hidden = self.relu(self.conv1(squeezed))
        gate = self.sigmoid(self.conv2(hidden))
        return gate * x
|
|
|
|
|
|
class AttentiveStatisticsPooling(nn.Layer):
    """Attention-weighted mean and standard deviation over the time axis."""

    def __init__(self, channels, attention_channels=128, global_context=True):
        """Compute the speaker verification statistics

        Attentive statistics pooling as used by ECAPA-TDNN
        (https://arxiv.org/abs/2005.07143).

        Args:
            channels (int): input data channel or data dimension
            attention_channels (int, optional): attention dimension. Defaults to 128.
            global_context (bool, optional): whether the masked global mean and
                std are concatenated to the attention input. Defaults to True.
        """
        super().__init__()

        self.eps = 1e-12
        self.global_context = global_context
        # With global context the attention input is [x, mean, std].
        tdnn_in_channels = channels * 3 if global_context else channels
        self.tdnn = TDNNBlock(tdnn_in_channels, attention_channels, 1, 1)
        self.tanh = nn.Tanh()
        self.conv = Conv1d(
            in_channels=attention_channels,
            out_channels=channels,
            kernel_size=1)

    def forward(self, x, lengths=None):
        """Pool ``x`` over time into concatenated mean and std statistics.

        Args:
            x (paddle.Tensor): input of shape (N, C, L).
            lengths (paddle.Tensor, optional): per-sample values multiplied by
                L to obtain valid lengths. When None, a tensor of ones is
                used. Defaults to None.

        Returns:
            paddle.Tensor: mean and std concatenated on axis 1, with a trailing
                axis of size 1 added.
        """
        n_channels, n_frames = x.shape[1], x.shape[2]

        def _weighted_stats(feats, weights, axis=2, eps=self.eps):
            # Weighted mean and std along `axis`; the variance is clipped at
            # `eps` before the square root.
            mu = (weights * feats).sum(axis)
            squared_dev = (feats - mu.unsqueeze(axis)).pow(2)
            sigma = paddle.sqrt((weights * squared_dev).sum(axis).clip(eps))
            return mu, sigma

        if lengths is None:
            lengths = paddle.ones([x.shape[0]])

        # Mask of shape [N, 1, L] marking the valid frames.
        frame_mask = length_to_mask(lengths * n_frames, max_len=n_frames)
        frame_mask = frame_mask.unsqueeze(1)

        if self.global_context:
            # Give the attention access to utterance-level statistics by
            # repeating the masked mean/std along the time axis.
            n_valid = frame_mask.sum(axis=2, keepdim=True).astype('float32')
            ctx_mean, ctx_std = _weighted_stats(x, frame_mask / n_valid)
            ctx_mean = ctx_mean.unsqueeze(2).tile((1, 1, n_frames))
            ctx_std = ctx_std.unsqueeze(2).tile((1, 1, n_frames))
            attn_input = paddle.concat([x, ctx_mean, ctx_std], axis=1)
        else:
            attn_input = x

        scores = self.conv(self.tanh(self.tdnn(attn_input)))

        # Padded frames get -inf so softmax assigns them zero weight.
        is_padding = frame_mask.tile((1, n_channels, 1)) == 0
        minus_inf = paddle.ones_like(scores) * float("-inf")
        scores = paddle.where(is_padding, minus_inf, scores)

        weights = F.softmax(scores, axis=2)
        mean, std = _weighted_stats(x, weights)

        stats = paddle.concat((mean, std), axis=1)
        return stats.unsqueeze(2)
|
|
|
|
|
|
class SERes2NetBlock(nn.Layer):
    """TDNN -> Res2Net -> TDNN -> SE block with a residual connection."""

    def __init__(
            self,
            in_channels,
            out_channels,
            res2net_scale=8,
            se_channels=128,
            kernel_size=1,
            dilation=1,
            activation=nn.ReLU):
        """Implementation of Squeeze-Excitation Res2Blocks in ECAPA-TDNN network model

        The paper is "Squeeze-and-Excitation Networks",
        available at https://arxiv.org/pdf/1709.01507.pdf

        Args:
            in_channels (int): input channels or input data dimensions
            out_channels (int): output channels or output data dimensions
            res2net_scale (int, optional): scale in the res2net block. Defaults to 8.
            se_channels (int, optional): inner channels of the SE block. Defaults to 128.
            kernel_size (int, optional): accepted for interface compatibility
                but not used by this block: both TDNN blocks use kernel size 1
                and the Res2Net block is built without it. Defaults to 1.
            dilation (int, optional): dilation passed to the Res2Net block. Defaults to 1.
            activation (paddle.nn.class, optional): activation function. Defaults to nn.ReLU.
        """
        super().__init__()
        self.out_channels = out_channels
        self.tdnn1 = TDNNBlock(
            in_channels,
            out_channels,
            kernel_size=1,
            dilation=1,
            activation=activation)
        self.res2net_block = Res2NetBlock(out_channels, out_channels,
                                          res2net_scale, dilation)
        self.tdnn2 = TDNNBlock(
            out_channels,
            out_channels,
            kernel_size=1,
            dilation=1,
            activation=activation)
        self.se_block = SEBlock(out_channels, se_channels, out_channels)

        # A kernel-size-1 projection is only needed when the residual and
        # the block output have different channel counts.
        self.shortcut = None
        if in_channels != out_channels:
            self.shortcut = Conv1d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=1)

    def forward(self, x, lengths=None):
        """Apply the block and add the (optionally projected) input.

        Args:
            x (paddle.Tensor): input data.
            lengths (paddle.Tensor, optional): forwarded to the SE block.
                Defaults to None.

        Returns:
            paddle.Tensor: block output plus the residual.
        """
        # Explicit None check instead of relying on the truthiness of a Layer.
        residual = x if self.shortcut is None else self.shortcut(x)

        x = self.tdnn1(x)
        x = self.res2net_block(x)
        x = self.tdnn2(x)
        x = self.se_block(x, lengths)

        return x + residual
|
|
|
|
|
|
class EcapaTdnn(nn.Layer):
    """ECAPA-TDNN backbone producing fixed-size embeddings."""

    def __init__(
            self,
            input_size,
            lin_neurons=192,
            activation=nn.ReLU,
            channels=[512, 512, 512, 512, 1536],
            kernel_sizes=[5, 3, 3, 3, 1],
            dilations=[1, 2, 3, 4, 1],
            attention_channels=128,
            res2net_scale=8,
            se_channels=128,
            global_context=True):
        """Implementation of ECAPA-TDNN backbone model network

        The paper is "ECAPA-TDNN: Emphasized Channel Attention, Propagation
        and Aggregation in TDNN Based Speaker Verification",
        available at https://arxiv.org/abs/2005.07143

        Args:
            input_size (int): input feature dimension
            lin_neurons (int, optional): speaker embedding size. Defaults to 192.
            activation (paddle.nn.class, optional): activation function. Defaults to nn.ReLU.
            channels (list, optional): channels of each stage. The last entry
                is the input size of the aggregation layer, so it must equal
                the summed output channels of the SE-Res2Net blocks.
                Defaults to [512, 512, 512, 512, 1536].
            kernel_sizes (list, optional): kernel size of 1-d convolution in TDNN block.
                Defaults to [5, 3, 3, 3, 1].
            dilations (list, optional): dilations of 1-d convolution in TDNN block.
                Defaults to [1, 2, 3, 4, 1].
            attention_channels (int, optional): attention dimensions. Defaults to 128.
            res2net_scale (int, optional): scale value in res2net. Defaults to 8.
            se_channels (int, optional): dimensions of squeeze-excitation block. Defaults to 128.
            global_context (bool, optional): global context flag. Defaults to True.
        """
        super().__init__()
        assert len(channels) == len(kernel_sizes), \
            "channels and kernel_sizes must have the same length"
        assert len(channels) == len(dilations), \
            "channels and dilations must have the same length"
        # Copy so the instance never aliases the shared default list.
        self.channels = list(channels)
        self.blocks = nn.LayerList()
        self.emb_size = lin_neurons

        # The initial TDNN layer
        self.blocks.append(
            TDNNBlock(
                input_size,
                channels[0],
                kernel_sizes[0],
                dilations[0],
                activation))

        # SE-Res2Net layers
        for i in range(1, len(channels) - 1):
            self.blocks.append(
                SERes2NetBlock(
                    channels[i - 1],
                    channels[i],
                    res2net_scale=res2net_scale,
                    se_channels=se_channels,
                    kernel_size=kernel_sizes[i],
                    dilation=dilations[i],
                    activation=activation))

        # Multi-layer feature aggregation
        self.mfa = TDNNBlock(
            channels[-1],
            channels[-1],
            kernel_sizes[-1],
            dilations[-1],
            activation)

        # Attentive Statistical Pooling
        self.asp = AttentiveStatisticsPooling(
            channels[-1],
            attention_channels=attention_channels,
            global_context=global_context)
        # The pooling layer concatenates mean and std, doubling the channels.
        self.asp_bn = BatchNorm1d(input_size=channels[-1] * 2)

        # Final linear transformation
        self.fc = Conv1d(
            in_channels=channels[-1] * 2,
            out_channels=self.emb_size,
            kernel_size=1)

    def forward(self, x, lengths=None):
        """
        Compute embeddings.

        Args:
            x (paddle.Tensor): Input log-fbanks with shape (N, n_mels, T).
            lengths (paddle.Tensor, optional): Length proportions of batch length with shape (N). Defaults to None.

        Returns:
            paddle.Tensor: Output embeddings with shape (N, self.emb_size, 1)
        """
        block_outputs = []
        for layer in self.blocks:
            # Only the SE-Res2Net blocks accept `lengths`. Dispatching on the
            # type avoids catching TypeError, which would also hide genuine
            # errors raised inside a block and silently re-run it unmasked.
            if isinstance(layer, SERes2NetBlock):
                x = layer(x, lengths=lengths)
            else:
                x = layer(x)
            block_outputs.append(x)

        # Multi-layer feature aggregation: the output of the initial TDNN
        # layer is excluded from the concatenation.
        x = paddle.concat(block_outputs[1:], axis=1)
        x = self.mfa(x)

        # Attentive Statistical Pooling
        x = self.asp(x, lengths=lengths)
        x = self.asp_bn(x)

        # Final linear transformation
        x = self.fc(x)

        return x
|