parent
afa9466c89
commit
9e7dca2bc5
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,5 @@
|
||||
soundfile
|
||||
librosa
|
||||
scipy
|
||||
rich
|
||||
flatten_dict
|
@ -0,0 +1,240 @@
|
||||
import inspect
|
||||
from typing import Optional, Sequence
|
||||
import paddle
|
||||
import paddle.nn.functional as F
|
||||
import math
|
||||
|
||||
|
||||
def simple_repr(
    obj, attrs: Optional[Sequence[str]] = None, overrides: Optional[dict] = None
):
    """
    Return a simple representation string for `obj`.

    The string has the form ``ClassName(attr=value,...)``.

    Args:
        obj: instance to describe. The parameters of its class constructor
            (as reported by `inspect.signature`) drive which attributes
            are shown.
        attrs (sequence of str, optional): names of the attributes to include.
            When None, the constructor parameter names are used.
        overrides (dict, optional): mapping ``name -> value`` whose values are
            displayed instead of the attribute stored on `obj`.

    An attribute that matches a constructor parameter is only displayed when
    the parameter has no default or when the value differs from the default.
    Attributes that are neither overridden nor present on `obj` are skipped.
    """
    # A fresh dict per call avoids sharing a mutable default between calls.
    if overrides is None:
        overrides = {}
    params = inspect.signature(obj.__class__).parameters
    if attrs is None:
        attrs = list(params.keys())

    attrs_repr = []
    for attr in attrs:
        if attr in overrides:
            value = overrides[attr]
        elif hasattr(obj, attr):
            value = getattr(obj, attr)
        else:
            continue
        param = params.get(attr)
        # Names that are not constructor parameters are always displayed;
        # constructor parameters are hidden when left at their default.
        if (
            param is None
            or param.default is inspect.Parameter.empty
            or value != param.default
        ):
            attrs_repr.append(f"{attr}={value}")
    return f"{obj.__class__.__name__}({','.join(attrs_repr)})"
|
||||
|
||||
|
||||
def sinc(x: paddle.Tensor):
    """
    Unnormalized sinc function, i.e. sin(x) / x, with the value 1 at x == 0.

    __Warning__: the input is not multiplied by `pi`!
    """
    # Value substituted wherever x is exactly zero (sin(x) / x is 0 / 0 there).
    at_zero = paddle.to_tensor(1.0, dtype=x.dtype, place=x.place)
    ratio = paddle.sin(x) / x
    is_zero = x == 0
    return paddle.where(is_zero, at_zero, ratio)
