# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

import paddle
import paddle.nn as nn
import paddle.nn.functional as F


def length_to_mask(length, max_len=None, dtype=None):
    assert len(length.shape) == 1

    if max_len is None:
        max_len = length.max().astype(
            'int').item()  # using arange to generate mask
    mask = paddle.arange(
        max_len, dtype=length.dtype).expand(
            (len(length), max_len)) < length.unsqueeze(1)

    if dtype is None:
        dtype = length.dtype

    mask = paddle.to_tensor(mask, dtype=dtype)
    return mask


class Conv1d(nn.Layer):
    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            stride=1,
            padding="same",
            dilation=1,
            groups=1,
            bias=True,
            padding_mode="reflect", ):
        """_summary_

        Args:
            in_channels (int): intput channel or input data dimensions
            out_channels (int): output channel or output data dimensions
            kernel_size (int): kernel size of 1-d convolution
            stride (int, optional): strid in 1-d convolution . Defaults to 1.
            padding (str, optional): padding value. Defaults to "same".
            dilation (int, optional): dilation in 1-d convolution. Defaults to 1.
            groups (int, optional): groups in 1-d convolution. Defaults to 1.
            bias (bool, optional): bias in 1-d convolution . Defaults to True.
            padding_mode (str, optional): padding mode. Defaults to "reflect".
        """
        super().__init__()

        self.kernel_size = kernel_size
        self.stride = stride
        self.dilation = dilation
        self.padding = padding
        self.padding_mode = padding_mode

        self.conv = nn.Conv1D(
            in_channels,
            out_channels,
            self.kernel_size,
            stride=self.stride,
            padding=0,
            dilation=self.dilation,
            groups=groups,
            bias_attr=bias, )

    def forward(self, x):
        """Do conv1d forward

        Args:
            x (paddle.Tensor): [N, C, L] input data, 
                                N is the batch,
                                C is the data dimension, 
                                L is the time

        Raises:
            ValueError: only support the same padding type

        Returns:
            paddle.Tensor: the value of conv1d
        """
        if self.padding == "same":
            x = self._manage_padding(x, self.kernel_size, self.dilation,
                                     self.stride)
        else:
            raise ValueError("Padding must be 'same'. Got {self.padding}")

        return self.conv(x)

    def _manage_padding(self, x, kernel_size: int, dilation: int, stride: int):
        """Padding the input data

        Args:
            x (paddle.Tensor): [N, C, L] input data
                                N is the batch,
                                C is the data dimension, 
                                L is the time
            kernel_size (int): 1-d convolution kernel size
            dilation (int): 1-d convolution dilation
            stride (int): 1-d convolution stride

        Returns:
            paddle.Tensor: the padded input data
        """
        L_in = x.shape[-1]  # Detecting input shape
        padding = self._get_padding_elem(L_in, stride, kernel_size,
                                         dilation)  # Time padding
        x = F.pad(
            x, padding, mode=self.padding_mode,
            data_format="NCL")  # Applying padding
        return x

    def _get_padding_elem(self,
                          L_in: int,
                          stride: int,
                          kernel_size: int,
                          dilation: int):
        """Calculate the padding value in same mode

        Args:
            L_in (int): the times of the input data, 
            stride (int): 1-d convolution stride
            kernel_size (int): 1-d convolution kernel size
            dilation (int): 1-d convolution stride

        Returns:
            int: return the padding value in same mode
        """
        if stride > 1:
            n_steps = math.ceil(((L_in - kernel_size * dilation) / stride) + 1)
            L_out = stride * (n_steps - 1) + kernel_size * dilation
            padding = [kernel_size // 2, kernel_size // 2]
        else:
            L_out = (L_in - dilation * (kernel_size - 1) - 1) // stride + 1

            padding = [(L_in - L_out) // 2, (L_in - L_out) // 2]

        return padding


class BatchNorm1d(nn.Layer):
    def __init__(
            self,
            input_size,
            eps=1e-05,
            momentum=0.9,
            weight_attr=None,
            bias_attr=None,
            data_format='NCL',
            use_global_stats=None, ):
        super().__init__()

        self.norm = nn.BatchNorm1D(
            input_size,
            epsilon=eps,
            momentum=momentum,
            weight_attr=weight_attr,
            bias_attr=bias_attr,
            data_format=data_format,
            use_global_stats=use_global_stats, )

    def forward(self, x):
        x_n = self.norm(x)
        return x_n


class TDNNBlock(nn.Layer):
    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            dilation,
            activation=nn.ReLU, ):
        """Implementation of TDNN network

        Args:
            in_channels (int): input channels or input embedding dimensions
            out_channels (int): output channels or output embedding dimensions
            kernel_size (int): the kernel size of the TDNN network block
            dilation (int): the dilation of the TDNN network block
            activation (paddle class, optional): the activation layers. Defaults to nn.ReLU.
        """
        super().__init__()
        self.conv = Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            dilation=dilation, )
        self.activation = activation()
        self.norm = BatchNorm1d(input_size=out_channels)

    def forward(self, x):
        return self.norm(self.activation(self.conv(x)))


