# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

import numpy as np
from paddle import distributed as dist
from paddle.io import BatchSampler
from paddle.io import DistributedBatchSampler

from paddlespeech.s2t.utils.log import Log

__all__ = [
    "SortagradDistributedBatchSampler",
    "SortagradBatchSampler",
]

logger = Log(__name__).getlog()


def _batch_shuffle(indices, batch_size, epoch, clipped=False):
    """Put similarly-sized instances into minibatches for better efficiency
    and do a batch-wise shuffle.

    1. Sort the audio clips by duration.
    2. Generate a random number `k`, k in [0, batch_size).
    3. Randomly shift `k` instances so that different epochs produce
       different batches, then form minibatches.
    4. Shuffle the minibatches.

    :param indices: Sample indexes.
    :type indices: list
    :param batch_size: Batch size. This size is also used to generate
                       the random shift for batch shuffle.
    :type batch_size: int
    :param epoch: Epoch number, used to seed the random generator.
    :type epoch: int
    :param clipped: Whether to clip the heading (small shift) and trailing
                    (incomplete batch) instances.
    :type clipped: bool
    :return: Batch-shuffled list of indexes.
    :rtype: list
    """
    rng = np.random.RandomState(epoch)
    # Random shift so that batch boundaries differ between epochs.
    shift_len = rng.randint(0, batch_size - 1)
    # Group the shifted indices into full batches, then shuffle the batches.
    batch_indices = list(zip(*[iter(indices[shift_len:])] * batch_size))
    rng.shuffle(batch_indices)
    batch_indices = [item for batch in batch_indices for item in batch]
    assert clipped is False
    if not clipped:
        res_len = len(indices) - shift_len - len(batch_indices)
        # when res_len is 0, indices[-0:] would return the whole list,
        # so only append the trailing remainder when it is non-empty
        if res_len != 0:
            batch_indices.extend(indices[-res_len:])
        batch_indices.extend(indices[0:shift_len])
        assert len(indices) == len(
            batch_indices
        ), f"_batch_shuffle: {len(indices)} : {len(batch_indices)} : {res_len} - {shift_len}"
    return batch_indices