|
||||
|
||||
|
||||
class ResampleFrac(paddle.nn.Layer):
    """
    Resampling from the sample rate `old_sr` to `new_sr`.
    """

    def __init__(
        self, old_sr: int, new_sr: int, zeros: int = 24, rolloff: float = 0.945
    ):
        """
        Args:
            old_sr (int): sample rate of the input signal x.
            new_sr (int): sample rate of the output.
            zeros (int): number of zero crossing to keep in the sinc filter.
            rolloff (float): use a lowpass filter that is `rolloff * new_sr / 2`,
                to ensure sufficient margin due to the imperfection of the FIR filter used.
                Lowering this value will reduce anti-aliasing, but will reduce some of the
                highest frequencies.

        Raises:
            ValueError: if `old_sr` or `new_sr` is not an `int`.

        Shape:

            - Input: `[*, T]`
            - Output: `[*, T']` with `T' = int(new_sr * T / old_sr)`


        .. caution::
            After dividing `old_sr` and `new_sr` by their GCD, both should be small
            for this implementation to be fast.

        >>> import paddle
        >>> resample = ResampleFrac(4, 5)
        >>> x = paddle.randn([1000])
        >>> print(len(resample(x)))
        1250
        """
        super(ResampleFrac, self).__init__()
        if not isinstance(old_sr, int) or not isinstance(new_sr, int):
            raise ValueError("old_sr and new_sr should be integers")
        # Only the ratio matters, so both rates are stored in reduced form.
        gcd = math.gcd(old_sr, new_sr)
        self.old_sr = old_sr // gcd
        self.new_sr = new_sr // gcd
        self.zeros = zeros
        self.rolloff = rolloff

        self._init_kernels()

    def _init_kernels(self):
        """Precompute the bank of `new_sr` windowed-sinc filters used by `forward`.

        Nothing is built when the reduced rates are equal, since `forward`
        returns its input unchanged in that case.
        """
        if self.old_sr == self.new_sr:
            return

        kernels = []
        sr = min(self.new_sr, self.old_sr)
        # rolloff will perform antialiasing filtering by removing the highest frequencies.
        # At first I thought I only needed this when downsampling, but when upsampling
        # you will get edge artifacts without this, the edge is equivalent to zero padding,
        # which will add high freq artifacts.
        sr *= self.rolloff

        # The key idea of the algorithm is that x(t) can be exactly reconstructed from x[i] (tensor)
        # using the sinc interpolation formula:
        #   x(t) = sum_i x[i] sinc(pi * old_sr * (i / old_sr - t))
        # We can then sample the function x(t) with a different sample rate:
        #    y[j] = x(j / new_sr)
        # or,
        #    y[j] = sum_i x[i] sinc(pi * old_sr * (i / old_sr - j / new_sr))

        # We see here that y[j] is the convolution of x[i] with a specific filter, for which
        # we take an FIR approximation, stopping when we see at least `zeros` zeros crossing.
        # But y[j+1] is going to have a different set of weights and so on, until y[j + new_sr].
        # Indeed:
        # y[j + new_sr] = sum_i x[i] sinc(pi * old_sr * ((i / old_sr - (j + new_sr) / new_sr))
        #               = sum_i x[i] sinc(pi * old_sr * ((i - old_sr) / old_sr - j / new_sr))
        #               = sum_i x[i + old_sr] sinc(pi * old_sr * (i / old_sr - j / new_sr))
        # so y[j+new_sr] uses the same filter as y[j], but on a shifted version of x by `old_sr`.
        # This will explain the F.conv1d after, with a stride of old_sr.
        self._width = math.ceil(self.zeros * self.old_sr / sr)
        # If old_sr is still big after GCD reduction, most filters will be very unbalanced, i.e.,
        # they will have a lot of almost zero values to the left or to the right...
        # There is probably a way to evaluate those filters more efficiently, but this is kept for
        # future work.
        idx = paddle.arange(
            -self._width, self._width + self.old_sr, dtype="float32"
        )
        for i in range(self.new_sr):
            t = (-i / self.new_sr + idx / self.old_sr) * sr
            t = paddle.clip(t, -self.zeros, self.zeros)
            t *= math.pi
            window = paddle.cos(t / self.zeros / 2) ** 2
            kernel = sinc(t) * window
            # Renormalize kernel to ensure a constant signal is preserved.
            kernel = kernel / kernel.sum()
            kernels.append(kernel)

        _kernel = paddle.stack(kernels).reshape([self.new_sr, 1, -1])
        # The filter bank is a fixed constant, not a learnable weight: register
        # it as a buffer so it still follows the layer (device moves,
        # state_dict) but is never handed to an optimizer.
        self.register_buffer("kernel", _kernel)

    def forward(
        self,
        x: paddle.Tensor,
        output_length: Optional[int] = None,
        full: bool = False,
    ):
        """
        Resample x.

        Args:
            x (Tensor): signal to resample, time should be the last dimension
            output_length (None or int): This can be set to the desired output length
                (last dimension). Allowed values are between 0 and
                ceil(length * new_sr / old_sr). When None (default) is specified, the
                floored output length will be used. In order to select the largest possible
                size, use the `full` argument.
            full (bool): return the longest possible output from the input. This can be useful
                if you chain resampling operations, and want to give the `output_length` only
                for the last one, while passing `full=True` to all the other ones.

        Returns:
            Tensor: resampled signal with the same leading dimensions as `x`.
            When the reduced rates are equal, `x` itself is returned.

        Raises:
            ValueError: if `output_length` is outside the allowed range, or if
                both `output_length` and `full=True` are given.
        """
        if self.old_sr == self.new_sr:
            return x
        shape = x.shape
        length = x.shape[-1]
        # Collapse all leading dimensions into a batch and add a channel axis
        # so the signal matches the "NCL" layout expected by conv1d.
        x = x.reshape([-1, length])
        x = F.pad(
            x.unsqueeze(1),
            [self._width, self._width + self.old_sr],
            mode="replicate",
            data_format="NCL",
        )
        # One output channel per filter phase; the stride of old_sr implements
        # the shift described in `_init_kernels`.
        ys = F.conv1d(x, self.kernel, stride=self.old_sr, data_format="NCL")
        # Interleave the phases along time and restore the leading dimensions.
        y = ys.transpose([0, 2, 1]).reshape(list(shape[:-1]) + [-1])

        # float64 keeps the length exact; float32 cannot represent every
        # integer above 2**24, which would skew floor/ceil on long signals.
        float_output_length = paddle.to_tensor(
            self.new_sr * length / self.old_sr, dtype="float64"
        )
        max_output_length = paddle.ceil(float_output_length).astype("int64")
        default_output_length = paddle.floor(float_output_length).astype(
            "int64"
        )

        if output_length is None:
            applied_output_length = (
                max_output_length if full else default_output_length
            )
        elif output_length < 0 or output_length > max_output_length:
            raise ValueError(
                f"output_length must be between 0 and {max_output_length.numpy()}"
            )
        else:
            applied_output_length = paddle.to_tensor(
                output_length, dtype="int64"
            )
            if full:
                raise ValueError(
                    "You cannot pass both full=True and output_length"
                )
        return y[..., :applied_output_length]

    def __repr__(self):
        """Describe the layer through its constructor arguments (see `simple_repr`)."""
        return simple_repr(self)