class Res2NetBlock(nn.Layer):
    def __init__(self, in_channels, out_channels, scale=8, dilation=1):
        """Implementation of Res2Net Block with dilation
           The paper is refered as "Res2Net: A New Multi-scale Backbone Architecture",
           whose url is https://arxiv.org/abs/1904.01169
        Args:
            in_channels (int): input channels or input dimensions
            out_channels (int): output channels or output dimensions
            scale (int, optional): scale in res2net bolck. Defaults to 8.
            dilation (int, optional): dilation of 1-d convolution in TDNN block. Defaults to 1.
        """
        super().__init__()
        assert in_channels % scale == 0
        assert out_channels % scale == 0

        in_channel = in_channels // scale
        hidden_channel = out_channels // scale

        self.blocks = nn.LayerList([
            TDNNBlock(
                in_channel, hidden_channel, kernel_size=3, dilation=dilation)
            for i in range(scale - 1)
        ])
        self.scale = scale

    def forward(self, x):
        y = []
        for i, x_i in enumerate(paddle.chunk(x, self.scale, axis=1)):
            if i == 0:
                y_i = x_i
            elif i == 1:
                y_i = self.blocks[i - 1](x_i)
            else:
                y_i = self.blocks[i - 1](x_i + y_i)
            y.append(y_i)
        y = paddle.concat(y, axis=1)
        return y


class SEBlock(nn.Layer):
    def __init__(self, in_channels, se_channels, out_channels):
        """Implementation of SEBlock
           The paper is refered as "Squeeze-and-Excitation Networks"
           whose url is https://arxiv.org/abs/1709.01507
        Args:
            in_channels (int): input channels or input data dimensions
            se_channels (_type_): _description_
            out_channels (int): output channels or output data dimensions
        """
        super().__init__()

        self.conv1 = Conv1d(
            in_channels=in_channels, out_channels=se_channels, kernel_size=1)
        self.relu = paddle.nn.ReLU()
        self.conv2 = Conv1d(
            in_channels=se_channels, out_channels=out_channels, kernel_size=1)
        self.sigmoid = paddle.nn.Sigmoid()

    def forward(self, x, lengths=None):
        L = x.shape[-1]
        if lengths is not None:
            mask = length_to_mask(lengths * L, max_len=L)
            mask = mask.unsqueeze(1)
            total = mask.sum(axis=2, keepdim=True)
            s = (x * mask).sum(axis=2, keepdim=True) / total
        else:
            s = x.mean(axis=2, keepdim=True)

        s = self.relu(self.conv1(s))
        s = self.sigmoid(self.conv2(s))

        return s * x