class SortagradDistributedBatchSampler(DistributedBatchSampler):
    def __init__(self,
                 dataset,
                 batch_size,
                 num_replicas=None,
                 rank=None,
                 shuffle=False,
                 drop_last=False,
                 sortagrad=False,
                 shuffle_method="batch_shuffle"):
        """Sortagrad sampler for multiple GPUs.

        Args:
            dataset (paddle.io.Dataset): dataset to sample from.
            batch_size (int): batch size for one GPU.
            num_replicas (int, optional): world size, i.e. number of GPUs. Defaults to None.
            rank (int, optional): rank id. Defaults to None.
            shuffle (bool, optional): whether to shuffle the indices. Defaults to False.
            drop_last (bool, optional): whether to drop the last batch if it is smaller than batch_size. Defaults to False.
            sortagrad (bool, optional): if True, keep the duration-sorted order in the first epoch, then shuffle as usual. Defaults to False.
            shuffle_method (str, optional): shuffle method, "instance_shuffle" or "batch_shuffle". Defaults to "batch_shuffle".
""" super().__init__(dataset, batch_size, num_replicas, rank, shuffle, drop_last) self._sortagrad = sortagrad self._shuffle_method = shuffle_method def __iter__(self): num_samples = len(self.dataset) indices = np.arange(num_samples).tolist() indices += indices[:(self.total_size - len(indices))] assert len(indices) == self.total_size # sort (by duration) or batch-wise shuffle the manifest if self.shuffle: if self.epoch == 0 and self._sortagrad: logger.info( f'rank: {dist.get_rank()} dataset sortagrad! epoch {self.epoch}' ) else: logger.info( f'rank: {dist.get_rank()} dataset shuffle! epoch {self.epoch}' ) if self._shuffle_method == "batch_shuffle": # using `batch_size * nrank`, or will cause instability loss and nan or inf grad, # since diff batch examlpe length in batches case instability loss in diff rank, # e.g. rank0 maxlength 20, rank3 maxlength 1000 indices = _batch_shuffle( indices, self.batch_size * self.nranks, self.epoch, clipped=False) elif self._shuffle_method == "instance_shuffle": np.random.RandomState(self.epoch).shuffle(indices) else: raise ValueError("Unknown shuffle method %s." % self._shuffle_method) assert len( indices ) == self.total_size, f"batch shuffle examples error: {len(indices)} : {self.total_size}" # slice `self.batch_size` examples by rank id def _get_indices_by_batch_size(indices): subsampled_indices = [] last_batch_size = self.total_size % (self.batch_size * self.nranks) assert last_batch_size % self.nranks == 0 last_local_batch_size = last_batch_size // self.nranks for i in range(self.local_rank * self.batch_size, len(indices) - last_batch_size, self.batch_size * self.nranks): subsampled_indices.extend(indices[i:i + self.batch_size]) indices = indices[len(indices) - last_batch_size:] subsampled_indices.extend( indices[self.local_rank * last_local_batch_size:( self.local_rank + 1) * last_local_batch_size]) return subsampled_indices if self.nranks > 1: indices = _get_indices_by_batch_size(indices) assert len(indices) == self.num_samples _sample_iter = iter(indices) batch_indices = [] for idx in _sample_iter: batch_indices.append(idx) if len(batch_indices) == self.batch_size: logger.debug( f"rank: {dist.get_rank()} batch index: {batch_indices} ") yield batch_indices batch_indices = [] if not self.drop_last and len(batch_indices) > 0: yield batch_indices def __len__(self): num_samples = self.num_samples num_samples += int(not self.drop_last) * (self.batch_size - 1) return num_samples // self.batch_size class SortagradBatchSampler(BatchSampler): def __init__(self, dataset, batch_size, shuffle=False, drop_last=False, sortagrad=False, shuffle_method="batch_shuffle"): """Sortagrad Sampler for one gpu. Args: dataset (paddle.io.Dataset): batch_size (int): batch size for one gpu shuffle (bool, optional): True for do shuffle, or else. Defaults to False. drop_last (bool, optional): whether drop last batch which is less than batch size. Defaults to False. sortagrad (bool, optional): True, do sortgrad in first epoch, then shuffle as usual; or else. Defaults to False. shuffle_method (str, optional): shuffle method, "instance_shuffle" or "batch_shuffle". Defaults to "batch_shuffle". 
""" self.dataset = dataset assert isinstance(batch_size, int) and batch_size > 0, \ "batch_size should be a positive integer" self.batch_size = batch_size assert isinstance(shuffle, bool), \ "shuffle should be a boolean value" self.shuffle = shuffle assert isinstance(drop_last, bool), \ "drop_last should be a boolean number" self.drop_last = drop_last self.epoch = 0 self.num_samples = int(math.ceil(len(self.dataset) * 1.0)) self.total_size = self.num_samples self._sortagrad = sortagrad self._shuffle_method = shuffle_method def __iter__(self): num_samples = len(self.dataset) indices = np.arange(num_samples).tolist() indices += indices[:(self.total_size - len(indices))] assert len(indices) == self.total_size # sort (by duration) or batch-wise shuffle the manifest if self.shuffle: if self.epoch == 0 and self._sortagrad: logger.info(f'dataset sortagrad! epoch {self.epoch}') else: logger.info(f'dataset shuffle! epoch {self.epoch}') if self._shuffle_method == "batch_shuffle": indices = _batch_shuffle( indices, self.batch_size, self.epoch, clipped=False) elif self._shuffle_method == "instance_shuffle": np.random.RandomState(self.epoch).shuffle(indices) else: raise ValueError("Unknown shuffle method %s." % self._shuffle_method) assert len( indices ) == self.total_size, f"batch shuffle examples error: {len(indices)} : {self.total_size}" assert len(indices) == self.num_samples _sample_iter = iter(indices) batch_indices = [] for idx in _sample_iter: batch_indices.append(idx) if len(batch_indices) == self.batch_size: logger.debug( f"rank: {dist.get_rank()} batch index: {batch_indices} ") yield batch_indices batch_indices = [] if not self.drop_last and len(batch_indices) > 0: yield batch_indices self.epoch += 1 def __len__(self): num_samples = self.num_samples num_samples += int(not self.drop_last) * (self.batch_size - 1) return num_samples // self.batch_size