PaddleSpeech/paddlespeech/t2s/modules/diffusion.py

# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Diffusion denoising related modules for paddle"""
import math
from typing import Callable
from typing import Optional
from typing import Tuple

import paddle
import ppdiffusers
from paddle import nn
from ppdiffusers.models.embeddings import Timesteps
from ppdiffusers.schedulers import DDPMScheduler

from paddlespeech.t2s.modules.nets_utils import initialize
from paddlespeech.t2s.modules.residual_block import WaveNetResidualBlock


class WaveNetDenoiser(nn.Layer):
    """A mel-spectrogram denoiser modified from WaveNet.

    Args:
        in_channels (int, optional):
            Number of channels of the input mel-spectrogram, by default 80.
        out_channels (int, optional):
            Number of channels of the output mel-spectrogram, by default 80.
        kernel_size (int, optional):
            Kernel size of the residual blocks inside, by default 3.
        layers (int, optional):
            Number of residual blocks inside, by default 20.
        stacks (int, optional):
            The number of groups to split the residual blocks into, by default 4.
            Within each group, the dilation of the residual block grows exponentially.
        residual_channels (int, optional):
            Residual channels of the residual blocks, by default 256.
        gate_channels (int, optional):
            Gate channels of the residual blocks, by default 512.
        skip_channels (int, optional):
            Skip channels of the residual blocks, by default 256.
        aux_channels (int, optional):
            Auxiliary channels of the residual blocks, by default 256.
        dropout (float, optional):
            Dropout rate of the residual blocks, by default 0.
        bias (bool, optional):
            Whether to use bias in the residual blocks, by default True.
        use_weight_norm (bool, optional):
            Whether to use weight norm in all convolutions, by default False.
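
    Examples:
        A minimal forward pass with the default channel sizes (the random
        tensors below are illustrative placeholders, not real features):

        >>> import paddle
        >>> denoiser = WaveNetDenoiser()
        >>> x = paddle.randn([4, 80, 192])     # noisy mel, (N, C_in, T)
        >>> t = paddle.randint(0, 100, (4, ))  # diffusion timesteps, (N, )
        >>> c = paddle.randn([4, 256, 192])    # auxiliary features, (N, C_aux, T)
        >>> y = denoiser(x, t, c)              # denoised mel, (N, C_out, T)
        >>> print(y.shape)
        [4, 80, 192]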
"""
def __init__(
self,
in_channels: int=80,
out_channels: int=80,
kernel_size: int=3,
layers: int=20,
stacks: int=4,
residual_channels: int=256,
gate_channels: int=512,
skip_channels: int=256,
aux_channels: int=256,
dropout: float=0.,
bias: bool=True,
use_weight_norm: bool=False,
init_type: str="kaiming_uniform", ):
super().__init__()
# initialize parameters
initialize(self, init_type)
self.in_channels = in_channels
self.out_channels = out_channels
self.aux_channels = aux_channels
self.layers = layers
self.stacks = stacks
self.kernel_size = kernel_size
assert layers % stacks == 0
layers_per_stack = layers // stacks
self.first_t_emb = nn.Sequential(
Timesteps(
residual_channels,
flip_sin_to_cos=False,
downscale_freq_shift=1),
nn.Linear(residual_channels, residual_channels * 4),
nn.Mish(), nn.Linear(residual_channels * 4, residual_channels))
self.t_emb_layers = nn.LayerList([
nn.Linear(residual_channels, residual_channels)
for _ in range(layers)
])
self.first_conv = nn.Conv1D(
in_channels, residual_channels, 1, bias_attr=True)
self.first_act = nn.ReLU()
self.conv_layers = nn.LayerList()
for layer in range(layers):
dilation = 2**(layer % layers_per_stack)
conv = WaveNetResidualBlock(
kernel_size=kernel_size,
residual_channels=residual_channels,
gate_channels=gate_channels,
skip_channels=skip_channels,
aux_channels=aux_channels,
dilation=dilation,
dropout=dropout,
bias=bias)
self.conv_layers.append(conv)
self.last_conv_layers = nn.Sequential(nn.ReLU(),
nn.Conv1D(
skip_channels,
skip_channels,
1,
bias_attr=True),
nn.ReLU(),
nn.Conv1D(
skip_channels,
out_channels,
1,
bias_attr=True))
if use_weight_norm:
self.apply_weight_norm()

    def forward(self, x, t, c):
        """Denoise mel-spectrogram.

        Args:
            x (Tensor):
                Shape (N, C_in, T), the input mel-spectrogram.
            t (Tensor):
                Shape (N), the timestep input.
            c (Tensor):
                Shape (N, C_aux, T'), the auxiliary input (e.g. fastspeech2 encoder output).

        Returns:
            Tensor: Shape (N, C_out, T), the denoised mel-spectrogram.
        """
        assert c.shape[-1] == x.shape[-1]

        if t.shape[0] != x.shape[0]:
            t = t.tile([x.shape[0]])
        t_emb = self.first_t_emb(t)
        t_embs = [
            t_emb_layer(t_emb)[..., None] for t_emb_layer in self.t_emb_layers
        ]

        x = self.first_conv(x)
        x = self.first_act(x)
        skips = 0
        for f, t in zip(self.conv_layers, t_embs):
            x = x + t
            x, s = f(x, c)
            skips += s
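
        # Scaling the sum of N skip outputs by sqrt(1 / N) keeps its variance
        # roughly independent of the network depth (standard WaveNet practice).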
        skips *= math.sqrt(1.0 / len(self.conv_layers))
        x = self.last_conv_layers(skips)

        return x

    def apply_weight_norm(self):
        """Recursively apply weight normalization to all the Convolution layers
        in the sublayers.
        """

        def _apply_weight_norm(layer):
            if isinstance(layer, (nn.Conv1D, nn.Conv2D)):
                nn.utils.weight_norm(layer)

        self.apply(_apply_weight_norm)

    def remove_weight_norm(self):
        """Recursively remove weight normalization from all the Convolution
        layers in the sublayers.
        """

        def _remove_weight_norm(layer):
            try:
                nn.utils.remove_weight_norm(layer)
            except ValueError:
                pass

        self.apply(_remove_weight_norm)


class GaussianDiffusion(nn.Layer):
    """Common Gaussian diffusion denoising model module.

    Args:
        denoiser (Layer):
            The denoising model. Given a noisy input, it predicts the noise that
            was added (prediction_type == "epsilon"); the diffusion scheduler then
            uses this prediction to compute the denoised result.
        num_train_timesteps (int, optional):
            The number of diffusion timesteps between pure noise and the real data
            during training, by default 1000.
        beta_start (float, optional):
            beta start parameter for the scheduler, by default 0.0001.
        beta_end (float, optional):
            beta end parameter for the scheduler, by default 0.02.
        beta_schedule (str, optional):
            beta schedule parameter for the scheduler, by default 'squaredcos_cap_v2' (cosine schedule).
        num_max_timesteps (int, optional):
            The maximum timestep of the transition from real data to noise, by default None.

    Examples:
        >>> import paddle
        >>> import paddle.nn.functional as F
        >>> from tqdm import tqdm
        >>>
        >>> denoiser = WaveNetDenoiser()
        >>> diffusion = GaussianDiffusion(denoiser, num_train_timesteps=1000, num_max_timesteps=100)
        >>> x = paddle.ones([4, 80, 192]) # [B, mel_ch, T] # real mel input
        >>> c = paddle.randn([4, 256, 192]) # [B, fs2_encoder_out_ch, T] # fastspeech2 encoder output
        >>> loss = F.mse_loss(*diffusion(x, c))
        >>> loss.backward()
        >>> print('MSE Loss:', loss.item())
        MSE Loss: 1.6669728755950928
        >>> def create_progress_callback():
        >>>     pbar = None
        >>>     def callback(index, timestep, num_timesteps, sample):
        >>>         nonlocal pbar
        >>>         if pbar is None:
        >>>             pbar = tqdm(total=num_timesteps-index)
        >>>         pbar.update()
        >>>
        >>>     return callback
        >>>
        >>> # ds=1000, K_step=60, scheduler=ddpm, from aux fs2 mel output
        >>> ds = 1000
        >>> infer_steps = 1000
        >>> K_step = 60
        >>> scheduler_type = 'ddpm'
        >>> x_in = x
        >>> diffusion = GaussianDiffusion(denoiser, num_train_timesteps=ds, num_max_timesteps=K_step)
        >>> with paddle.no_grad():
        >>>     sample = diffusion.inference(
        >>>         paddle.randn(x.shape), c, x,
        >>>         num_inference_steps=infer_steps,
        >>>         scheduler_type=scheduler_type,
        >>>         callback=create_progress_callback())
        100%|██████████| 60/60 [00:03<00:00, 18.36it/s]
        >>>
        >>> # ds=100, K_step=100, scheduler=ddpm, from gaussian noise
        >>> ds = 100
        >>> infer_steps = 100
        >>> K_step = 100
        >>> scheduler_type = 'ddpm'
        >>> x_in = None
        >>> diffusion = GaussianDiffusion(denoiser, num_train_timesteps=ds, num_max_timesteps=K_step)
        >>> with paddle.no_grad():
        >>>     sample = diffusion.inference(
        >>>         paddle.randn(x.shape), c, x_in,
        >>>         num_inference_steps=infer_steps,
        >>>         scheduler_type=scheduler_type,
        >>>         callback=create_progress_callback())
        100%|██████████| 100/100 [00:05<00:00, 18.29it/s]
        >>>
        >>> # ds=1000, K_step=1000, scheduler=pndm, infer_step=25, from gaussian noise
        >>> ds = 1000
        >>> infer_steps = 25
        >>> K_step = 1000
        >>> scheduler_type = 'pndm'
        >>> x_in = None
        >>> diffusion = GaussianDiffusion(denoiser, num_train_timesteps=ds, num_max_timesteps=K_step)
        >>> with paddle.no_grad():
        >>>     sample = diffusion.inference(
        >>>         paddle.randn(x.shape), c, None,
        >>>         num_inference_steps=infer_steps,
        >>>         scheduler_type=scheduler_type,
        >>>         callback=create_progress_callback())
        100%|██████████| 25/25 [00:01<00:00, 19.75it/s]
        >>>
        >>> # ds=1000, K_step=100, scheduler=pndm, infer_step=50, from aux fs2 mel output
        >>> ds = 1000
        >>> infer_steps = 50
        >>> K_step = 100
        >>> scheduler_type = 'pndm'
        >>> x_in = x
        >>> diffusion = GaussianDiffusion(denoiser, num_train_timesteps=ds, num_max_timesteps=K_step)
        >>> with paddle.no_grad():
        >>>     sample = diffusion.inference(
        >>>         paddle.randn(x.shape), c, x,
        >>>         num_inference_steps=infer_steps,
        >>>         scheduler_type=scheduler_type,
        >>>         callback=create_progress_callback())
        100%|██████████| 5/5 [00:00<00:00, 23.80it/s]
    """

    def __init__(self,
                 denoiser: nn.Layer,
                 num_train_timesteps: Optional[int]=1000,
                 beta_start: Optional[float]=0.0001,
                 beta_end: Optional[float]=0.02,
                 beta_schedule: Optional[str]="squaredcos_cap_v2",
                 num_max_timesteps: Optional[int]=None):
        super().__init__()

        self.num_train_timesteps = num_train_timesteps
        self.beta_start = beta_start
        self.beta_end = beta_end
        self.beta_schedule = beta_schedule
        self.denoiser = denoiser
        self.noise_scheduler = DDPMScheduler(
            num_train_timesteps=num_train_timesteps,
            beta_start=beta_start,
            beta_end=beta_end,
            beta_schedule=beta_schedule)
        self.num_max_timesteps = num_max_timesteps

    def forward(self, x: paddle.Tensor, cond: Optional[paddle.Tensor]=None
                ) -> Tuple[paddle.Tensor, paddle.Tensor]:
        """Add noise to the input at randomly sampled timesteps and predict that noise.

        Args:
            x (Tensor):
                The input to add noise to.
            cond (Tensor, optional):
                Conditional input for the denoiser.

        Returns:
            y (Tensor):
                The denoiser's prediction of the added noise.
            target (Tensor):
                The noise that was actually added to the input.
        """
        noise_scheduler = self.noise_scheduler

        # Sample noise that we'll add to the mel-spectrograms
        target = noise = paddle.randn(x.shape)

        # Sample a random timestep for each mel-spectrogram
        num_timesteps = self.num_train_timesteps
        if self.num_max_timesteps is not None:
            num_timesteps = self.num_max_timesteps
        timesteps = paddle.randint(0, num_timesteps, (x.shape[0], ))

        # Add noise to the clean mel-spectrograms according to the noise magnitude at each timestep
        # (this is the forward diffusion process)
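        # With the DDPM scheduler this is the closed-form forward process
        #     x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * noise,
        # where alpha_bar_t is the cumulative product of (1 - beta_s) for s <= t.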
        noisy_mels = noise_scheduler.add_noise(x, noise, timesteps)

        y = self.denoiser(noisy_mels, timesteps, cond)

        # The caller computes the loss between y and target, since the
        # scheduler's prediction_type is "epsilon" (the denoiser predicts noise).
        return y, target

    def inference(self,
                  noise: paddle.Tensor,
                  cond: Optional[paddle.Tensor]=None,
                  ref_x: Optional[paddle.Tensor]=None,
                  num_inference_steps: Optional[int]=1000,
                  strength: Optional[float]=None,
                  scheduler_type: Optional[str]="ddpm",
                  callback: Optional[Callable[[int, int, int, paddle.Tensor],
                                              None]]=None,
                  callback_steps: Optional[int]=1):
        """Denoise the input starting from noise. Adapted from the ppdiffusers img2img pipeline.

        Args:
            noise (Tensor):
                The input tensor used as the starting point for denoising.
            cond (Tensor, optional):
                Conditional input for the denoiser.
            ref_x (Tensor, optional):
                A reference output used to initialize the denoising process.
            num_inference_steps (int, optional):
                The number of denoising timesteps used during inference, by default 1000.
            strength (float, optional):
                Mixing strength of ref_x and noise; the larger the value, the stronger the noise.
                Range [0, 1], by default None.
            scheduler_type (str, optional):
                Noise scheduler used to generate the noises.
                A well-chosen scheduler can skip many denoising steps, by default 'ddpm'.
            callback (Callable[[int, int, int, Tensor], None], optional):
                Callback function called during the denoising steps.

                Args:
                    index (int):
                        Current denoising index.
                    timestep (int):
                        Current denoising timestep.
                    num_timesteps (int):
                        Total number of denoising timesteps.
                    denoised_output (Tensor):
                        Current intermediate result produced during denoising.
            callback_steps (int, optional):
                Call the callback every `callback_steps` steps, by default 1.

        Returns:
            denoised_output (Tensor):
                The denoised output tensor.
        """
        scheduler_cls = None
        for clsname in dir(ppdiffusers.schedulers):
            if clsname.lower() == scheduler_type + "scheduler":
                scheduler_cls = getattr(ppdiffusers.schedulers, clsname)
                break

        if scheduler_cls is None:
            raise ValueError(f"No such scheduler type named {scheduler_type}")

        scheduler = scheduler_cls(
            num_train_timesteps=self.num_train_timesteps,
            beta_start=self.beta_start,
            beta_end=self.beta_end,
            beta_schedule=self.beta_schedule)

        # set timesteps
        scheduler.set_timesteps(num_inference_steps)

        # prepare first noise variables
        noisy_input = noise
        timesteps = scheduler.timesteps
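
        # When a reference mel (e.g. the FastSpeech2 output) is given, start the
        # denoising from a partially noised version of it instead of pure noise;
        # `strength` (or num_max_timesteps / num_train_timesteps) decides how far
        # along the diffusion process that starting point lies.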
        if ref_x is not None:
            init_timestep = None
            if strength is None or strength < 0. or strength > 1.:
                strength = None
                if self.num_max_timesteps is not None:
                    strength = self.num_max_timesteps / self.num_train_timesteps
            if strength is not None:
                # get the original timestep using init_timestep
                init_timestep = min(
                    int(num_inference_steps * strength), num_inference_steps)
                t_start = max(num_inference_steps - init_timestep, 0)
                timesteps = scheduler.timesteps[t_start:]
                num_inference_steps = num_inference_steps - t_start
                noisy_input = scheduler.add_noise(
                    ref_x, noise, timesteps[:1].tile([noise.shape[0]]))

        # denoising loop
        denoised_output = noisy_input
        num_warmup_steps = len(
            timesteps) - num_inference_steps * scheduler.order
        for i, t in enumerate(timesteps):
            denoised_output = scheduler.scale_model_input(denoised_output, t)

            # predict the noise residual
            noise_pred = self.denoiser(denoised_output, t, cond)

            # compute the previous noisy sample x_t -> x_t-1
            denoised_output = scheduler.step(noise_pred, t,
                                             denoised_output).prev_sample

            # call the callback, if provided
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and
                                           (i + 1) % scheduler.order == 0):
                if callback is not None and i % callback_steps == 0:
                    callback(i, t, len(timesteps), denoised_output)

        return denoised_output