class AttentiveStatisticsPooling(nn.Layer):
    def __init__(self, channels, attention_channels=128, global_context=True):
        """Compute the speaker verification statistics
           The detail info is section 3.1 in https://arxiv.org/pdf/1709.01507.pdf 
        Args:
            channels (int): input data channel or data dimension
            attention_channels (int, optional): attention dimension. Defaults to 128.
            global_context (bool, optional): If use the global context information. Defaults to True.
        """
        super().__init__()

        self.eps = 1e-12
        self.global_context = global_context
        if global_context:
            self.tdnn = TDNNBlock(channels * 3, attention_channels, 1, 1)
        else:
            self.tdnn = TDNNBlock(channels, attention_channels, 1, 1)
        self.tanh = nn.Tanh()
        self.conv = Conv1d(
            in_channels=attention_channels,
            out_channels=channels,
            kernel_size=1)

    def forward(self, x, lengths=None):
        C, L = x.shape[1], x.shape[2]  # KP: (N, C, L)

        def _compute_statistics(x, m, axis=2, eps=self.eps):
            mean = (m * x).sum(axis)
            std = paddle.sqrt(
                (m * (x - mean.unsqueeze(axis)).pow(2)).sum(axis).clip(eps))
            return mean, std

        if lengths is None:
            lengths = paddle.ones([x.shape[0]])

        # Make binary mask of shape [N, 1, L]
        mask = length_to_mask(lengths * L, max_len=L)
        mask = mask.unsqueeze(1)

        # Expand the temporal context of the pooling layer by allowing the
        # self-attention to look at global properties of the utterance.
        if self.global_context:
            total = mask.sum(axis=2, keepdim=True).astype('float32')
            mean, std = _compute_statistics(x, mask / total)
            mean = mean.unsqueeze(2).tile((1, 1, L))
            std = std.unsqueeze(2).tile((1, 1, L))
            attn = paddle.concat([x, mean, std], axis=1)
        else:
            attn = x

        # Apply layers
        attn = self.conv(self.tanh(self.tdnn(attn)))

        # Filter out zero-paddings
        attn = paddle.where(
            mask.tile((1, C, 1)) == 0,
            paddle.ones_like(attn) * float("-inf"), attn)

        attn = F.softmax(attn, axis=2)
        mean, std = _compute_statistics(x, attn)

        # Append mean and std of the batch
        pooled_stats = paddle.concat((mean, std), axis=1)
        pooled_stats = pooled_stats.unsqueeze(2)

        return pooled_stats


class SERes2NetBlock(nn.Layer):
    def __init__(
            self,
            in_channels,
            out_channels,
            res2net_scale=8,
            se_channels=128,
            kernel_size=1,
            dilation=1,
            activation=nn.ReLU, ):
        """Implementation of Squeeze-Extraction Res2Blocks in ECAPA-TDNN network model
           The paper is refered "Squeeze-and-Excitation Networks"
           whose url is: https://arxiv.org/pdf/1709.01507.pdf
        Args:
            in_channels (int): input channels or input data dimensions
            out_channels (int): output channels or output data dimensions
            res2net_scale (int, optional): scale in the res2net block. Defaults to 8.
            se_channels (int, optional): embedding dimensions of res2net block. Defaults to 128.
            kernel_size (int, optional): kernel size of 1-d convolution in TDNN block. Defaults to 1.
            dilation (int, optional): dilation of 1-d convolution in TDNN block. Defaults to 1.
            activation (paddle.nn.class, optional): activation function. Defaults to nn.ReLU.
        """
        super().__init__()
        self.out_channels = out_channels
        self.tdnn1 = TDNNBlock(
            in_channels,
            out_channels,
            kernel_size=1,
            dilation=1,
            activation=activation, )
        self.res2net_block = Res2NetBlock(out_channels, out_channels,
                                          res2net_scale, dilation)
        self.tdnn2 = TDNNBlock(
            out_channels,
            out_channels,
            kernel_size=1,
            dilation=1,
            activation=activation, )
        self.se_block = SEBlock(out_channels, se_channels, out_channels)

        self.shortcut = None
        if in_channels != out_channels:
            self.shortcut = Conv1d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=1, )

    def forward(self, x, lengths=None):
        residual = x
        if self.shortcut:
            residual = self.shortcut(x)

        x = self.tdnn1(x)
        x = self.res2net_block(x)
        x = self.tdnn2(x)
        x = self.se_block(x, lengths)

        return x + residual