|
||||
|
||||
|
||||
def resample_frac(
    x: paddle.Tensor,
    old_sr: int,
    new_sr: int,
    zeros: int = 24,
    rolloff: float = 0.945,
    output_length: Optional[int] = None,
    full: bool = False,
):
    """
    Functional version of `ResampleFrac`, refer to its documentation for more information.

    .. warning::
        If you call this function repeatedly with the same sample rates, the
        resampling kernel will be recomputed every time. For best performance,
        create an instance of `ResampleFrac` once and reuse it.
    """
    # A throw-away layer is built for this single call.
    resampler = ResampleFrac(old_sr, new_sr, zeros, rolloff)
    return resampler(x, output_length, full)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Smoke test: build a 4 -> 5 resampler, feed it 1000 random samples and
    # print the length of the result.
    resampler = ResampleFrac(4, 5)
    signal = paddle.randn([1000])
    resampled = resampler(signal)
    print(len(resampled))
|
@ -0,0 +1,669 @@
|
||||
import csv
|
||||
import glob
|
||||
import math
|
||||
import numbers
|
||||
import os
|
||||
import random
|
||||
import typing
|
||||
import soundfile
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional, List
|
||||
|
||||
import numpy as np
|
||||
import paddle
|
||||
from flatten_dict import flatten
|
||||
from flatten_dict import unflatten
|
||||
|
||||
|
||||
@dataclass
class Info:
    """Minimal description of an audio file: its sample rate and frame count."""

    # Sample rate of the audio.
    sample_rate: float
    # Total number of frames in the audio.
    num_frames: int

    @property
    def duration(self) -> float:
        """Number of frames divided by the sample rate."""
        frames, rate = self.num_frames, self.sample_rate
        return frames / rate
|
||||
|
||||
|
||||
def info(audio_path: str):
    """Read the sample rate and frame count of an audio file.

    Parameters
    ----------
    audio_path : str
        Path to audio file.

    Returns
    -------
    Info
        Object holding the ``samplerate`` and ``frames`` values reported by
        ``soundfile.info`` for the file.
    """
    # The path is converted with str() before being handed to soundfile.
    metadata = soundfile.info(str(audio_path))
    return Info(sample_rate=metadata.samplerate, num_frames=metadata.frames)
|
||||
|
||||
|
||||
def ensure_tensor(
    x: typing.Union[np.ndarray, paddle.Tensor, float, int],
    ndim: int = None,
    batch_size: int = None,
):
    """Convert ``x`` to a tensor with the requested rank and batch size.

    Parameters
    ----------
    x : typing.Union[np.ndarray, paddle.Tensor, float, int]
        Data to convert. Anything that is not already a tensor is passed
        through ``paddle.to_tensor``.
    ndim : int, optional
        Number of dimensions of the output, by default None (rank is left
        untouched). Missing dimensions are appended at the end; ``x`` must
        not already have more than ``ndim`` dimensions.
    batch_size : int, optional
        Size of the first dimension of the output, by default None (left
        untouched). When it differs from the current size, the tensor is
        expanded along the first dimension.

    Returns
    -------
    paddle.Tensor
        Modified version of ``x`` as a tensor.
    """
    if not paddle.is_tensor(x):
        x = paddle.to_tensor(x)

    if ndim is not None:
        assert x.ndim <= ndim
        # Append one trailing axis per missing dimension.
        for _ in range(ndim - x.ndim):
            x = x.unsqueeze(-1)

    if batch_size is not None and x.shape[0] != batch_size:
        target_shape = list(x.shape)
        target_shape[0] = batch_size
        x = paddle.expand(x, target_shape)
    return x
