add unittest

pull/3900/head
drryanhuang 10 months ago
parent e243128c0b
commit 58990f051b

@ -0,0 +1,610 @@
import pathlib
import sys
import tempfile
import librosa
import numpy as np
import paddle
import pytest
import rich
sys.path.append("/home/work/pdaudoio")
import audiotools
from audiotools import AudioSignal
def test_io():
    """Exercise AudioSignal construction, file round-trips, excerpts and hashing."""
    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    mp3_path = audio_path.replace("wav", "mp3")

    # Loading from a pathlib.Path and round-tripping through a temporary wav.
    signal = AudioSignal(pathlib.Path(audio_path))
    with tempfile.NamedTemporaryFile(suffix=".wav") as tmp:
        signal.write(tmp.name)
        reloaded = AudioSignal(tmp.name)

    mp3_signal = AudioSignal(mp3_path)
    print(mp3_signal)

    assert signal == reloaded
    print(signal)
    print(signal.markdown())

    # Excerpt of an mp3 file.
    mp3_signal = AudioSignal.excerpt(mp3_path, offset=5, duration=5)
    assert mp3_signal.signal_duration == 5.0
    assert mp3_signal.duration == 5.0
    assert mp3_signal.length == mp3_signal.signal_length

    rich.print(signal)

    # Construction from a NumPy array, sample rate by keyword and by position.
    samples = np.random.randn(2, 16000)
    signal = AudioSignal(samples, sample_rate=16000)
    assert np.allclose(signal.numpy(), samples)

    signal = AudioSignal(samples, 44100)
    assert signal.sample_rate == 44100
    _ = signal.shape  # only checks that the attribute can be read

    # A bare integer is not valid audio input.
    with pytest.raises(ValueError):
        AudioSignal(5, sample_rate=16000)

    # Offset/duration passed to the constructor.
    signal = AudioSignal(audio_path, offset=10, duration=10)
    assert np.allclose(signal.signal_duration, 10.0)
    assert np.allclose(signal.duration, 10.0)

    # Excerpts record where they came from in the metadata.
    signal = AudioSignal.excerpt(audio_path, offset=5, duration=5)
    assert signal.signal_duration == 5.0
    assert signal.duration == 5.0
    for key in ("offset", "duration"):
        assert key in signal.metadata

    # A 1-D tensor is expanded to three dimensions.
    signal = AudioSignal(paddle.randn([1000]), 44100)
    assert signal.audio_data.ndim == 3
    assert paddle.all(signal.samples == signal.audio_data)

    # Hashes are stable for identical content and change after normalization.
    assert AudioSignal(audio_path).hash() == AudioSignal(audio_path).hash()
    normalized_hash = AudioSignal(audio_path).normalize(-20).hash()
    assert AudioSignal(audio_path).hash() != normalized_hash

    # An offset far beyond the end of the file is rejected.
    with pytest.raises(RuntimeError):
        AudioSignal(audio_path, offset=100000, duration=3)
def test_copy_and_clone():
    """Check identity/equality semantics of copy(), deepcopy() and clone().

    * ``copy`` shares every attribute object with the original.
    * ``clone`` and ``deepcopy`` produce new tensor objects with equal values.
    * For ``path_to_file`` / ``metadata``: string values keep the same
      identity, while any other value must be a distinct object for both
      the clone and the deep copy.
    """
    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    signal = AudioSignal(audio_path)
    # Populate the cached STFT and loudness so they take part in the checks.
    signal.stft()
    signal.loudness()

    copied = signal.copy()
    deep_copied = signal.deepcopy()
    cloned = signal.clone()

    for a in ["audio_data", "stft_data", "_loudness"]:
        a1 = getattr(signal, a)
        a2 = getattr(cloned, a)
        a3 = getattr(copied, a)
        a4 = getattr(deep_copied, a)

        assert id(a1) != id(a2)
        assert id(a1) == id(a3)
        assert id(a1) != id(a4)

        assert np.allclose(a1, a2)
        assert np.allclose(a1, a3)
        assert np.allclose(a1, a4)

    for a in ["path_to_file", "metadata"]:
        a1 = getattr(signal, a)
        a2 = getattr(cloned, a)
        a3 = getattr(copied, a)
        a4 = getattr(deep_copied, a)

        # For clone, id should differ if path is a list, and should always
        # differ for metadata; if path is a string, id should remain the same.
        assert id(a1) == id(a2) if isinstance(a1, str) else id(a1) != id(a2)
        assert id(a1) == id(a3)
        # Fixed: the non-string branch previously re-checked the clone (a2)
        # instead of the deep copy (a4), leaving the deep copy unverified.
        assert id(a1) == id(a4) if isinstance(a1, str) else id(a1) != id(a4)

    assert signal.original_signal_length == copied.original_signal_length
    assert signal.original_signal_length == deep_copied.original_signal_length
    assert signal.original_signal_length == cloned.original_signal_length

    # Only checks that detach() can be called on a signal with cached data.
    signal = signal.detach()
@pytest.mark.parametrize("loudness_cutoff", [-np.inf, -160, -80, -40, -20])
def test_salient_excerpt(loudness_cutoff):
    """salient_excerpt should return a 1 s excerpt at least as loud as the cutoff."""
    # Noise amplitude written into the otherwise silent file for each cutoff.
    amplitude_by_cutoff = {
        -np.inf: 0.0,
        -160: 0.0,
        -80: 0.001,
        -40: 0.01,
        -20: 0.1,
    }
    sample_rate = 44100

    with tempfile.NamedTemporaryFile(suffix=".wav") as tmp:
        # 60 s of silence with one second of scaled noise starting at 20 s.
        signal = AudioSignal(paddle.zeros([sample_rate * 60]), sample_rate)
        noise = amplitude_by_cutoff[loudness_cutoff] * paddle.randn(
            [sample_rate])
        signal[..., sample_rate * 20:sample_rate * 21] = noise
        signal.write(tmp.name)

        signal = AudioSignal.salient_excerpt(
            tmp.name,
            loudness_cutoff=loudness_cutoff,
            duration=1,
            num_tries=None)
        for key in ("offset", "duration"):
            assert key in signal.metadata
        assert signal.loudness() >= loudness_cutoff

        # An unreachable cutoff with a bounded number of tries, and no cutoff
        # at all: both calls only need to return without raising.
        signal = AudioSignal.salient_excerpt(
            tmp.name, loudness_cutoff=np.inf, duration=1, num_tries=10)
        signal = AudioSignal.salient_excerpt(
            tmp.name, loudness_cutoff=None, duration=1)
def test_arithmetic():
    """Verify +, -, * (plain, reflected and in-place) against raw tensor math."""

    def _random_pair():
        """Return two independent random stereo signals at 16 kHz."""
        first = AudioSignal(np.random.randn(2, 16000), sample_rate=16000)
        second = AudioSignal(np.random.randn(2, 16000), sample_rate=16000)
        return first, second

    # Addition (with a copy)
    lhs, rhs = _random_pair()
    result = lhs + rhs
    assert paddle.allclose(result.audio_data, lhs.audio_data + rhs.audio_data)

    # Addition (radd)
    lhs, _ = _random_pair()
    result = 5.0 + lhs
    assert paddle.allclose(result.audio_data, lhs.audio_data + 5.0)

    # In place addition
    result, rhs = _random_pair()
    before = result.deepcopy()
    result += rhs
    assert paddle.allclose(result.audio_data,
                           before.audio_data + rhs.audio_data)

    # Subtraction (with a copy)
    lhs, rhs = _random_pair()
    result = lhs - rhs
    assert paddle.allclose(result.audio_data, lhs.audio_data - rhs.audio_data)

    # In place subtraction
    result, rhs = _random_pair()
    before = result.deepcopy()
    result -= rhs
    assert paddle.allclose(result.audio_data,
                           before.audio_data - rhs.audio_data)

    # Multiplication (element-wise)
    lhs, rhs = _random_pair()
    result = lhs * rhs
    assert paddle.allclose(result.audio_data, lhs.audio_data * rhs.audio_data)

    # Multiplication (gain)
    lhs, _ = _random_pair()
    result = lhs * 5.0
    assert paddle.allclose(result.audio_data, lhs.audio_data * 5.0)

    # Multiplication (rmul)
    lhs, _ = _random_pair()
    result = 5.0 * lhs
    assert paddle.allclose(result.audio_data, lhs.audio_data * 5.0)

    # Multiplication (in-place)
    result, rhs = _random_pair()
    before = result.deepcopy()
    result *= rhs
    assert paddle.allclose(result.audio_data,
                           before.audio_data * rhs.audio_data)
def test_equality():
    """Signals built from the same data compare equal; different data does not."""
    samples = np.random.randn(2, 16000)
    first = AudioSignal(samples, sample_rate=16000)
    same = AudioSignal(samples, sample_rate=16000)
    assert first == same

    other = AudioSignal(np.random.randn(2, 16000), sample_rate=16000)
    assert first != other
    assert not np.allclose(first.numpy(), other.numpy())
def test_indexing():
    """Cover __getitem__/__setitem__ with ints, slices, boolean masks and signals."""
    data = np.random.randn(4, 2, 16000)
    batch = AudioSignal(data, sample_rate=16000)
    assert np.allclose(batch[0].audio_data, data[0])
    assert np.allclose(batch[0, :, 8000].audio_data, data[0, :, 8000])

    # Test with the associated STFT data.
    data = np.random.randn(4, 2, 16000)
    batch = AudioSignal(data, sample_rate=16000)
    batch.loudness()
    batch.stft()

    picked = batch[0]
    assert np.allclose(picked.audio_data, data[0])
    assert np.allclose(picked.stft_data, batch.stft_data[0])
    assert np.allclose(picked._loudness, batch._loudness[0])

    picked = batch[0:2]
    assert np.allclose(picked.audio_data, data[0:2])
    assert np.allclose(picked.stft_data, batch.stft_data[0:2])
    assert np.allclose(picked._loudness, batch._loudness[0:2])

    # Test using a boolean tensor to index batch
    mask = paddle.to_tensor([True, False, True, False])
    picked = batch[mask]
    assert np.allclose(picked.audio_data, batch.audio_data[mask])
    assert np.allclose(picked.stft_data, batch.stft_data[mask])
    assert np.allclose(picked._loudness, batch._loudness[mask])

    # Set parts of signal using tensor
    replacement = paddle.to_tensor(np.random.randn(4, 2, 16000))
    batch = AudioSignal(data, sample_rate=16000)
    batch[0, :, 6000:8000] = replacement[0, :, 6000:8000]
    assert np.allclose(batch[0, :, 6000:8000].audio_data,
                       replacement[0, :, 6000:8000])

    # Set parts of signal using AudioSignal
    source = AudioSignal(replacement, sample_rate=16000)
    batch = AudioSignal(data, sample_rate=16000)
    batch[0, :, 6000:8000] = source[0, :, 6000:8000]
    assert np.allclose(batch[0, :, 6000:8000].audio_data,
                       source[0, :, 6000:8000].audio_data)

    # Check that loudnesses and stft_data get set as well, if only the batch
    # dim is indexed.
    source = AudioSignal(replacement, sample_rate=16000)
    source.stft()
    source.loudness()

    batch = AudioSignal(data, sample_rate=16000)
    batch.stft()
    batch.loudness()

    # Test using a boolean tensor to index batch
    mask = paddle.to_tensor([True, False, True, False])
    batch[mask] = source[mask]

    for attr in ("stft_data", "audio_data", "_loudness"):
        updated = getattr(batch, attr)
        expected = getattr(source, attr)
        assert np.allclose(updated[mask], expected[mask])
def test_zeros():
    """AudioSignal.zeros should honour the requested duration and sample rate."""
    silence = AudioSignal.zeros(0.5, 44100)
    assert silence.signal_duration == 0.5
    assert silence.duration == 0.5
    assert silence.sample_rate == 44100
@pytest.mark.parametrize("shape",
                         ["sine", "square", "sawtooth", "triangle", "beep"])
def test_waves(shape: str):
    """AudioSignal.wave accepts the known shapes and rejects "beep"."""
    # error case
    if shape == "beep":
        with pytest.raises(ValueError):
            AudioSignal.wave(440, 0.5, 44100, shape=shape)
        return

    tone = AudioSignal.wave(440, 0.5, 44100, shape=shape)
    assert tone.duration == 0.5
    assert tone.sample_rate == 44100

    # test the default shape arg
    default_tone = AudioSignal.wave(440, 0.5, 44100)
    assert default_tone.duration == 0.5
    assert default_tone.sample_rate == 44100
def test_zero_pad():
    """zero_pad(100, 100) should add 100 zero samples at both ends."""
    padded = AudioSignal(np.random.randn(4, 2, 16000), sample_rate=16000)
    padded.zero_pad(100, 100)

    expected = paddle.zeros([4, 2, 100], dtype="float64")
    head = padded.audio_data[..., :100]
    tail = padded.audio_data[..., -100:]
    assert paddle.allclose(head, expected)
    assert paddle.allclose(tail, expected)
def test_zero_pad_to():
    """zero_pad_to pads up to a target length and never shortens the signal."""
    samples = np.random.randn(4, 2, 16000)
    expected_pad = paddle.zeros([4, 2, 100], dtype="float64")

    # Default mode: padding is appended.
    sig = AudioSignal(samples, sample_rate=16000)
    sig.zero_pad_to(16100)
    assert paddle.allclose(sig.audio_data[..., -100:], expected_pad)
    assert sig.signal_length == 16100

    # A target shorter than the signal leaves the length unchanged.
    sig = AudioSignal(samples, sample_rate=16000)
    sig.zero_pad_to(15000)
    assert sig.signal_length == 16000

    # mode="before": padding is prepended.
    sig = AudioSignal(samples, sample_rate=16000)
    sig.zero_pad_to(16100, mode="before")
    assert paddle.allclose(sig.audio_data[..., :100], expected_pad)
    assert sig.signal_length == 16100

    sig = AudioSignal(samples, sample_rate=16000)
    sig.zero_pad_to(15000, mode="before")
    assert sig.signal_length == 16000
def test_truncate():
    """truncate_samples(100) should keep only the first 100 samples."""
    samples = np.random.randn(4, 2, 16000)
    sig = AudioSignal(samples, sample_rate=16000)
    sig.truncate_samples(100)

    assert sig.signal_length == 100
    assert np.allclose(sig.audio_data, samples[..., :100])
def test_trim():
    """trim(before, after) removes samples from both ends; trim(0, 0) is a no-op."""
    samples = np.random.randn(4, 2, 16000)
    sig = AudioSignal(samples, sample_rate=16000)
    sig.trim(100, 100)
    assert sig.signal_length == 16000 - 200
    assert np.allclose(sig.audio_data, samples[..., 100:-100])

    samples = np.random.randn(4, 2, 16000)
    sig = AudioSignal(samples, sample_rate=16000)
    sig.trim(0, 0)
    assert np.allclose(sig.audio_data, samples)
def test_to_from_ops():
    """Device/dtype conversion helpers should work on a signal with cached data."""
    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    signal = AudioSignal(audio_path)
    # Populate the cached STFT and loudness before converting.
    signal.stft()
    signal.loudness()

    signal = signal.to("cpu")
    assert str(signal.audio_data.place) == "Place(cpu)"
    assert isinstance(signal.numpy(), np.ndarray)

    signal.cpu()
    # NOTE: the CUDA round-trip (signal.cuda()) is left disabled here.
    signal.float()
def test_device():
    """signal.device should still report the CPU place once audio_data is None."""
    signal = AudioSignal("tests/audio/spk/f10_script4_produced.wav")
    signal.to("cpu")
    assert str(signal.device) == "Place(cpu)"

    # Compute the STFT, then drop the waveform and read the device again.
    signal.stft()
    signal.audio_data = None
    assert str(signal.device) == "Place(cpu)"
@pytest.mark.parametrize("window_length", [2048, 512])
@pytest.mark.parametrize("hop_length", [512, 128])
@pytest.mark.parametrize("window_type", ["sqrt_hann", "hann", None])
def test_stft(window_length, hop_length, window_type):
    """STFT/ISTFT round trip, magnitude/phase reconstruction and match_stride."""
    # Keep the hop strictly smaller than the window.
    if hop_length >= window_length:
        hop_length = window_length // 2

    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    custom_params = audiotools.STFTParams(
        window_length=window_length,
        hop_length=hop_length,
        window_type=window_type)

    # Run once with the default parameters and once with the custom ones.
    for params in (None, custom_params):
        signal = AudioSignal(audio_path, duration=10, stft_params=params)

        # istft() before any stft() has been computed must fail.
        with pytest.raises(RuntimeError):
            signal.istft()

        returned_stft = signal.stft()
        # The returned value and the cached attribute are compared as arrays.
        assert np.allclose(signal.stft_data.cpu().numpy(),
                           returned_stft.cpu().numpy())

        round_trip = signal.deepcopy()
        round_trip.stft()
        round_trip = round_trip.istft()
        assert round_trip == signal

        # magnitude * exp(j * phase) rebuilds the complex STFT.
        magnitude = signal.magnitude
        phase = signal.phase
        rebuilt = magnitude * paddle.exp(1j * phase)
        assert np.allclose(rebuilt.cpu().numpy(),
                           signal.stft_data.cpu().numpy())

        # Same check after clearing stft_data before each property access.
        signal.stft_data = None
        magnitude = signal.magnitude
        signal.stft_data = None
        phase = signal.phase
        rebuilt = magnitude * paddle.exp(1j * phase)
        assert np.allclose(rebuilt.cpu().numpy(),
                           signal.stft_data.cpu().numpy())

        # Test with match_stride=True, ignoring the beginning and end.
        active = signal.stft_params
        if active.hop_length != active.window_length // 4:
            continue

        reference = signal.clone()
        strided_stft = signal.stft(match_stride=True)
        reconstructed = signal.istft(match_stride=True)
        discard = window_length * 2

        right_pad, _ = signal.compute_stft_padding(
            active.window_length, active.hop_length, match_stride=True)
        padded_length = signal.signal_length + right_pad
        assert strided_stft.shape[-1] == padded_length // active.hop_length

        assert paddle.allclose(
            reconstructed.audio_data[..., discard:-discard],
            reference.audio_data[..., discard:-discard],
            atol=1e-6)
def test_log_magnitude():
    """log_magnitude() should agree with librosa.amplitude_to_db on excerpts."""
    audio_path = "tests/audio/spk/f10_script4_produced.wav"

    for _ in range(10):
        excerpt = AudioSignal.excerpt(audio_path, duration=5.0)
        # Compare the first batch item / first channel only.
        expected = librosa.amplitude_to_db(excerpt.magnitude.numpy()[0, 0])
        actual = excerpt.log_magnitude().numpy()[0, 0]
        assert np.allclose(actual, expected, atol=10e-7)
@pytest.mark.parametrize("n_mels", [40, 80, 128])
@pytest.mark.parametrize("window_length", [2048, 512])
@pytest.mark.parametrize("hop_length", [512, 128])
@pytest.mark.parametrize("window_type", ["sqrt_hann", "hann", None])
def test_mel_spectrogram(n_mels, window_length, hop_length, window_type):
    """mel_spectrogram() should have n_mels entries along axis 2."""
    # Keep the hop strictly smaller than the window.
    if hop_length >= window_length:
        hop_length = window_length // 2

    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    custom_params = audiotools.STFTParams(
        window_length=window_length,
        hop_length=hop_length,
        window_type=window_type)

    # Default parameters first, then the custom ones.
    for params in (None, custom_params):
        signal = AudioSignal(audio_path, duration=10, stft_params=params)
        assert signal.mel_spectrogram(n_mels=n_mels).shape[2] == n_mels
@pytest.mark.parametrize("n_mfcc", [20, 40])
@pytest.mark.parametrize("n_mels", [40, 80, 128])
@pytest.mark.parametrize("window_length", [2048, 512])
@pytest.mark.parametrize("hop_length", [512, 128])
def test_mfcc(n_mfcc, n_mels, window_length, hop_length):
    """mfcc() should have n_mfcc entries along axis 2."""
    # Keep the hop strictly smaller than the window.
    if hop_length >= window_length:
        hop_length = window_length // 2

    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    custom_params = audiotools.STFTParams(
        window_length=window_length, hop_length=hop_length)

    # Default parameters first, then the custom ones.
    for params in (None, custom_params):
        signal = AudioSignal(audio_path, duration=10, stft_params=params)
        coefficients = signal.mfcc(n_mfcc=n_mfcc, n_mels=n_mels)
        assert coefficients.shape[2] == n_mfcc
def test_to_mono():
    """to_mono() should reduce a two-channel batch to a single channel."""
    sample_rate = 16000
    stereo = AudioSignal(
        np.random.randn(4, 2, 16000), sample_rate=sample_rate)
    assert stereo.num_channels == 2

    mono = stereo.to_mono()
    assert mono.num_channels == 1
def test_float():
    """float() should convert float64 audio data to paddle.float32."""
    samples = np.random.randn(4, 1, 16000).astype("float64")
    converted = AudioSignal(samples, sample_rate=1600).float()
    assert converted.audio_data.dtype == paddle.float32
@pytest.mark.parametrize("sample_rate", [8000, 16000, 22050, 44100, 48000])
def test_resample(sample_rate):
    """Resampling one second of audio yields `sample_rate` samples."""
    source_rate = 16000
    # 16000 samples at 16 kHz is exactly one second, so the resampled length
    # must equal the target sample rate.
    one_second = AudioSignal(
        np.random.randn(4, 2, 16000), sample_rate=source_rate)

    resampled = one_second.resample(sample_rate)
    assert resampled.sample_rate == sample_rate
    assert resampled.signal_length == sample_rate
def test_batching():
    """AudioSignal.batch with equal lengths, padding, truncation and resampling."""
    batch_size = 16

    def _variable_length_signals():
        """Build `batch_size` stereo 16 kHz signals with random lengths."""
        built = []
        for _ in range(batch_size):
            num_samples = np.random.randint(8000, 32000)
            samples = np.random.randn(2, num_samples)
            built.append(AudioSignal(samples, sample_rate=16000))
        return built

    # All same length, same sample rate.
    signals = [
        AudioSignal(np.random.randn(2, 16000), sample_rate=16000)
        for _ in range(batch_size)
    ]
    batched = AudioSignal.batch(signals)
    assert batched.batch_size == batch_size

    # All different lengths, same sample rate, pad signals
    signals = _variable_length_signals()
    with pytest.raises(RuntimeError):
        AudioSignal.batch(signals)

    longest = max(sig.signal_length for sig in signals)
    batched = AudioSignal.batch(signals, pad_signals=True)
    assert batched.signal_length == longest
    assert batched.batch_size == batch_size

    # All different lengths, same sample rate, truncate signals
    signals = _variable_length_signals()
    with pytest.raises(RuntimeError):
        AudioSignal.batch(signals)

    shortest = min(sig.signal_length for sig in signals)
    batched = AudioSignal.batch(signals, truncate_signals=True)
    assert batched.signal_length == shortest
    assert batched.batch_size == batch_size

    # All different lengths, different sample rate, pad signals
    signals = []
    for _ in range(batch_size):
        num_samples = np.random.randint(8000, 32000)
        rate = np.random.choice([8000, 16000, 32000])
        samples = np.random.randn(2, num_samples)
        signals.append(AudioSignal(samples, sample_rate=int(rate)))
    with pytest.raises(RuntimeError):
        AudioSignal.batch(signals)

    longest = max(sig.signal_length for sig in signals)
    # Tag each signal with its index to check how paths are collected.
    for index, sig in enumerate(signals):
        sig.path_to_file = index
    batched = AudioSignal.batch(signals, resample=True, pad_signals=True)
    assert batched.signal_length == longest
    assert batched.batch_size == batch_size
    assert batched.path_to_file == list(range(len(signals)))
    assert batched.path_to_input_file == batched.path_to_file

@ -0,0 +1,50 @@
# File under the MIT license, see https://github.com/adefossez/julius/LICENSE for details.
# Author: adefossez, 2020
import random
import sys
import unittest
import paddle
sys.path.append("/home/work/pdaudoio")
from audiotools.core import pure_tone, SplitBands, split_bands
def delta(a, b, ref, fraction=0.9):
    """Return the mean absolute error of `a` vs `b` as a percentage of `ref.std()`.

    Only the central `fraction` of the last axis is compared, so the same
    number of samples is dropped from each end before the error is computed.

    Args:
        a: estimated tensor.
        b: target tensor, indexable like `a` along the last axis.
        ref: tensor whose standard deviation normalizes the error.
        fraction: share of the last axis (centered) kept for the comparison.

    Returns:
        ``100 * mean(|a - b|) / std(ref)`` over the compared window.
    """
    length = a.shape[-1]
    compare_length = int(length * fraction)
    offset = (length - compare_length) // 2
    # Fixed: the window previously ended at `offset + length`, which only
    # trimmed the leading edge and ignored `fraction` at the trailing edge.
    end = offset + compare_length
    a = a[..., offset:end]
    b = b[..., offset:end]
    return 100 * paddle.abs(a - b).mean() / ref.std()
# Tolerance to errors, as a percentage of the std of the input signal.
TOLERANCE = 0.5


class _BaseTest(unittest.TestCase):
    """TestCase base class adding a relative-error assertion."""

    def assertSimilar(self, a, b, ref, msg=None, tol=TOLERANCE):
        """Assert that `delta(a, b, ref)` does not exceed `tol` (in percent)."""
        error_percent = delta(a, b, ref)
        self.assertLessEqual(error_percent, tol, msg)
class TestLowPassFilters(_BaseTest):
    """Checks for `split_bands` on a sum of pure tones."""

    def setUp(self):
        # Seed both RNGs so every test starts from the same state.
        paddle.seed(1234)
        random.seed(1234)

    def test_keep_or_kill(self):
        """Each band of the decomposition should recover one of the tones."""
        sr = 256
        tones = {
            "low": pure_tone(10, sr),
            "mid": pure_tone(40, sr),
            "high": pure_tone(100, sr),
        }
        mixture = tones["low"] + tones["mid"] + tones["high"]

        # Cutoffs at 20 and 70 sit between the three tone frequencies.
        bands = split_bands(mixture, sr, cutoffs=[20, 70])
        self.assertEqual(len(bands), 3)

        for estimate, (name, target) in zip(bands, tones.items()):
            self.assertSimilar(estimate, target, target, name)


if __name__ == "__main__":
    unittest.main()

@ -0,0 +1,107 @@
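# TODO(review): this header was left as an unfilled template ("your_repo/your_license",
# "your_name, current_year"). The sibling test files in this change credit
# https://github.com/adefossez/julius (MIT, adefossez, 2020); confirm the origin of
# this file and fill in the real license and author here.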
import random
import sys
import unittest
import paddle
import paddle.nn.functional as F
sys.path.append("/home/work/pdaudoio")
from audiotools.core import fft_conv1d, FFTConv1d
# Maximum accepted relative delta, in percent.
TOLERANCE = 1e-4


class _BaseTest(unittest.TestCase):
    """TestCase base class with helpers comparing fft_conv1d to F.conv1d."""

    def setUp(self):
        # Seed both RNGs so every test starts from the same state.
        paddle.seed(1234)
        random.seed(1234)

    def assertSimilar(self, a, b, msg=None, tol=TOLERANCE):
        """Assert the L2 error of `a` relative to `b` is at most `tol` percent."""
        error_norm = paddle.norm(a - b, p=2)
        reference_norm = paddle.norm(b, p=2)
        relative_percent = 100 * error_norm / reference_norm
        self.assertLessEqual(relative_percent.numpy(), tol, msg)

    def compare_paddle(self,
                       *args,
                       block_ratio=10,
                       msg=None,
                       tol=TOLERANCE,
                       **kwargs):
        """Run F.conv1d and fft_conv1d on the same arguments and compare them.

        `block_ratio` is forwarded to fft_conv1d only; the shapes must match
        exactly and the values must agree within `tol`.
        """
        expected = F.conv1d(*args, **kwargs)
        actual = fft_conv1d(*args, block_ratio=block_ratio, **kwargs)
        self.assertEqual(list(actual.shape), list(expected.shape), msg)
        self.assertSimilar(actual, expected, msg, tol)
class TestFFTConv1d(_BaseTest):
    """Tests for the functional `fft_conv1d` and the `FFTConv1d` layer."""

    def test_same_as_paddle(self):
        """Random configurations must match F.conv1d for several strides."""
        for _ in range(5):
            # The order of the random draws is kept fixed so the seeded
            # sequence of configurations does not change.
            kernel_size = random.randrange(4, 128)
            batch_size = random.randrange(1, 6)
            length = random.randrange(kernel_size, 1024)
            chin = random.randrange(1, 12)
            chout = random.randrange(1, 12)
            block_ratio = random.choice([5, 10, 20])
            use_bias = random.random() < 0.5
            if random.random() < 0.5:
                padding = 0
            else:
                padding = random.randrange(kernel_size // 2, 2 * kernel_size)

            x = paddle.randn([batch_size, chin, length])
            w = paddle.randn([chout, chin, kernel_size])

            # Configuration summary reported in the failure message.
            state = {
                "length": length,
                "kernel_size": kernel_size,
                "chin": chin,
                "chout": chout,
                "block_ratio": block_ratio,
                "bias": use_bias,
            }
            bias = paddle.randn([chout]) if use_bias else None

            for stride in (1, 2, 5):
                state["stride"] = stride
                self.compare_paddle(
                    x,
                    w,
                    bias,
                    stride,
                    padding,
                    block_ratio=block_ratio,
                    msg=repr(state))

    def test_small_input(self):
        """A kernel longer than the input raises; equal lengths give one output."""
        signal = paddle.randn([1, 5, 19])
        too_long_kernel = paddle.randn([10, 5, 32])
        with self.assertRaises(RuntimeError):
            fft_conv1d(signal, too_long_kernel)

        signal = paddle.randn([1, 5, 19])
        same_length_kernel = paddle.randn([10, 5, 19])
        out_shape = list(fft_conv1d(signal, same_length_kernel).shape)
        self.assertEqual(out_shape, [1, 10, 1])

    def test_block_ratio(self):
        """Results must not depend on block_ratio; a ratio below 1 is rejected."""
        x = paddle.randn([1, 5, 1024])
        w = paddle.randn([10, 5, 19])
        reference = fft_conv1d(x, w)

        for ratio in (1, 5, 10, 20):
            out = fft_conv1d(x, w, block_ratio=ratio)
            self.assertSimilar(out, reference, msg=str(ratio))

        with self.assertRaises(RuntimeError):
            fft_conv1d(x, w, block_ratio=0.9)

    def test_module(self):
        """The layer can be called with and without a bias term."""
        x = paddle.randn([16, 4, 1024])
        for with_bias in (True, False):
            layer = FFTConv1d(4, 5, 8, bias=with_bias)
            layer(x)

    def test_dynamic_graph(self):
        """Output shape of the layer for a kernel of size 8 without padding."""
        x = paddle.randn([16, 4, 1024])
        layer = FFTConv1d(4, 5, 8, bias=True)
        out_shape = list(layer(x).shape)
        self.assertEqual(out_shape, [16, 5, 1024 - 8 + 1])


if __name__ == "__main__":
    unittest.main()

@ -0,0 +1,171 @@
# File under the MIT license, see https://github.com/adefossez/julius/LICENSE for details.
# Author: adefossez, 2020
import math
import random
import sys
import unittest
import paddle
sys.path.append("/home/work/pdaudoio")
from audiotools.core import highpass_filter, highpass_filters
def pure_tone(freq: float, sr: float=128, dur: float=4, device=None):
    """
    Build a cosine wave at a single frequency.

    Args:
        freq (float): frequency (in Hz)
        sr (float): sample rate (in Hz)
        dur (float): duration (in seconds)
        device: accepted but not used by this implementation.
    """
    num_samples = int(sr * dur)
    # Time stamp of every sample, in seconds.
    timeline = paddle.arange(num_samples, dtype="float32") / sr
    return paddle.cos(2 * math.pi * freq * timeline)
def delta(a, b, ref, fraction=0.9):
    """Return the mean absolute error of `a` vs `b` as a percentage of `std(ref)`.

    Only the central `fraction` of the last axis is compared, so the same
    number of samples is dropped from each end before the error is computed.

    Args:
        a: estimated tensor.
        b: target tensor, indexable like `a` along the last axis.
        ref: tensor whose standard deviation normalizes the error.
        fraction: share of the last axis (centered) kept for the comparison.

    Returns:
        ``100 * mean(|a - b|) / std(ref)`` over the compared window.
    """
    length = a.shape[-1]
    compare_length = int(length * fraction)
    offset = (length - compare_length) // 2
    # Fixed: the window previously ended at `offset + length`, which only
    # trimmed the leading edge and ignored `fraction` at the trailing edge.
    end = offset + compare_length
    a = a[..., offset:end]
    b = b[..., offset:end]
    # Mean of the absolute difference, divided by the std of ref, times 100.
    return 100 * paddle.mean(paddle.abs(a - b)) / paddle.std(ref)
# Tolerance to errors, as a percentage of the std of the input signal.
TOLERANCE = 1


class _BaseTest(unittest.TestCase):
    """TestCase base class adding a relative-error assertion."""

    def assertSimilar(self, a, b, ref, msg=None, tol=TOLERANCE):
        """Assert that `delta(a, b, ref)` does not exceed `tol` (in percent)."""
        error_percent = delta(a, b, ref)
        self.assertLessEqual(error_percent, tol, msg)
class TestHighPassFilters(_BaseTest):
    """Tests for `highpass_filter` and `highpass_filters`."""

    def setUp(self):
        # Seed both RNGs so every test starts from the same state.
        paddle.seed(1234)
        random.seed(1234)

    def test_keep_or_kill(self):
        """A tone passes when the cutoff is below it and vanishes when above."""
        sr = 1024
        # For this test we accept 5% tolerance in amplitude, or -26dB in power.
        tol = 5
        zeros = 16

        for _ in range(10):
            freq = random.uniform(0.01, 0.4)
            tone = pure_tone(freq * sr, sr=sr, dur=10)

            # If cutoff frequency is under freq, output should be input
            passed = highpass_filter(tone, 0.9 * freq, zeros=zeros)
            self.assertSimilar(
                passed, tone, tone, f"freq={freq}, pass", tol=tol)

            # If cutoff frequency is over freq, output should be zero
            killed = highpass_filter(tone, 1.1 * freq, zeros=zeros)
            self.assertSimilar(
                killed, 0 * tone, tone, f"freq={freq}, kill", tol=tol)

    def test_fft_nofft(self):
        """The FFT and direct implementations must agree on random input."""
        for _ in range(10):
            noise = paddle.randn([1024])
            freq = random.uniform(0.01, 0.5)
            with_fft = highpass_filter(noise, freq, fft=True)
            without_fft = highpass_filter(noise, freq, fft=False)
            self.assertSimilar(
                with_fft, without_fft, noise, f"freq={freq}", tol=0.01)

    def test_constant(self):
        """A constant input should be removed almost entirely."""
        constant = paddle.ones([2048])
        for zeros in (4, 10):
            for freq in (0.01, 0.1):
                filtered = highpass_filter(constant, freq, zeros=zeros)
                self.assertLessEqual(filtered.abs().mean(), 1e-6,
                                     (zeros, freq))

    def test_stride(self):
        """stride=3 must equal keeping every third sample of the stride=1 output."""
        noise = paddle.randn([1024])
        cutoffs = [0.1, 0.2]

        # Default padding.
        subsampled = highpass_filters(noise, cutoffs, stride=1)[:, ::3]
        strided = highpass_filters(noise, cutoffs, stride=3)
        self.assertEqual(subsampled.shape, strided.shape)
        self.assertSimilar(subsampled, strided, noise)

        # Same comparison with padding disabled.
        subsampled = highpass_filters(
            noise, cutoffs, stride=1, pad=False)[:, ::3]
        strided = highpass_filters(noise, cutoffs, stride=3, pad=False)
        self.assertEqual(subsampled.shape, strided.shape)
        self.assertSimilar(subsampled, strided, noise)
# class TestBandPassFilters(_BaseTest):
# def setUp(self):
# paddle.seed(1234)
# random.seed(1234)
# def test_keep_or_kill(self):
# for _ in range(10):
# freq = random.uniform(0.01, 0.4)
# sr = 1024
# tone = pure_tone(freq * sr, sr=sr, dur=10)
# # For this test we accept 5% tolerance in amplitude, or -26dB in power.
# tol = 5
# zeros = 16
# y_pass = filters.bandpass_filter(tone, 0.9 * freq, 1.1 * freq, zeros=zeros)
# self.assertSimilar(y_pass, tone, tone, f"freq={freq}, pass", tol=tol)
# y_killed = filters.bandpass_filter(tone, 1.1 * freq, 1.2 * freq, zeros=zeros)
# self.assertSimilar(y_killed, 0 * tone, tone, f"freq={freq}, kill", tol=tol)
# y_killed = filters.bandpass_filter(tone, 0.8 * freq, 0.9 * freq, zeros=zeros)
# self.assertSimilar(y_killed, 0 * tone, tone, f"freq={freq}, kill", tol=tol)
# def test_fft_nofft(self):
# for _ in range(10):
# x = paddle.randn([1024])
# freq = random.uniform(0.01, 0.5)
# freq2 = random.uniform(freq, 0.5)
# y_fft = filters.bandpass_filter(x, freq, freq2, fft=True)
# y_ref = filters.bandpass_filter(x, freq, freq2, fft=False)
# self.assertSimilar(y_fft, y_ref, x, f"freq={freq}", tol=0.01)
# def test_constant(self):
# x = paddle.ones([2048])
# for zeros in [4, 10]:
# for freq in [0.01, 0.1]:
# y = filters.bandpass_filter(x, freq, 1.2 * freq, zeros=zeros)
# self.assertLessEqual(y.abs().mean(), 1e-6, (zeros, freq))
# def test_stride(self):
# x = paddle.randn([1024])
# y = filters.bandpass_filter(x, 0.1, 0.2, stride=1)[::3]
# y2 = filters.bandpass_filter(x, 0.1, 0.2, stride=3)
# self.assertEqual(y.shape, y2.shape)
# self.assertSimilar(y, y2, x)
# y = filters.bandpass_filter(x, 0.1, 0.2, stride=1, pad=False)[::3]
# y2 = filters.bandpass_filter(x, 0.1, 0.2, stride=3, pad=False)
# self.assertEqual(y.shape, y2.shape)
# self.assertSimilar(y, y2, x)
# def test_same_as_highpass(self):
# x = paddle.randn([1024])
# y_ref = highpass_filter(x, 0.2)
# y = filters.bandpass_filter(x, 0.2, 0.5)
# self.assertSimilar(y, y_ref, x)
# def test_same_as_lowpass(self):
# x = paddle.randn([1024])
# y_ref = filters.lowpass_filter(x, 0.2)
# y = filters.bandpass_filter(x, 0.0, 0.2)
# self.assertSimilar(y, y_ref, x)
# Run the unittest runner when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()

@ -0,0 +1,283 @@
import sys
import numpy as np
import pyloudnorm
import soundfile as sf
sys.path.append("/home/work/pdaudoio")
from audiotools import AudioSignal
from audiotools import datasets
from audiotools import Meter
from audiotools import transforms
# Absolute tolerance used by the loudness compliance tests in this module.
ATOL = 1e-1


def test_loudness_against_pyln():
    """AudioSignal.loudness() should match pyloudnorm's integrated loudness."""
    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    excerpt = AudioSignal(audio_path, offset=5, duration=10)
    measured = excerpt.loudness()

    reference_meter = pyloudnorm.Meter(
        excerpt.sample_rate, filter_class="K-weighting", block_size=0.4)
    # The first batch item is transposed before being handed to pyloudnorm.
    expected = reference_meter.integrated_loudness(excerpt.numpy()[0].T)
    assert np.allclose(measured, expected)
def test_loudness_short():
    """Smoke test: loudness() on a 0.25 s excerpt should not raise."""
    audio_path = "tests/audio/spk/f10_script4_produced.wav"
    short_excerpt = AudioSignal(audio_path, offset=10, duration=0.25)
    short_excerpt.loudness()
def test_batch_loudness():
    """Per-item and batched loudness should both agree with pyloudnorm."""
    np.random.seed(0)
    batch = np.random.randn(16, 2, 16000)
    batch /= np.abs(batch).max()
    # Scale every item by its own random gain so the loudnesses differ.
    gains = np.random.rand(batch.shape[0])[:, None, None]
    batch = batch * gains
    num_items = batch.shape[0]

    reference_meter = pyloudnorm.Meter(16000)
    expected = [
        reference_meter.integrated_loudness(batch[i].T)
        for i in range(num_items)
    ]

    # Item-by-item measurement with the project's Meter.
    meter = Meter(16000)
    _ = meter.filter_class  # only checks that the attribute can be read
    per_item = [
        meter.integrated_loudness(batch[i].T).item() for i in range(num_items)
    ]
    assert np.allclose(expected, per_item, atol=1e-1)

    # Batched measurement through AudioSignal.
    batched = AudioSignal(batch, sample_rate=16000).loudness()
    assert np.allclose(expected, batched, atol=1e-1)
# Tests below are copied from pyloudnorm
def test_integrated_loudness():
    """Calling the Meter on a 1 kHz sine file should give the stored value."""
    samples, sample_rate = sf.read("tests/audio/loudness/sine_1000.wav")
    measured = Meter(sample_rate)(samples)
    expected = -3.0523438444331137
    assert np.allclose(measured, expected)
def test_rel_gate_test():
    """RelGateTest compliance file should measure -10.0 within ATOL."""
    samples, sample_rate = sf.read(
        "tests/audio/loudness/1770-2_Comp_RelGateTest.wav")
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -10.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_abs_gate_test():
    """AbsGateTest compliance file should measure -69.5 within ATOL."""
    samples, sample_rate = sf.read(
        "tests/audio/loudness/1770-2_Comp_AbsGateTest.wav")
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -69.5
    assert np.allclose(measured, expected, atol=ATOL)
def test_24LKFS_25Hz_2ch():
    """24LKFS 25 Hz two-channel compliance file should measure -24.0 within ATOL."""
    samples, sample_rate = sf.read(
        "tests/audio/loudness/1770-2_Comp_24LKFS_25Hz_2ch.wav")
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -24.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_24LKFS_100Hz_2ch():
    """24LKFS 100 Hz two-channel compliance file should measure -24.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_24LKFS_100Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -24.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_24LKFS_500Hz_2ch():
    """24LKFS 500 Hz two-channel compliance file should measure -24.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_24LKFS_500Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -24.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_24LKFS_1000Hz_2ch():
    """24LKFS 1000 Hz two-channel compliance file should measure -24.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_24LKFS_1000Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -24.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_24LKFS_2000Hz_2ch():
    """24LKFS 2000 Hz two-channel compliance file should measure -24.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_24LKFS_2000Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -24.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_24LKFS_10000Hz_2ch():
    """24LKFS 10000 Hz two-channel compliance file should measure -24.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_24LKFS_10000Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -24.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_23LKFS_25Hz_2ch():
    """23LKFS 25 Hz two-channel compliance file should measure -23.0 within ATOL."""
    samples, sample_rate = sf.read(
        "tests/audio/loudness/1770-2_Comp_23LKFS_25Hz_2ch.wav")
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_23LKFS_100Hz_2ch():
    """23LKFS 100 Hz two-channel compliance file should measure -23.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_23LKFS_100Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_23LKFS_500Hz_2ch():
    """23LKFS 500 Hz two-channel compliance file should measure -23.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_23LKFS_500Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_23LKFS_1000Hz_2ch():
    """23LKFS 1000 Hz two-channel compliance file should measure -23.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_23LKFS_1000Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_23LKFS_2000Hz_2ch():
    """23LKFS 2000 Hz two-channel compliance file should measure -23.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_23LKFS_2000Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_23LKFS_10000Hz_2ch():
    """23LKFS 10000 Hz two-channel compliance file should measure -23.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_23LKFS_10000Hz_2ch.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_18LKFS_frequency_sweep():
    """18LKFS frequency-sweep compliance file should measure -18.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Comp_18LKFS_FrequencySweep.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -18.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_conf_stereo_vinL_R_23LKFS():
    """Stereo VinL+R conformance file should measure -23.0 within ATOL."""
    wav_path = "tests/audio/loudness/1770-2_Conf_Stereo_VinL+R-23LKFS.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    expected = -23.0
    assert np.allclose(measured, expected, atol=ATOL)
def test_conf_monovoice_music_24LKFS():
    """Check the mono voice+music conformance file against its -24.0 target.

    The integrated loudness reported by ``Meter`` must match the target
    within the shared ``ATOL`` tolerance.
    """
    wav_path = "tests/audio/loudness/1770-2_Conf_Mono_Voice+Music-24LKFS.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    assert np.allclose(measured, -24.0, atol=ATOL)
def conf_monovoice_music_24LKFS():
    """Measure the mono voice+music -24 file and compare against -24.0.

    Performs the same check as ``test_conf_monovoice_music_24LKFS``.
    NOTE(review): the name lacks the ``test_`` prefix, so pytest's default
    discovery will presumably not collect it; confirm whether that is
    intentional.
    """
    wav_path = "tests/audio/loudness/1770-2_Conf_Mono_Voice+Music-24LKFS.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    assert np.allclose(measured, -24.0, atol=ATOL)
def test_conf_monovoice_music_23LKFS():
    """Check the mono voice+music conformance file against its -23.0 target.

    The integrated loudness reported by ``Meter`` must match the target
    within the shared ``ATOL`` tolerance.
    """
    wav_path = "tests/audio/loudness/1770-2_Conf_Mono_Voice+Music-23LKFS.wav"
    samples, sample_rate = sf.read(wav_path)
    measured = Meter(sample_rate).integrated_loudness(samples)
    assert np.allclose(measured, -23.0, atol=ATOL)
def test_fir_accuracy():
    """Check that ``loudness(use_fir=True)`` agrees with plain ``loudness()``.

    Twenty dataset items are passed through a randomly applied chain of
    clipping, low-pass, high-pass and equalizer transforms. For every
    result the two loudness measurements must agree within 1e-2.
    """
    augment = transforms.Compose(
        transforms.ClippingDistortion(prob=0.5),
        transforms.LowPass(prob=0.5),
        transforms.HighPass(prob=0.5),
        transforms.Equalizer(prob=0.5),
        prob=0.5,
    )
    source = datasets.AudioLoader(sources=["tests/audio/spk.csv"])
    dataset = datasets.AudioDataset(
        source,
        44100,
        10,
        5.0,
        transform=augment,
    )

    for index in range(20):
        sample = dataset[index]
        transform_args = sample["transform_args"]
        processed = augment(sample["signal"], **transform_args)

        # Reset the stored loudness before measuring; presumably this
        # forces both calls below to recompute rather than reuse a cache.
        processed._loudness = None
        default_db = processed.clone().loudness()
        fir_db = processed.clone().loudness(use_fir=True)

        assert np.allclose(default_db, fir_db, atol=1e-2)
# Run the FIR accuracy check only when this file is executed as a script.
# An unguarded call would also run on every import of the module, which
# includes pytest's collection phase (the test would then run twice).
if __name__ == "__main__":
    test_fir_accuracy()

@ -0,0 +1,104 @@
# File under the MIT license, see https://github.com/adefossez/julius/LICENSE for details.
# Author: adefossez, 2020
import math
import random
import sys
import unittest
import numpy as np
import paddle
sys.path.append("/home/work/pdaudoio")
from audiotools.core import LowPassFilter, LowPassFilters, lowpass_filter, resample_frac
def pure_tone(freq: float, sr: float=128, dur: float=4, device=None):
    """
    Build a single-frequency cosine wave.

    Args:
        freq (float): frequency of the tone (in Hz)
        sr (float): sample rate (in Hz)
        dur (float): duration (in seconds)
        device: accepted but not used by this implementation

    Returns:
        Tensor with ``int(sr * dur)`` samples of ``cos(2 * pi * freq * t)``.
    """
    num_samples = int(sr * dur)
    timestamps = paddle.arange(num_samples, dtype="float32") / sr
    phase = 2 * math.pi * freq * timestamps
    return paddle.cos(phase)
def delta(a, b, ref, fraction=0.9):
    """
    Return the error between `a` and `b` as a percentage of the std of `ref`.

    Only the central `fraction` of the last axis is compared, so an equal
    number of samples is dropped from each end before measuring.

    Args:
        a: first tensor to compare
        b: second tensor to compare, sliced the same way as `a`
        ref: reference tensor whose standard deviation normalizes the error
        fraction (float): share of the last axis of `a` kept for comparison

    Returns:
        100 * mean(|a - b|) / std(ref), computed on the central window.
    """
    length = a.shape[-1]
    compare_length = int(length * fraction)
    offset = (length - compare_length) // 2
    # The window ends at offset + compare_length. Ending it at
    # offset + length would run past the end of the axis and therefore
    # trim only the leading edge.
    end = offset + compare_length
    a = a[..., offset:end]
    b = b[..., offset:end]
    # Mean absolute difference, divided by the standard deviation of ref,
    # scaled to a percentage.
    return 100 * paddle.mean(paddle.abs(a - b)) / paddle.std(ref)
# Default bound used by assertSimilar: the largest delta() value, in percent,
# that still counts as a match.
TOLERANCE = 1  # Tolerance to errors as percentage of the std of the input signal
class _BaseTest(unittest.TestCase):
    """Test-case base class that adds a relative-error assertion."""

    def assertSimilar(self, a, b, ref, msg=None, tol=TOLERANCE):
        """Fail unless ``delta(a, b, ref)`` is at most ``tol`` (in percent)."""
        error_pct = delta(a, b, ref)
        self.assertLessEqual(error_pct, tol, msg)
class TestLowPassFilters(_BaseTest):
    """Checks for ``lowpass_filter`` and its agreement with ``resample_frac``."""

    def setUp(self):
        # Re-seed before every test method so random draws are repeatable.
        paddle.seed(1234)
        random.seed(1234)

    def test_keep_or_kill(self):
        """A tone above the cutoff is removed; a tone below it is kept."""
        for _ in range(10):
            freq = random.uniform(0.01, 0.4)
            sr = 1024
            tone = pure_tone(freq * sr, sr=sr, dur=10)

            # For this test we accept 5% tolerance in amplitude, or -26dB in power.
            tol = 5
            zeros = 16

            # If cutoff frequency is under freq, output should be zero
            y_killed = lowpass_filter(tone, 0.9 * freq, zeros=zeros)
            self.assertSimilar(
                y_killed, 0 * y_killed, tone, f"freq={freq}, kill", tol=tol)

            # If cutoff frequency is over freq, output should be input
            y_pass = lowpass_filter(tone, 1.1 * freq, zeros=zeros)
            self.assertSimilar(
                y_pass, tone, tone, f"freq={freq}, pass", tol=tol)

    def test_same_as_downsample(self):
        """Integer downsampling should match a strided low-pass filter."""
        for _ in range(10):
            # NumPy is re-seeded on every pass, so each iteration filters
            # the same noise signal.
            np.random.seed(1234)
            x = paddle.to_tensor(
                np.random.randn(2 * 3 * 4 * 100), dtype="float32")
            rolloff = 0.945

            for old_sr in [2, 3, 4]:
                y_resampled = resample_frac(
                    x, old_sr, 1, rolloff=rolloff, zeros=16)
                y_lowpass = lowpass_filter(
                    x, rolloff / old_sr / 2, stride=old_sr, zeros=16)
                self.assertSimilar(y_resampled, y_lowpass, x,
                                   f"old_sr={old_sr}")

    def test_fft_nofft(self):
        """The ``fft=True`` and ``fft=False`` code paths should agree."""
        for _ in range(10):
            x = paddle.randn([1024])
            freq = random.uniform(0.01, 0.5)
            y_fft = lowpass_filter(x, freq, fft=True)
            y_ref = lowpass_filter(x, freq, fft=False)
            self.assertSimilar(y_fft, y_ref, x, f"freq={freq}", tol=0.01)

    def test_constant(self):
        """A constant signal should pass through the filter unchanged."""
        x = paddle.ones([2048])
        for zeros in [4, 10]:
            for freq in [0.01, 0.1]:
                y_low = lowpass_filter(x, freq, zeros=zeros)
                self.assertLessEqual((y_low - 1).abs().mean(), 1e-6,
                                     (zeros, freq))
# Allow this test module to be run directly as a script.
if __name__ == "__main__":
    unittest.main()

@ -0,0 +1,153 @@
import os
import random
import sys
import tempfile
import numpy as np
import paddle
import pytest
sys.path.append("/home/work/pdaudoio")
from audiotools import util
from audiotools.core.audio_signal import AudioSignal
def test_check_random_state():
    """``util.random_state`` returns a RandomState for valid seeds, else raises."""
    expected_type = type(np.random.RandomState(10))

    # seed is None
    assert type(util.random_state(None)) is expected_type

    # seed is int
    assert type(util.random_state(10)) is expected_type

    # seed is already a RandomState
    existing = np.random.RandomState(10)
    assert type(util.random_state(existing)) is expected_type

    # anything else must be rejected
    with pytest.raises(ValueError):
        util.random_state("random")
def test_seed():
    """Seeding twice with the same value reproduces all three random draws."""

    def _draw():
        # One sample each from paddle, NumPy and the stdlib, in that order.
        return paddle.randn([1]), np.random.randn(1), random.random()

    util.seed(0)
    paddle_a, np_a, py_a = _draw()

    util.seed(0, set_cudnn=True)
    paddle_b, np_b, py_b = _draw()

    assert paddle_a == paddle_b
    assert np_a == np_b
    assert py_a == py_b
def test_hz_to_bin():
    """Bins from ``util.hz_to_bin`` convert back to within 1 of the input Hz."""
    sr = 1000
    n_fft = 2048
    hz = paddle.to_tensor(np.array([100, 200, 300]))

    bins = util.hz_to_bin(hz, n_fft, sr)

    recovered_hz = (bins / n_fft) * sr
    worst_error = (recovered_hz - hz).abs().max()
    assert worst_error < 1
def test_find_audio():
    """Exercise ``util.find_audio`` with a folder, a single file and a glob."""
    wav_files = util.find_audio("tests/", ["wav"])
    assert all("wav" in str(found) for found in wav_files)

    flac_files = util.find_audio("tests/", ["flac"])
    assert not flac_files

    # A path to a single audio file must be accepted (result is not inspected).
    util.find_audio("tests/audio/spk//f10_script4_produced.wav")

    # A glob pattern must find as many files as the folder search did.
    globbed = util.find_audio("tests/**/*.wav")
    assert len(globbed) == len(wav_files)
def test_chdir():
    """Inside ``util.chdir(d)`` the working directory is ``d``."""
    with tempfile.TemporaryDirectory(suffix="tmp") as d, util.chdir(d):
        current_dir = os.path.realpath(".")
        assert os.path.samefile(d, current_dir)
def test_prepare_batch():
    """``util.prepare_batch`` accepts a dict, a bare tensor and a list."""
    dict_batch = {
        "tensor": paddle.randn([1]),
        "non_tensor": np.random.randn(1),
    }
    util.prepare_batch(dict_batch)

    tensor_batch = paddle.randn([1])
    util.prepare_batch(tensor_batch)

    list_batch = [paddle.randn([1]), np.random.randn(1)]
    util.prepare_batch(list_batch)
def test_sample_dist():
    """Check the "uniform", "const" and "choice" forms of ``sample_from_dist``."""
    # With the same seed, the uniform draw must equal a direct RandomState draw.
    expected = util.random_state(0).uniform(0.0, 1.0)
    sampled = util.sample_from_dist(("uniform", 0.0, 1.0), 0)
    assert expected == sampled

    assert util.sample_from_dist(("const", 1.0)) == 1.0

    picked = util.sample_from_dist(("choice", [8, 16, 32]))
    assert picked in [8, 16, 32]
def test_collate():
    """``util.collate`` batches nested items, optionally split into chunks."""

    def _one_item():
        return {
            "signal": AudioSignal(paddle.randn([1, 1, 44100]), 44100),
            "tensor": paddle.randn([1]),
            "string": "Testing",
            "dict": {
                "nested_signal":
                AudioSignal(paddle.randn([1, 1, 44100]), 44100),
            },
        }

    def _make_items(count):
        return [_one_item() for _ in range(count)]

    def _check(chunk, expected_size):
        assert chunk["signal"].batch_size == expected_size
        assert chunk["tensor"].shape[0] == expected_size
        assert len(chunk["string"]) == expected_size
        assert chunk["dict"]["nested_signal"].batch_size == expected_size

    # no splitting: one collated batch of the full size
    batch_size = 16
    _check(util.collate(_make_items(batch_size)), batch_size)

    # test collate with splitting (evenly)
    batch_size = 16
    n_splits = 4
    for chunk in util.collate(_make_items(batch_size), n_splits=n_splits):
        _check(chunk, batch_size // n_splits)

    # test collate with splitting (unevenly)
    batch_size = 15
    n_splits = 4
    chunks = util.collate(_make_items(batch_size), n_splits=n_splits)
    for chunk, expected_size in zip(chunks, [4, 4, 4, 3]):
        _check(chunk, expected_size)

@ -0,0 +1,206 @@
import sys
import tempfile
from pathlib import Path
import numpy as np
import pytest
sys.path.append("/home/work/pdaudoio")
import paddle
import audiotools
from audiotools.data import transforms as tfm
def test_align_lists():
    """``align_lists`` pads shorter lists with "none" entries to line them up."""

    def _as_items(groups):
        # Wrap every path string in the {"path": ...} dict form.
        return [[{"path": path} for path in group] for group in groups]

    input_lists = _as_items([
        ["a/1.wav", "b/1.wav", "c/1.wav", "d/1.wav"],
        ["a/2.wav", "c/2.wav"],
        ["c/3.wav"],
    ])
    target_lists = _as_items([
        ["a/1.wav", "b/1.wav", "c/1.wav", "d/1.wav"],
        ["a/2.wav", "none", "c/2.wav", "none"],
        ["none", "none", "c/3.wav", "none"],
    ])

    aligned_lists = audiotools.datasets.align_lists(input_lists)
    assert target_lists == aligned_lists
def test_audio_dataset():
    """Batches from ``AudioDataset`` are silenced exactly where the mask says.

    The transform is applied with the same arguments to the batch signal
    and to a clone of it. Masked items must be all zeros and unmasked items
    must match between the two results.
    """
    pipeline = tfm.Compose(
        [
            tfm.VolumeNorm(),
            tfm.Silence(prob=0.5),
        ], )
    loader = audiotools.data.datasets.AudioLoader(
        sources=["tests/audio/spk.csv"],
        transform=pipeline,
    )
    dataset = audiotools.data.datasets.AudioDataset(
        loader,
        44100,
        n_examples=100,
        transform=pipeline,
    )
    dataloader = paddle.io.DataLoader(
        dataset,
        batch_size=16,
        num_workers=0,
        collate_fn=dataset.collate,
    )

    for batch in dataloader:
        kwargs = batch["transform_args"]
        processed = batch["signal"]
        reference = processed.clone()

        processed = dataset.transform(processed, **kwargs)
        reference = dataset.transform(reference, **kwargs)

        mask = kwargs["Compose"]["1.Silence"]["mask"]

        expected_silence = paddle.zeros_like(processed[mask].audio_data)
        expected_audio = reference[~mask].audio_data

        assert paddle.allclose(processed[mask].audio_data, expected_silence)
        assert paddle.allclose(processed[~mask].audio_data, expected_audio)
def test_aligned_audio_dataset():
    """With ``aligned=True`` every loader yields the same file name per item."""
    with tempfile.TemporaryDirectory() as d:
        dataset_dir = Path(d)
        audiotools.util.generate_chord_dataset(
            max_voices=8, num_items=3, output_dir=dataset_dir)

        loaders = [
            audiotools.data.datasets.AudioLoader([dataset_dir / f"track_{i}"])
            for i in range(3)
        ]
        dataset = audiotools.data.datasets.AudioDataset(
            loaders, 44100, n_examples=1000, aligned=True, shuffle_loaders=True)
        dataloader = paddle.io.DataLoader(
            dataset,
            batch_size=16,
            num_workers=0,
            collate_fn=dataset.collate,
        )

        # Make sure the voice tracks are aligned.
        for batch in dataloader:
            # One row of base file names per loader.
            names = np.array([
                [p.split("/")[-1] for p in batch[loader_idx]["path"]]
                for loader_idx in range(len(loaders))
            ])
            for item_idx in range(names.shape[1]):
                column = names[:, item_idx]
                present = column[column != "none"]
                assert np.all(present == present[0])
def test_loader_without_replacement():
    """With ``shuffle=False`` item ``idx`` reports ``item_idx == idx``."""
    num_items = 100
    with tempfile.TemporaryDirectory() as d:
        dataset_dir = Path(d)
        audiotools.util.generate_chord_dataset(
            max_voices=1,
            num_items=num_items,
            output_dir=dataset_dir,
            duration=0.01,
        )
        dataset = audiotools.data.datasets.AudioDataset(
            audiotools.data.datasets.AudioLoader(
                [dataset_dir], shuffle=False),
            44100,
        )

        for expected_idx in range(num_items):
            assert dataset[expected_idx]["item_idx"] == expected_idx
def test_loader_with_replacement():
    """Items can be fetched when ``without_replacement=False`` (smoke test)."""
    num_items = 100
    with tempfile.TemporaryDirectory() as d:
        dataset_dir = Path(d)
        audiotools.util.generate_chord_dataset(
            max_voices=1,
            num_items=num_items,
            output_dir=dataset_dir,
            duration=0.01,
        )
        dataset = audiotools.data.datasets.AudioDataset(
            audiotools.data.datasets.AudioLoader([dataset_dir]),
            44100,
            without_replacement=False,
        )

        # Only checks that indexing does not raise; the items are not inspected.
        for idx in range(num_items):
            dataset[idx]
def test_loader_out_of_range():
    """An ``item_idx`` past the end of the source yields a "none" path."""
    num_items = 100
    with tempfile.TemporaryDirectory() as d:
        dataset_dir = Path(d)
        audiotools.util.generate_chord_dataset(
            max_voices=1,
            num_items=num_items,
            output_dir=dataset_dir,
            duration=0.01,
        )
        loader = audiotools.data.datasets.AudioLoader([dataset_dir])

        request = dict(
            sample_rate=44100,
            duration=0.01,
            state=audiotools.util.random_state(0),
            source_idx=0,
            item_idx=101,
        )
        result = loader(**request)
        assert result["path"] == "none"
def test_dataset_pipeline():
    """Run impulse-response and noise transforms over a DataLoader (smoke test)."""
    augment = tfm.Compose([
        tfm.RoomImpulseResponse(sources=["tests/audio/irs.csv"]),
        tfm.BackgroundNoise(sources=["tests/audio/noises.csv"]),
    ])
    dataset = audiotools.data.datasets.AudioDataset(
        audiotools.data.datasets.AudioLoader(sources=["tests/audio/spk.csv"]),
        44100,
        n_examples=10,
        transform=augment,
    )
    dataloader = paddle.io.DataLoader(
        dataset, num_workers=0, batch_size=1, collate_fn=dataset.collate)

    for raw_batch in dataloader:
        prepared = audiotools.core.util.prepare_batch(raw_batch, device="cpu")
        transform_args = prepared["transform_args"]
        # The transformed output is not inspected; the call must simply succeed.
        dataset.transform(prepared["signal"], **transform_args)
class NumberDataset:
    """Minimal ten-item dataset whose item at ``idx`` is ``{"idx": idx}``."""

    def __init__(self):
        pass

    def __len__(self):
        """Return the fixed dataset size."""
        return 10

    def __getitem__(self, idx):
        """Return a one-key dict echoing the requested index."""
        return dict(idx=idx)
def test_concat_dataset():
    """``ConcatDataset`` over three datasets yields each index three times in a row."""
    d1 = NumberDataset()
    d2 = NumberDataset()
    d3 = NumberDataset()
    d = audiotools.datasets.ConcatDataset([d1, d2, d3])

    all_items = [d[i] for i in range(len(d))]
    x = d.collate(all_items)["idx"].tolist()

    # Expected order: 0, 0, 0, 1, 1, 1, ..., 9, 9, 9
    expected = [i for i in range(10) for _ in range(3)]
    assert x == expected

@ -0,0 +1,29 @@
import sys
import tempfile
from pathlib import Path
import paddle
sys.path.append("/home/work/pdaudoio")
from audiotools.core.util import find_audio
from audiotools.core.util import read_sources
from audiotools.data import preprocess
def test_create_csv():
    """``preprocess.create_csv`` writes a CSV for the speaker wav files (smoke test)."""
    with tempfile.NamedTemporaryFile(suffix=".csv") as f:
        wav_files = find_audio("./tests/audio/spk", ext=["wav"])
        preprocess.create_csv(wav_files, f.name, loudness=True)
def test_create_csv_with_empty_rows():
    """Empty rows are kept or dropped by ``read_sources`` per ``remove_empty``."""
    audio_files = find_audio("./tests/audio/spk", ext=["wav"])
    # Insert two blank entries among the real files.
    for position in (0, 2):
        audio_files.insert(position, "")

    with tempfile.NamedTemporaryFile(suffix=".csv") as f:
        csv_path = f.name
        preprocess.create_csv(audio_files, csv_path, loudness=True)

        without_empty = read_sources([csv_path], remove_empty=True)
        assert len(without_empty[0]) == 1

        with_empty = read_sources([csv_path], remove_empty=False)
        assert len(with_empty[0]) == 3

@ -0,0 +1,96 @@
import sys
import time
sys.path.append("/home/work/pdaudoio")
import paddle
from visualdl import LogWriter
from audiotools.ml.decorators import timer
from audiotools.ml.decorators import Tracker
from audiotools.ml.decorators import when
def test_all_decorators():
    """Drive ``Tracker``, ``timer`` and ``when`` through a simulated training run.

    Two passes of ``max_iters`` steps are run: one whose training step
    returns a dict of values and one whose training step returns nothing.
    Between them the tracker state is exported and loaded back.
    """
    rank = 0
    max_iters = 100

    tracker = Tracker(LogWriter("/tmp/logs"), log_file="/tmp/log.txt")

    train_data = range(100)
    val_data = range(100)

    def _metric(i):
        # exp(-i / 100) as a one-element float32 tensor.
        return paddle.exp(paddle.to_tensor([-i / 100], dtype="float32"))

    scalar_keys = ("loss", "mel", "stft", "waveform")

    @tracker.log("train", "value", history=False)
    @tracker.track("train", max_iters, tracker.step)
    @timer()
    def train_loop():
        i = tracker.step
        time.sleep(0.01)
        result = {key: _metric(i) for key in scalar_keys}
        result["not_scalar"] = paddle.arange(
            start=0, end=10, step=1, dtype="int64")
        return result

    @tracker.track("val", len(val_data))
    @timer()
    def val_loop():
        i = tracker.step
        time.sleep(0.01)
        result = {key: _metric(i) for key in scalar_keys}
        result["not_scalar"] = paddle.arange(10, dtype="int64")
        result["string"] = "string"
        return result

    @when(lambda: tracker.step % 1000 == 0 and rank == 0)
    @paddle.no_grad()
    def save_samples():
        tracker.print("Saving samples to TensorBoard.")

    @when(lambda: tracker.step % 100 == 0 and rank == 0)
    def checkpoint():
        save_samples()
        if tracker.is_best("val", "mel"):
            tracker.print("Best model so far.")
        tracker.print("Saving to /runs/exp1")
        tracker.done("val", f"Iteration {tracker.step}")

    @when(lambda: tracker.step % 100 == 0)
    @tracker.log("val", "mean")
    @paddle.no_grad()
    def validate():
        for _ in range(len(val_data)):
            output = val_loop()
        return output

    def _run(train_step):
        # One full pass; the loop variable is written straight to tracker.step.
        with tracker.live:
            for tracker.step in range(max_iters):
                validate()
                checkpoint()
                train_step()

    _run(train_loop)

    # Round-trip the tracker state.
    state_dict = tracker.state_dict()
    tracker.load_state_dict(state_dict)

    # If train loop returned not a dict
    @tracker.track("train", max_iters, tracker.step)
    def train_loop_2():
        i = tracker.step
        time.sleep(0.01)

    _run(train_loop_2)
# Allow the decorator test to be run directly as a script.
if __name__ == "__main__":
    test_all_decorators()

@ -0,0 +1,85 @@
import sys
import tempfile
import paddle
from paddle import nn
sys.path.append("/home/work/pdaudoio")
from audiotools import ml
from audiotools import util
# Seed that seed_and_run() passes to util.seed() before every model call.
SEED = 0
def seed_and_run(model, *args, **kwargs):
    """Call ``util.seed(SEED)``, then return ``model(*args, **kwargs)``."""
    util.seed(SEED)
    output = model(*args, **kwargs)
    return output
class Model(ml.BaseModel):
    """Tiny model with one 1-to-1 linear layer and a stored ``arg1`` value."""

    def __init__(self, arg1: float=1.0):
        super().__init__()
        self.arg1 = arg1
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        """Apply the linear layer to ``x``."""
        projected = self.linear(x)
        return projected
class OtherModel(ml.BaseModel):
    """Same single linear layer as ``Model`` but with no constructor arguments."""

    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        """Apply the linear layer to ``x``."""
        projected = self.linear(x)
        return projected
def test_base_model():
    """Round-trip ``Model`` through its save/load paths and compare outputs.

    Every reloaded model is run on the same input under the same seed and
    must reproduce the output of the original model.
    """
    # Save and load
    # ml.BaseModel.EXTERN += ["test_model"]
    x = paddle.randn([10, 1])
    model1 = Model()

    assert str(model1.device) == 'Place(cpu)'

    out1 = seed_and_run(model1, x)

    with tempfile.NamedTemporaryFile(suffix=".pdparams") as f:
        ckpt_path = f.name

        # default save followed by load
        model1.save(ckpt_path)
        model2 = Model.load(ckpt_path)
        out2 = seed_and_run(model2, x)
        assert paddle.allclose(out1, out2)

        # test re-export
        model2.save(ckpt_path)
        model3 = Model.load(ckpt_path)
        out3 = seed_and_run(model3, x)
        assert paddle.allclose(out1, out3)

        # make sure legacy/save load works
        model1.save(ckpt_path, package=False)
        model2 = Model.load(ckpt_path)
        out2 = seed_and_run(model2, x)
        assert paddle.allclose(out1, out2)

        # make sure new way -> legacy save -> legacy load works
        model1.save(ckpt_path, package=False)
        model2 = Model.load(ckpt_path)
        model2.save(ckpt_path, package=False)
        model3 = Model.load(ckpt_path)
        out3 = seed_and_run(model3, x)

        # save/load without package, but with model2 being a model
        # without an argument of arg1 to its instantiation.
        model1.save(ckpt_path, package=False)
        model2 = OtherModel.load(ckpt_path)
        out2 = seed_and_run(model2, x)
        assert paddle.allclose(out1, out2)

        # out3 comes from the legacy re-save round trip above.
        assert paddle.allclose(out1, out3)

    with tempfile.TemporaryDirectory() as d:
        model1.save_to_folder(d, {"data": 1.0})
        Model.load_from_folder(d)
Loading…
Cancel
Save