class EcapaTdnn(nn.Layer):
    def __init__(
            self,
            input_size,
            lin_neurons=192,
            activation=nn.ReLU,
            channels=[512, 512, 512, 512, 1536],
            kernel_sizes=[5, 3, 3, 3, 1],
            dilations=[1, 2, 3, 4, 1],
            attention_channels=128,
            res2net_scale=8,
            se_channels=128,
            global_context=True, ):
        """Implementation of ECAPA-TDNN backbone model network
           The paper is refered as "ECAPA-TDNN: Emphasized Channel Attention, Propagation and Aggregation in TDNN Based Speaker Verification"
           whose url is: https://arxiv.org/abs/2005.07143
        Args:
            input_size (_type_): input fature dimension
            lin_neurons (int, optional): speaker embedding size. Defaults to 192.
            activation (paddle.nn.class, optional): activation function. Defaults to nn.ReLU.
            channels (list, optional): inter embedding dimension. Defaults to [512, 512, 512, 512, 1536].
            kernel_sizes (list, optional): kernel size of 1-d convolution in TDNN block . Defaults to [5, 3, 3, 3, 1].
            dilations (list, optional): dilations of 1-d convolution in TDNN block. Defaults to [1, 2, 3, 4, 1].
            attention_channels (int, optional): attention dimensions. Defaults to 128.
            res2net_scale (int, optional): scale value in res2net. Defaults to 8.
            se_channels (int, optional): dimensions of squeeze-excitation block. Defaults to 128.
            global_context (bool, optional): global context flag. Defaults to True.
        """
        super().__init__()
        assert len(channels) == len(kernel_sizes)
        assert len(channels) == len(dilations)
        self.channels = channels
        self.blocks = nn.LayerList()
        self.emb_size = lin_neurons

        # The initial TDNN layer
        self.blocks.append(
            TDNNBlock(
                input_size,
                channels[0],
                kernel_sizes[0],
                dilations[0],
                activation, ))

        # SE-Res2Net layers
        for i in range(1, len(channels) - 1):
            self.blocks.append(
                SERes2NetBlock(
                    channels[i - 1],
                    channels[i],
                    res2net_scale=res2net_scale,
                    se_channels=se_channels,
                    kernel_size=kernel_sizes[i],
                    dilation=dilations[i],
                    activation=activation, ))

        # Multi-layer feature aggregation
        self.mfa = TDNNBlock(
            channels[-1],
            channels[-1],
            kernel_sizes[-1],
            dilations[-1],
            activation, )

        # Attentive Statistical Pooling
        self.asp = AttentiveStatisticsPooling(
            channels[-1],
            attention_channels=attention_channels,
            global_context=global_context, )
        self.asp_bn = BatchNorm1d(input_size=channels[-1] * 2)

        # Final linear transformation
        self.fc = Conv1d(
            in_channels=channels[-1] * 2,
            out_channels=self.emb_size,
            kernel_size=1, )

    def forward(self, x, lengths=None):
        """
        Compute embeddings.

        Args:
            x (paddle.Tensor): Input log-fbanks with shape (N, n_mels, T).
            lengths (paddle.Tensor, optional): Length proportions of batch length with shape (N). Defaults to None.

        Returns:
            paddle.Tensor: Output embeddings with shape (N, self.emb_size, 1)
        """
        xl = []
        for layer in self.blocks:
            try:
                x = layer(x, lengths=lengths)
            except TypeError:
                x = layer(x)
            xl.append(x)

        # Multi-layer feature aggregation
        x = paddle.concat(xl[1:], axis=1)
        x = self.mfa(x)

        # Attentive Statistical Pooling
        x = self.asp(x, lengths=lengths)
        x = self.asp_bn(x)

        # Final linear transformation
        x = self.fc(x)

        return x