|
||||
|
||||
|
||||
def _get_value(other):
    """Unwrap an ``AudioSignal`` into its ``audio_data``.

    Parameters
    ----------
    other : typing.Any
        Either an ``AudioSignal`` or any other object.

    Returns
    -------
    typing.Any
        ``other.audio_data`` when ``other`` is an ``AudioSignal``, otherwise
        ``other`` itself, unchanged.
    """
    # Imported lazily, inside the function. The package-relative form matches
    # the other helpers of this module; the flat-module form is kept as a
    # fallback for when this file is not loaded as part of a package.
    try:
        from . import AudioSignal
    except ImportError:
        from audio_signal import AudioSignal

    if isinstance(other, AudioSignal):
        return other.audio_data
    return other
|
||||
|
||||
|
||||
def hz_to_bin(hz: paddle.Tensor, n_fft: int, sample_rate: int):
    """Closest frequency bin given a frequency, number
    of bins, and a sampling rate.

    Parameters
    ----------
    hz : paddle.Tensor
        Tensor of frequencies in Hz. It is not modified.
    n_fft : int
        Number of FFT bins.
    sample_rate : int
        Sample rate of audio.

    Returns
    -------
    paddle.Tensor
        Closest bins to the data, with the same shape as ``hz``.
    """
    shape = hz.shape
    nyquist = sample_rate / 2
    # Candidate bin frequencies, linearly spaced from 0 to sample_rate / 2.
    freqs = paddle.linspace(0, nyquist, 2 + n_fft // 2)

    # Anything above sample_rate / 2 is treated as sample_rate / 2. ``clip``
    # returns a new tensor, so the caller's ``hz`` is left untouched.
    hz = paddle.clip(hz.flatten().astype(freqs.dtype), max=nyquist)

    # Distance of every query frequency (columns) to every bin (rows); the
    # row index of the smallest distance is the closest bin.
    closest = (hz.unsqueeze(0) - freqs.unsqueeze(1)).abs()
    closest_bins = closest.argmin(axis=0)

    return closest_bins.reshape(shape)
|
||||
|
||||
|
||||
def random_state(seed: typing.Union[int, np.random.RandomState]):
    """
    Turn seed into a np.random.RandomState instance.

    Parameters
    ----------
    seed : typing.Union[int, np.random.RandomState] or None
        If seed is None (or the ``np.random`` module itself), return the
        RandomState singleton used by np.random.
        If seed is an int, return a new RandomState instance seeded with seed.
        If seed is already a RandomState instance, return it.
        Otherwise raise ValueError.

    Returns
    -------
    np.random.RandomState
        Random state object.

    Raises
    ------
    ValueError
        If seed is not valid, an error is thrown.
    """
    if seed is None or seed is np.random:
        return np.random.mtrand._rand
    if isinstance(seed, np.random.RandomState):
        return seed
    if isinstance(seed, (numbers.Integral, np.integer)):
        return np.random.RandomState(seed)
    # An f-string with !r is used rather than "%r" % seed: the latter treats
    # a tuple seed as the argument list and fails with TypeError instead of
    # the documented ValueError.
    raise ValueError(
        f"{seed!r} cannot be used to seed a numpy.random.RandomState instance"
    )
|
||||
|
||||
|
||||
def seed(random_seed, set_cudnn=False):
    """
    Seeds all random states with the same random seed
    for reproducibility. Seeds ``numpy``, ``random`` and ``paddle``
    random generators.

    For full reproducibility on GPU, cuDNN must additionally be put into
    deterministic mode, which is done when ``set_cudnn`` is True. It defaults
    to False, since setting it to True results in a performance hit.

    Args:
        random_seed (int): integer corresponding to random seed to
            use.
        set_cudnn (bool): Whether or not to set cudnn into deterministic
            mode and off of exhaustive algorithm search. Defaults to False.
            Ignored when paddle is not compiled with CUDA.
    """

    paddle.seed(random_seed)
    np.random.seed(random_seed)
    random.seed(random_seed)

    # The cuDNN flags are only touched on CUDA builds.
    # NOTE(review): assumes these flags are not registered on CPU-only builds,
    # where set_flags would fail - confirm.
    if set_cudnn and paddle.is_compiled_with_cuda():
        paddle.set_flags(
            {
                "FLAGS_cudnn_deterministic": True,
                "FLAGS_cudnn_exhaustive_search": False,
            }
        )
|
||||
|
||||
|
||||
@contextmanager
def _close_temp_files(tmpfiles: list):
    """Utility function for creating a context and closing all temporary files
    once the context is exited. For correct functionality, all temporary file
    handles created inside the context must be appended to the ```tmpfiles```
    list.

    Every handle in the list is closed and the file at its ``name`` is
    removed, whether the context body finished normally or raised. Cleanup is
    best-effort: a handle that cannot be closed or removed is skipped.

    This function is taken wholesale from Scaper.

    Parameters
    ----------
    tmpfiles : list
        List of temporary file handles
    """
    try:
        yield
    finally:
        for handle in tmpfiles:
            try:
                handle.close()
                os.unlink(handle.name)
            except Exception:
                # Best-effort cleanup: ordinary failures (file already gone,
                # handle without a name, ...) are ignored, but
                # KeyboardInterrupt / SystemExit are allowed to propagate.
                pass
|
||||
|
||||
|
||||
AUDIO_EXTENSIONS = [".wav", ".flac", ".mp3", ".mp4"]


def find_audio(folder: str, ext: List[str] = AUDIO_EXTENSIONS):
    """Collect the audio files found under a directory, recursively.

    Parameters
    ----------
    folder : str
        Folder to look for audio files in, recursively. It may also be the
        path of a single audio file, or a glob pattern ending in one of the
        extensions.
    ext : List[str], optional
        File name suffixes to look for, including the leading dot, by default
        ``['.wav', '.flac', '.mp3', '.mp4']``.

    Returns
    -------
    list
        - ``[Path(folder)]`` when ``folder`` ends with one of the extensions
          and contains no ``*``;
        - the ``glob.glob`` matches (strings) when it ends with one of the
          extensions and contains a ``*``;
        - otherwise the ``Path`` objects found under ``folder``, grouped by
          extension in the order given by ``ext``.
    """
    root = Path(folder)
    as_text = str(root)

    # The caller may have passed an audio file (or a glob over audio files)
    # rather than a directory.
    if as_text.endswith(tuple(ext)):
        if "*" not in as_text:
            return [root]
        # A glob must be expanded rather than returned as a single "file".
        return glob.glob(as_text, recursive="**" in as_text)

    return [match for suffix in ext for match in root.glob(f"**/*{suffix}")]
|
||||
|
||||
|
||||
def read_sources(
    sources: List[str],
    remove_empty: bool = True,
    relative_path: str = "",
    ext: List[str] = AUDIO_EXTENSIONS,
):
    """Reads audio sources that can either be folders
    full of audio files, or CSV files that contain paths
    to audio files. CSV files that adhere to the expected
    format can be generated by
    :py:func:`audiotools.data.preprocess.create_csv`.

    Parameters
    ----------
    sources : List[str]
        List of audio sources to be converted into a
        list of lists of audio files. A source ending in ``.csv`` is read as
        a CSV file with a ``path`` column; anything else is searched with
        ``find_audio``.
    remove_empty : bool, optional
        Whether or not to remove rows with an empty "path"
        from each CSV file, by default True.
    relative_path : str, optional
        Path that every non-empty ``path`` entry is joined onto, by default "".
    ext : List[str], optional
        Extensions forwarded to ``find_audio`` for non-CSV sources.

    Returns
    -------
    list
        One list per source, each holding dictionaries with at least a
        ``path`` key, sorted by that key.
    """
    base = Path(relative_path)
    collected = []
    for source in sources:
        source = str(source)
        if source.endswith(".csv"):
            rows = []
            with open(source, "r") as f:
                for row in csv.DictReader(f):
                    has_path = row["path"] != ""
                    if remove_empty and not has_path:
                        continue
                    # Empty paths that are kept stay empty instead of being
                    # turned into ``relative_path`` itself.
                    if has_path:
                        row["path"] = str(base / row["path"])
                    rows.append(row)
        else:
            rows = [
                {"path": str(base / found)}
                for found in find_audio(source, ext=ext)
            ]
        collected.append(sorted(rows, key=lambda entry: entry["path"]))
    return collected
|
||||
|
||||
|
||||
def choose_from_list_of_lists(
    state: np.random.RandomState, list_of_lists: list, p: float = None
):
    """Draw one item from a list of lists.

    A list is picked first (optionally following the probabilities ``p``),
    then an item is picked uniformly inside that list.

    Parameters
    ----------
    state : np.random.RandomState
        Random state to use when choosing an item.
    list_of_lists : list
        A list of lists from which items will be drawn.
    p : float, optional
        Probabilities of each list, by default None

    Returns
    -------
    tuple
        ``(item, source_idx, item_idx)``: the chosen item, the index of the
        list it came from, and its index inside that list.
    """
    candidates = list(range(len(list_of_lists)))
    source_idx = state.choice(candidates, p=p)
    chosen_list = list_of_lists[source_idx]
    item_idx = state.randint(len(chosen_list))
    return chosen_list[item_idx], source_idx, item_idx
|
||||
|
||||
|
||||
@contextmanager
def chdir(newdir: typing.Union[Path, str]):
    """Temporarily make ``newdir`` the current working directory.

    The working directory that was active on entry is restored on exit, even
    if the body of the ``with`` block raises. Useful when you want to use
    relative paths to different runs.

    Parameters
    ----------
    newdir : typing.Union[Path, str]
        Directory to switch to.
    """
    previous_dir = os.getcwd()
    try:
        os.chdir(newdir)
        yield
    finally:
        # Always go back to where we started.
        os.chdir(previous_dir)
|
||||
|
||||
|
||||
def prepare_batch(
    batch: typing.Union[dict, list, paddle.Tensor], device: str = "cpu"
):
    """Moves items in a batch (typically generated by a DataLoader as a list
    or a dict) to the specified device. This works even if dictionaries
    are nested.

    Items whose ``.to(device)`` call fails (for instance values that have no
    ``to`` method) are left as they are. A list batch is updated in place.
    Any other kind of batch is returned unchanged.

    Parameters
    ----------
    batch : typing.Union[dict, list, paddle.Tensor]
        Batch, typically generated by a dataloader, that will be moved to
        the device.
    device : str, optional
        Device to move batch to, by default "cpu"

    Returns
    -------
    typing.Union[dict, list, paddle.Tensor]
        Batch with all values moved to the specified device.
    """
    if isinstance(batch, dict):
        # Flattening turns nested dictionaries into a single level so every
        # leaf can be visited with one loop; the nesting is restored after.
        batch = flatten(batch)
        for key, val in batch.items():
            try:
                batch[key] = val.to(device)
            except Exception:
                # Best effort: keep the value untouched. Only ordinary errors
                # are ignored, so KeyboardInterrupt / SystemExit still
                # propagate.
                pass
        batch = unflatten(batch)
    elif paddle.is_tensor(batch):
        batch = batch.to(device)
    elif isinstance(batch, list):
        for i, item in enumerate(batch):
            try:
                batch[i] = item.to(device)
            except Exception:
                # Same best-effort policy as for dictionaries.
                pass
    return batch
|
||||
|
||||
|
||||
def sample_from_dist(dist_tuple: tuple, state: np.random.RandomState = None):
    """Draw a sample from a distribution described by a tuple.

    The first item of the tuple names the distribution and the remaining
    items are its arguments. The name is looked up as a method of the
    ``np.random.RandomState`` object, except for ``"const"`` which simply
    returns its single argument.

    Parameters
    ----------
    dist_tuple : tuple
        Distribution tuple
    state : np.random.RandomState, optional
        Random state, or seed to use, by default None

    Returns
    -------
    typing.Union[float, int, str]
        Draw from the distribution.

    Examples
    --------
    Sample from a uniform distribution:

    >>> dist_tuple = ("uniform", 0, 1)
    >>> sample_from_dist(dist_tuple)

    Sample from a constant distribution:

    >>> dist_tuple = ("const", 0)
    >>> sample_from_dist(dist_tuple)

    Sample from a normal distribution:

    >>> dist_tuple = ("normal", 0, 0.5)
    >>> sample_from_dist(dist_tuple)

    """
    dist_name = dist_tuple[0]
    # "const" is not a RandomState method: hand back the value as is, without
    # touching any random state.
    if dist_name == "const":
        return dist_tuple[1]

    rng = random_state(state)
    sampler = getattr(rng, dist_name)
    dist_args = dist_tuple[1:]
    return sampler(*dist_args)
|
||||
|
||||
|
||||
def collate(list_of_dicts: list, n_splits: int = None):
    """Collates a list of dictionaries (e.g. as returned by a
    dataloader) into a dictionary with batched values. This routine
    uses the default paddle collate function for everything
    except AudioSignal objects, which are handled by the
    :py:func:`audiotools.core.audio_signal.AudioSignal.batch`
    function.

    This function takes n_splits to enable splitting a batch
    into multiple sub-batches for the purposes of gradient accumulation,
    etc.

    Parameters
    ----------
    list_of_dicts : list
        List of dictionaries to be collated. The keys of the first dictionary
        of each sub-batch define the keys of the result.
    n_splits : int
        Number of splits to make when creating the batches (split into
        sub-batches). Useful for things like gradient accumulation.

    Returns
    -------
    dict
        Dictionary containing batched data. When ``n_splits`` is given, a
        list of such dictionaries (one per sub-batch) is returned instead.
    """

    from . import AudioSignal

    # paddle's default collate function; the module moved out of
    # ``paddle.fluid`` in recent releases, so both locations are tried.
    try:
        from paddle.io.dataloader.collate import default_collate_fn
    except ImportError:
        from paddle.fluid.dataloader.collate import default_collate_fn

    batches = []
    list_len = len(list_of_dicts)

    return_list = n_splits is not None
    n_splits = 1 if n_splits is None else n_splits
    # Size of each sub-batch; the last one may be smaller.
    n_items = int(math.ceil(list_len / n_splits))

    for i in range(0, list_len, n_items):
        # Flatten the dictionaries to avoid recursion.
        list_of_dicts_ = [flatten(d) for d in list_of_dicts[i : i + n_items]]
        dict_of_lists = {
            k: [dic[k] for dic in list_of_dicts_] for k in list_of_dicts_[0]
        }

        batch = {}
        for k, v in dict_of_lists.items():
            if all(isinstance(s, AudioSignal) for s in v):
                batch[k] = AudioSignal.batch(v, pad_signals=True)
            else:
                batch[k] = default_collate_fn(v)
        batches.append(unflatten(batch))

    batches = batches if return_list else batches[0]
    return batches
|
||||
|
||||
|
||||
# Reference figure width in pixels: font sizes are scaled by
# (actual figure width in pixels) / BASE_SIZE.
BASE_SIZE = 864
# Figure size in inches used when the caller does not provide ``fig_size``.
DEFAULT_FIG_SIZE = (9, 3)


def format_figure(
    fig_size: tuple = None,
    title: str = None,
    fig=None,
    format_axes: bool = True,
    format: bool = True,
    font_color: str = "white",
):
    """Prettifies the spectrogram and waveform plots. A title
    can be inset into the top right corner, and the axes can be
    inset into the figure, allowing the data to take up the entire
    image. Used in

    - :py:func:`audiotools.core.display.DisplayMixin.specshow`
    - :py:func:`audiotools.core.display.DisplayMixin.waveplot`
    - :py:func:`audiotools.core.display.DisplayMixin.wavespec`

    The figure is modified in place and nothing is returned.

    Parameters
    ----------
    fig_size : tuple, optional
        Size of figure, by default (9, 3)
    title : str, optional
        Title to inset in top right, by default None
    fig : matplotlib.figure.Figure, optional
        Figure object, if None ``plt.gcf()`` will be used, by default None
    format_axes : bool, optional
        Format the axes to be inside the figure, by default True
    format : bool, optional
        This formatting can be skipped entirely by passing ``format=False``
        to any of the plotting functions that use this formatter, by default True
    font_color : str, optional
        Color of font of axes, by default "white"
    """
    # matplotlib is imported lazily, only when a figure is actually formatted.
    import matplotlib
    import matplotlib.pyplot as plt

    if fig_size is None:
        fig_size = DEFAULT_FIG_SIZE
    # Formatting disabled: leave the figure exactly as it is.
    if not format:
        return
    if fig is None:
        fig = plt.gcf()
    fig.set_size_inches(*fig_size)
    axs = fig.axes

    # Figure width in pixels, used to keep the text proportional to the
    # figure whatever its size or dpi.
    pixels = (fig.get_size_inches() * fig.dpi)[0]
    font_scale = pixels / BASE_SIZE

    if format_axes:
        axs = fig.axes

        for ax in axs:
            ymin, _ = ax.get_ylim()
            xmin, _ = ax.get_xlim()

            # Re-draw the y tick labels inside the plot, against the left
            # edge. The first two ticks and the last one are skipped. Values
            # are divided by 1000 and suffixed with "k".
            # NOTE(review): the labels are always attached to axs[0], not to
            # the axis being iterated - confirm this is intended for
            # multi-axes figures.
            ticks = ax.get_yticks()
            for t in ticks[2:-1]:
                t = axs[0].annotate(
                    f"{(t / 1000):2.1f}k",
                    xy=(xmin, t),
                    xycoords="data",
                    xytext=(5, -5),
                    textcoords="offset points",
                    ha="left",
                    va="top",
                    color=font_color,
                    fontsize=12 * font_scale,
                    alpha=0.75,
                )

            # Same for the x tick labels, drawn along the bottom edge with an
            # "s" suffix; again the first two ticks and the last are skipped.
            ticks = ax.get_xticks()[2:]
            for t in ticks[:-1]:
                t = axs[0].annotate(
                    f"{t:2.1f}s",
                    xy=(t, ymin),
                    xycoords="data",
                    xytext=(5, 5),
                    textcoords="offset points",
                    ha="center",
                    va="bottom",
                    color=font_color,
                    fontsize=12 * font_scale,
                    alpha=0.75,
                )

            # Hide the regular axes decorations so the data can fill the
            # whole figure.
            ax.margins(0, 0)
            ax.set_axis_off()
            ax.xaxis.set_major_locator(plt.NullLocator())
            ax.yaxis.set_major_locator(plt.NullLocator())

        # Remove every margin and every gap between subplots.
        plt.subplots_adjust(
            top=1, bottom=0, right=1, left=0, hspace=0, wspace=0
        )

    if title is not None:
        # Title inset in the top right corner of the first axis, on a
        # half-transparent black box.
        # NOTE(review): the title colour is hard-coded to "white" and does not
        # follow ``font_color`` - confirm this is intended.
        t = axs[0].annotate(
            title,
            xy=(1, 1),
            xycoords="axes fraction",
            fontsize=20 * font_scale,
            xytext=(-5, -5),
            textcoords="offset points",
            ha="right",
            va="top",
            color="white",
        )
        t.set_bbox(dict(facecolor="black", alpha=0.5, edgecolor="black"))
|
||||
|
||||
|
||||
def generate_chord_dataset(
    max_voices: int = 8,
    sample_rate: int = 44100,
    num_items: int = 5,
    duration: float = 1.0,
    min_note: str = "C2",
    max_note: str = "C6",
    output_dir: Path = "chords",
):
    """
    Generates a toy multitrack dataset of chords, synthesized from sine waves.

    Each item is written to ``output_dir/track_<idx>/voice_<n>.wav`` and one
    CSV file per voice (``output_dir/voice_<n>.csv``) lists the file of that
    voice for every track, with an empty path for tracks that do not have it.

    Parameters
    ----------
    max_voices : int, optional
        Maximum number of voices in a chord, by default 8
    sample_rate : int, optional
        Sample rate of audio, by default 44100
    num_items : int, optional
        Number of items to generate, by default 5
    duration : float, optional
        Duration of each item, by default 1.0. Each voice lasts between 85%
        and 100% of this value.
    min_note : str, optional
        Minimum note in the dataset, by default "C2"
    max_note : str, optional
        Maximum note in the dataset, by default "C6"
    output_dir : Path, optional
        Directory to save the dataset, by default "chords". Missing parent
        directories are created.

    Returns
    -------
    Path
        The directory the dataset was written to.
    """
    import librosa
    from . import AudioSignal
    from ..data.preprocess import create_csv

    min_midi = librosa.note_to_midi(min_note)
    max_midi = librosa.note_to_midi(max_note)

    tracks = []
    for idx in range(num_items):
        track = {}
        # figure out how many voices to put in this track
        num_voices = random.randint(1, max_voices)
        for voice_idx in range(num_voices):
            # choose some random params
            midinote = random.randint(min_midi, max_midi)
            dur = random.uniform(0.85 * duration, duration)

            sig = AudioSignal.wave(
                frequency=librosa.midi_to_hz(midinote),
                duration=dur,
                sample_rate=sample_rate,
                shape="sine",
            )
            track[f"voice_{voice_idx}"] = sig
        tracks.append(track)

    # save the tracks to disk
    output_dir = Path(output_dir)
    # parents=True so that a nested output directory can be created in one go.
    output_dir.mkdir(parents=True, exist_ok=True)
    for idx, track in enumerate(tracks):
        track_dir = output_dir / f"track_{idx}"
        track_dir.mkdir(exist_ok=True)
        for voice_name, sig in track.items():
            sig.write(track_dir / f"{voice_name}.wav")

    # Sorted so that the CSV files are always produced in the same order.
    all_voices = sorted({k for track in tracks for k in track})
    voice_lists = {voice: [] for voice in all_voices}
    for track in tracks:
        for voice_name in all_voices:
            if voice_name in track:
                voice_lists[voice_name].append(track[voice_name].path_to_file)
            else:
                # Keep the rows of all CSV files aligned on the track index.
                voice_lists[voice_name].append("")

    for voice_name, paths in voice_lists.items():
        create_csv(paths, output_dir / f"{voice_name}.csv", loudness=True)

    return output_dir
|
Loading…
Reference in new issue