From 58990f051b0a83734c00063fdd59d8014f0e18da Mon Sep 17 00:00:00 2001 From: drryanhuang Date: Fri, 29 Nov 2024 10:05:20 +0000 Subject: [PATCH] add unitest --- audio/tests_at/core/test_audio_signal✅.py | 610 ++++++++++++++++++++ audio/tests_at/core/test_bands✅.py | 50 ++ audio/tests_at/core/test_fftconv✅.py | 107 ++++ audio/tests_at/core/test_highpass✅.py | 171 ++++++ audio/tests_at/core/test_loudness✅.py | 283 +++++++++ audio/tests_at/core/test_lowpass✅.py | 104 ++++ audio/tests_at/core/test_util✅.py | 153 +++++ audio/tests_at/data/test_datasets✅.py | 206 +++++++ audio/tests_at/data/test_preprocess✅.py | 29 + audio/tests_at/ml/test_decorators✅.py | 96 +++ audio/tests_at/ml/test_model✅.py | 85 +++ 11 files changed, 1894 insertions(+) create mode 100644 audio/tests_at/core/test_audio_signal✅.py create mode 100644 audio/tests_at/core/test_bands✅.py create mode 100644 audio/tests_at/core/test_fftconv✅.py create mode 100644 audio/tests_at/core/test_highpass✅.py create mode 100644 audio/tests_at/core/test_loudness✅.py create mode 100644 audio/tests_at/core/test_lowpass✅.py create mode 100644 audio/tests_at/core/test_util✅.py create mode 100644 audio/tests_at/data/test_datasets✅.py create mode 100644 audio/tests_at/data/test_preprocess✅.py create mode 100644 audio/tests_at/ml/test_decorators✅.py create mode 100644 audio/tests_at/ml/test_model✅.py diff --git a/audio/tests_at/core/test_audio_signal✅.py b/audio/tests_at/core/test_audio_signal✅.py new file mode 100644 index 000000000..f65ce9ef3 --- /dev/null +++ b/audio/tests_at/core/test_audio_signal✅.py @@ -0,0 +1,610 @@ +import pathlib +import sys +import tempfile + +import librosa +import numpy as np +import paddle +import pytest +import rich +sys.path.append("/home/work/pdaudoio") +import audiotools +from audiotools import AudioSignal + + +def test_io(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + signal = AudioSignal(pathlib.Path(audio_path)) + + with tempfile.NamedTemporaryFile(suffix=".wav") as f: + signal.write(f.name) + signal_from_file = AudioSignal(f.name) + + mp3_signal = AudioSignal(audio_path.replace("wav", "mp3")) + print(mp3_signal) + + assert signal == signal_from_file + print(signal) + print(signal.markdown()) + + mp3_signal = AudioSignal.excerpt( + audio_path.replace("wav", "mp3"), offset=5, duration=5) + assert mp3_signal.signal_duration == 5.0 + assert mp3_signal.duration == 5.0 + assert mp3_signal.length == mp3_signal.signal_length + + rich.print(signal) + + array = np.random.randn(2, 16000) + signal = AudioSignal(array, sample_rate=16000) + assert np.allclose(signal.numpy(), array) + + signal = AudioSignal(array, 44100) + assert signal.sample_rate == 44100 + signal.shape + + with pytest.raises(ValueError): + signal = AudioSignal(5, sample_rate=16000) + + signal = AudioSignal(audio_path, offset=10, duration=10) + assert np.allclose(signal.signal_duration, 10.0) + assert np.allclose(signal.duration, 10.0) + + signal = AudioSignal.excerpt(audio_path, offset=5, duration=5) + assert signal.signal_duration == 5.0 + assert signal.duration == 5.0 + + assert "offset" in signal.metadata + assert "duration" in signal.metadata + + signal = AudioSignal(paddle.randn([1000]), 44100) + assert signal.audio_data.ndim == 3 + assert paddle.all(signal.samples == signal.audio_data) + + audio_path = "tests/audio/spk/f10_script4_produced.wav" + assert AudioSignal(audio_path).hash() == AudioSignal(audio_path).hash() + assert AudioSignal(audio_path).hash() != AudioSignal(audio_path).normalize( + -20).hash() + + with 
pytest.raises(RuntimeError): + AudioSignal(audio_path, offset=100000, duration=3) + + +def test_copy_and_clone(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + signal = AudioSignal(audio_path) + signal.stft() + signal.loudness() + + copied = signal.copy() + deep_copied = signal.deepcopy() + cloned = signal.clone() + + for a in ["audio_data", "stft_data", "_loudness"]: + a1 = getattr(signal, a) + a2 = getattr(cloned, a) + a3 = getattr(copied, a) + a4 = getattr(deep_copied, a) + + assert id(a1) != id(a2) + assert id(a1) == id(a3) + assert id(a1) != id(a4) + + assert np.allclose(a1, a2) + assert np.allclose(a1, a3) + assert np.allclose(a1, a4) + + for a in ["path_to_file", "metadata"]: + a1 = getattr(signal, a) + a2 = getattr(cloned, a) + a3 = getattr(copied, a) + a4 = getattr(deep_copied, a) + + assert id(a1) == id(a2) if isinstance(a1, str) else id(a1) != id(a2) + assert id(a1) == id(a3) + assert id(a1) == id(a4) if isinstance(a1, str) else id(a1) != id(a2) + + # for clone, id should differ if path is list, and should differ always for metadata + # if path is string, id should remain same... + + assert signal.original_signal_length == copied.original_signal_length + assert signal.original_signal_length == deep_copied.original_signal_length + assert signal.original_signal_length == cloned.original_signal_length + + signal = signal.detach() + + +@pytest.mark.parametrize("loudness_cutoff", [-np.inf, -160, -80, -40, -20]) +def test_salient_excerpt(loudness_cutoff): + MAP = {-np.inf: 0.0, -160: 0.0, -80: 0.001, -40: 0.01, -20: 0.1} + with tempfile.NamedTemporaryFile(suffix=".wav") as f: + sr = 44100 + signal = AudioSignal(paddle.zeros([sr * 60]), sr) + + signal[..., sr * 20:sr * 21] = MAP[loudness_cutoff] * paddle.randn( + [44100]) + + signal.write(f.name) + signal = AudioSignal.salient_excerpt( + f.name, loudness_cutoff=loudness_cutoff, duration=1, num_tries=None) + + assert "offset" in signal.metadata + assert "duration" in signal.metadata + assert signal.loudness() >= loudness_cutoff + + signal = AudioSignal.salient_excerpt( + f.name, loudness_cutoff=np.inf, duration=1, num_tries=10) + signal = AudioSignal.salient_excerpt( + f.name, + loudness_cutoff=None, + duration=1, ) + + +def test_arithmetic(): + def _make_signals(): + array = np.random.randn(2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + + array = np.random.randn(2, 16000) + sig2 = AudioSignal(array, sample_rate=16000) + return sig1, sig2 + + # Addition (with a copy) + sig1, sig2 = _make_signals() + sig3 = sig1 + sig2 + assert paddle.allclose(sig3.audio_data, sig1.audio_data + sig2.audio_data) + + # Addition (rmul) + sig1, _ = _make_signals() + sig3 = 5.0 + sig1 + assert paddle.allclose(sig3.audio_data, sig1.audio_data + 5.0) + + # In place addition + sig3, sig2 = _make_signals() + sig1 = sig3.deepcopy() + sig3 += sig2 + assert paddle.allclose(sig3.audio_data, sig1.audio_data + sig2.audio_data) + + # Subtraction (with a copy) + sig1, sig2 = _make_signals() + sig3 = sig1 - sig2 + assert paddle.allclose(sig3.audio_data, sig1.audio_data - sig2.audio_data) + + # In place subtraction + sig3, sig2 = _make_signals() + sig1 = sig3.deepcopy() + sig3 -= sig2 + assert paddle.allclose(sig3.audio_data, sig1.audio_data - sig2.audio_data) + + # Multiplication (element-wise) + sig1, sig2 = _make_signals() + sig3 = sig1 * sig2 + assert paddle.allclose(sig3.audio_data, sig1.audio_data * sig2.audio_data) + + # Multiplication (gain) + sig1, _ = _make_signals() + sig3 = sig1 * 5.0 + assert paddle.allclose(sig3.audio_data, 
sig1.audio_data * 5.0) + + # Multiplication (rmul) + sig1, _ = _make_signals() + sig3 = 5.0 * sig1 + assert paddle.allclose(sig3.audio_data, sig1.audio_data * 5.0) + + # Multiplication (in-place) + sig3, sig2 = _make_signals() + sig1 = sig3.deepcopy() + sig3 *= sig2 + assert paddle.allclose(sig3.audio_data, sig1.audio_data * sig2.audio_data) + + +def test_equality(): + array = np.random.randn(2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + sig2 = AudioSignal(array, sample_rate=16000) + + assert sig1 == sig2 + + array = np.random.randn(2, 16000) + sig3 = AudioSignal(array, sample_rate=16000) + + assert sig1 != sig3 + + assert not np.allclose(sig1.numpy(), sig3.numpy()) + + +def test_indexing(): + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + + assert np.allclose(sig1[0].audio_data, array[0]) + assert np.allclose(sig1[0, :, 8000].audio_data, array[0, :, 8000]) + + # Test with the associated STFT data. + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + sig1.loudness() + sig1.stft() + + indexed = sig1[0] + + assert np.allclose(indexed.audio_data, array[0]) + assert np.allclose(indexed.stft_data, sig1.stft_data[0]) + assert np.allclose(indexed._loudness, sig1._loudness[0]) + + indexed = sig1[0:2] + + assert np.allclose(indexed.audio_data, array[0:2]) + assert np.allclose(indexed.stft_data, sig1.stft_data[0:2]) + assert np.allclose(indexed._loudness, sig1._loudness[0:2]) + + # Test using a boolean tensor to index batch + mask = paddle.to_tensor([True, False, True, False]) + indexed = sig1[mask] + + assert np.allclose(indexed.audio_data, sig1.audio_data[mask]) + assert np.allclose(indexed.stft_data, sig1.stft_data[mask]) + assert np.allclose(indexed._loudness, sig1._loudness[mask]) + + # Set parts of signal using tensor + other_array = paddle.to_tensor(np.random.randn(4, 2, 16000)) + sig1 = AudioSignal(array, sample_rate=16000) + sig1[0, :, 6000:8000] = other_array[0, :, 6000:8000] + + assert np.allclose(sig1[0, :, 6000:8000].audio_data, + other_array[0, :, 6000:8000]) + + # Set parts of signal using AudioSignal + sig2 = AudioSignal(other_array, sample_rate=16000) + + sig1 = AudioSignal(array, sample_rate=16000) + sig1[0, :, 6000:8000] = sig2[0, :, 6000:8000] + + assert np.allclose(sig1[0, :, 6000:8000].audio_data, + sig2[0, :, 6000:8000].audio_data) + + # Check that loudnesses and stft_data get set as well, if only the batch + # dim is indexed. 
+ sig2 = AudioSignal(other_array, sample_rate=16000) + sig2.stft() + sig2.loudness() + + sig1 = AudioSignal(array, sample_rate=16000) + sig1.stft() + sig1.loudness() + + # Test using a boolean tensor to index batch + mask = paddle.to_tensor([True, False, True, False]) + sig1[mask] = sig2[mask] + + for k in ["stft_data", "audio_data", "_loudness"]: + a1 = getattr(sig1, k) + a2 = getattr(sig2, k) + + assert np.allclose(a1[mask], a2[mask]) + + +def test_zeros(): + x = AudioSignal.zeros(0.5, 44100) + assert x.signal_duration == 0.5 + assert x.duration == 0.5 + assert x.sample_rate == 44100 + + +@pytest.mark.parametrize("shape", + ["sine", "square", "sawtooth", "triangle", "beep"]) +def test_waves(shape: str): + # error case + if shape == "beep": + with pytest.raises(ValueError): + AudioSignal.wave(440, 0.5, 44100, shape=shape) + + return + + x = AudioSignal.wave(440, 0.5, 44100, shape=shape) + assert x.duration == 0.5 + assert x.sample_rate == 44100 + + # test the default shape arg + x = AudioSignal.wave(440, 0.5, 44100) + assert x.duration == 0.5 + assert x.sample_rate == 44100 + + +def test_zero_pad(): + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + + sig1.zero_pad(100, 100) + zeros = paddle.zeros([4, 2, 100], dtype="float64") + assert paddle.allclose(sig1.audio_data[..., :100], zeros) + assert paddle.allclose(sig1.audio_data[..., -100:], zeros) + + +def test_zero_pad_to(): + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + + sig1.zero_pad_to(16100) + zeros = paddle.zeros([4, 2, 100], dtype="float64") + assert paddle.allclose(sig1.audio_data[..., -100:], zeros) + assert sig1.signal_length == 16100 + + sig1 = AudioSignal(array, sample_rate=16000) + sig1.zero_pad_to(15000) + assert sig1.signal_length == 16000 + + sig1 = AudioSignal(array, sample_rate=16000) + sig1.zero_pad_to(16100, mode="before") + zeros = paddle.zeros([4, 2, 100], dtype="float64") + assert paddle.allclose(sig1.audio_data[..., :100], zeros) + assert sig1.signal_length == 16100 + + sig1 = AudioSignal(array, sample_rate=16000) + sig1.zero_pad_to(15000, mode="before") + assert sig1.signal_length == 16000 + + +def test_truncate(): + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + + sig1.truncate_samples(100) + assert sig1.signal_length == 100 + assert np.allclose(sig1.audio_data, array[..., :100]) + + +def test_trim(): + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + + sig1.trim(100, 100) + assert sig1.signal_length == 16000 - 200 + assert np.allclose(sig1.audio_data, array[..., 100:-100]) + + array = np.random.randn(4, 2, 16000) + sig1 = AudioSignal(array, sample_rate=16000) + sig1.trim(0, 0) + assert np.allclose(sig1.audio_data, array) + + +def test_to_from_ops(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + signal = AudioSignal(audio_path) + signal.stft() + signal.loudness() + signal = signal.to("cpu") + + assert str(signal.audio_data.place) == "Place(cpu)" + assert isinstance(signal.numpy(), np.ndarray) + + signal.cpu() + # signal.cuda() + signal.float() + + +def test_device(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + signal = AudioSignal(audio_path) + signal.to("cpu") + + assert str(signal.device) == "Place(cpu)" + + signal.stft() + signal.audio_data = None + assert str(signal.device) == "Place(cpu)" + + +@pytest.mark.parametrize("window_length", [2048, 512]) +@pytest.mark.parametrize("hop_length", [512, 128]) 
+@pytest.mark.parametrize("window_type", ["sqrt_hann", "hann", None]) +def test_stft(window_length, hop_length, window_type): + if hop_length >= window_length: + hop_length = window_length // 2 + audio_path = "tests/audio/spk/f10_script4_produced.wav" + stft_params = audiotools.STFTParams( + window_length=window_length, + hop_length=hop_length, + window_type=window_type) + for _stft_params in [None, stft_params]: + signal = AudioSignal(audio_path, duration=10, stft_params=_stft_params) + with pytest.raises(RuntimeError): + signal.istft() + + stft_data = signal.stft() + + # assert paddle.allclose(signal.stft_data, stft_data) + assert np.allclose(signal.stft_data.cpu().numpy(), + stft_data.cpu().numpy()) + copied_signal = signal.deepcopy() + copied_signal.stft() + copied_signal = copied_signal.istft() + + assert copied_signal == signal + + mag = signal.magnitude + phase = signal.phase + + recon_stft = mag * paddle.exp(1j * phase) + # assert paddle.allclose(recon_stft, signal.stft_data) + assert np.allclose(recon_stft.cpu().numpy(), + signal.stft_data.cpu().numpy()) + + signal.stft_data = None + mag = signal.magnitude + signal.stft_data = None + phase = signal.phase + + recon_stft = mag * paddle.exp(1j * phase) + # assert paddle.allclose(recon_stft, signal.stft_data) + assert np.allclose(recon_stft.cpu().numpy(), + signal.stft_data.cpu().numpy()) + + # Test with match_stride=True, ignoring the beginning and end. + s = signal.stft_params + if s.hop_length == s.window_length // 4: + og_signal = signal.clone() + stft_data = signal.stft(match_stride=True) + recon_data = signal.istft(match_stride=True) + discard = window_length * 2 + + right_pad, _ = signal.compute_stft_padding( + s.window_length, s.hop_length, match_stride=True) + length = signal.signal_length + right_pad + assert stft_data.shape[-1] == length // s.hop_length + + assert paddle.allclose( + recon_data.audio_data[..., discard:-discard], + og_signal.audio_data[..., discard:-discard], + atol=1e-6, ) + + +def test_log_magnitude(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + for _ in range(10): + signal = AudioSignal.excerpt(audio_path, duration=5.0) + magnitude = signal.magnitude.numpy()[0, 0] + librosa_log_mag = librosa.amplitude_to_db(magnitude) + log_mag = signal.log_magnitude().numpy()[0, 0] + + # print(abs((log_mag - librosa_log_mag)).max()) + assert np.allclose(log_mag, librosa_log_mag, atol=10e-7) + + +@pytest.mark.parametrize("n_mels", [40, 80, 128]) +@pytest.mark.parametrize("window_length", [2048, 512]) +@pytest.mark.parametrize("hop_length", [512, 128]) +@pytest.mark.parametrize("window_type", ["sqrt_hann", "hann", None]) +def test_mel_spectrogram(n_mels, window_length, hop_length, window_type): + if hop_length >= window_length: + hop_length = window_length // 2 + audio_path = "tests/audio/spk/f10_script4_produced.wav" + stft_params = audiotools.STFTParams( + window_length=window_length, + hop_length=hop_length, + window_type=window_type) + for _stft_params in [None, stft_params]: + signal = AudioSignal(audio_path, duration=10, stft_params=_stft_params) + mel_spec = signal.mel_spectrogram(n_mels=n_mels) + assert mel_spec.shape[2] == n_mels + + +@pytest.mark.parametrize("n_mfcc", [20, 40]) +@pytest.mark.parametrize("n_mels", [40, 80, 128]) +@pytest.mark.parametrize("window_length", [2048, 512]) +@pytest.mark.parametrize("hop_length", [512, 128]) +def test_mfcc(n_mfcc, n_mels, window_length, hop_length): + if hop_length >= window_length: + hop_length = window_length // 2 + audio_path = 
"tests/audio/spk/f10_script4_produced.wav" + stft_params = audiotools.STFTParams( + window_length=window_length, hop_length=hop_length) + for _stft_params in [None, stft_params]: + signal = AudioSignal(audio_path, duration=10, stft_params=_stft_params) + mfcc = signal.mfcc(n_mfcc=n_mfcc, n_mels=n_mels) + assert mfcc.shape[2] == n_mfcc + + +def test_to_mono(): + array = np.random.randn(4, 2, 16000) + sr = 16000 + + signal = AudioSignal(array, sample_rate=sr) + assert signal.num_channels == 2 + + signal = signal.to_mono() + assert signal.num_channels == 1 + + +def test_float(): + array = np.random.randn(4, 1, 16000).astype("float64") + sr = 1600 + signal = AudioSignal(array, sample_rate=sr) + + signal = signal.float() + assert signal.audio_data.dtype == paddle.float32 + + +@pytest.mark.parametrize("sample_rate", [8000, 16000, 22050, 44100, 48000]) +def test_resample(sample_rate): + array = np.random.randn(4, 2, 16000) + sr = 16000 + + signal = AudioSignal(array, sample_rate=sr) + + signal = signal.resample(sample_rate) + assert signal.sample_rate == sample_rate + assert signal.signal_length == sample_rate + + +def test_batching(): + signals = [] + batch_size = 16 + + # All same length, same sample rate. + for _ in range(batch_size): + array = np.random.randn(2, 16000) + signal = AudioSignal(array, sample_rate=16000) + signals.append(signal) + + batched_signal = AudioSignal.batch(signals) + assert batched_signal.batch_size == batch_size + + signals = [] + # All different lengths, same sample rate, pad signals + for _ in range(batch_size): + L = np.random.randint(8000, 32000) + array = np.random.randn(2, L) + signal = AudioSignal(array, sample_rate=16000) + signals.append(signal) + + with pytest.raises(RuntimeError): + batched_signal = AudioSignal.batch(signals) + + signal_lengths = [x.signal_length for x in signals] + max_length = max(signal_lengths) + batched_signal = AudioSignal.batch(signals, pad_signals=True) + + assert batched_signal.signal_length == max_length + assert batched_signal.batch_size == batch_size + + signals = [] + # All different lengths, same sample rate, truncate signals + for _ in range(batch_size): + L = np.random.randint(8000, 32000) + array = np.random.randn(2, L) + signal = AudioSignal(array, sample_rate=16000) + signals.append(signal) + + with pytest.raises(RuntimeError): + batched_signal = AudioSignal.batch(signals) + + signal_lengths = [x.signal_length for x in signals] + min_length = min(signal_lengths) + batched_signal = AudioSignal.batch(signals, truncate_signals=True) + + assert batched_signal.signal_length == min_length + assert batched_signal.batch_size == batch_size + + signals = [] + # All different lengths, different sample rate, pad signals + for _ in range(batch_size): + L = np.random.randint(8000, 32000) + sr = np.random.choice([8000, 16000, 32000]) + array = np.random.randn(2, L) + signal = AudioSignal(array, sample_rate=int(sr)) + signals.append(signal) + + with pytest.raises(RuntimeError): + batched_signal = AudioSignal.batch(signals) + + signal_lengths = [x.signal_length for x in signals] + max_length = max(signal_lengths) + for i, x in enumerate(signals): + x.path_to_file = i + batched_signal = AudioSignal.batch(signals, resample=True, pad_signals=True) + + assert batched_signal.signal_length == max_length + assert batched_signal.batch_size == batch_size + assert batched_signal.path_to_file == list(range(len(signals))) + assert batched_signal.path_to_input_file == batched_signal.path_to_file diff --git a/audio/tests_at/core/test_bands✅.py 
b/audio/tests_at/core/test_bands✅.py new file mode 100644 index 000000000..35be292f6 --- /dev/null +++ b/audio/tests_at/core/test_bands✅.py @@ -0,0 +1,50 @@ +# File under the MIT license, see https://github.com/adefossez/julius/LICENSE for details. +# Author: adefossez, 2020 +import random +import sys +import unittest + +import paddle +sys.path.append("/home/work/pdaudoio") +from audiotools.core import pure_tone, SplitBands, split_bands + + +def delta(a, b, ref, fraction=0.9): + length = a.shape[-1] + compare_length = int(length * fraction) + offset = (length - compare_length) // 2 + a = a[..., offset:offset + length] + b = b[..., offset:offset + length] + return 100 * paddle.abs(a - b).mean() / ref.std() + + +TOLERANCE = 0.5 # Tolerance to errors as percentage of the std of the input signal + + +class _BaseTest(unittest.TestCase): + def assertSimilar(self, a, b, ref, msg=None, tol=TOLERANCE): + self.assertLessEqual(delta(a, b, ref), tol, msg) + + +class TestLowPassFilters(_BaseTest): + def setUp(self): + paddle.seed(1234) + random.seed(1234) + + def test_keep_or_kill(self): + sr = 256 + low = pure_tone(10, sr) + mid = pure_tone(40, sr) + high = pure_tone(100, sr) + + x = low + mid + high + + decomp = split_bands(x, sr, cutoffs=[20, 70]) + self.assertEqual(len(decomp), 3) + for est, gt, name in zip(decomp, [low, mid, high], + ["low", "mid", "high"]): + self.assertSimilar(est, gt, gt, name) + + +if __name__ == "__main__": + unittest.main() diff --git a/audio/tests_at/core/test_fftconv✅.py b/audio/tests_at/core/test_fftconv✅.py new file mode 100644 index 000000000..3243f337d --- /dev/null +++ b/audio/tests_at/core/test_fftconv✅.py @@ -0,0 +1,107 @@ +# File under the MIT license, see https://github.com/your_repo/your_license for details. +# Author: your_name, current_year +import random +import sys +import unittest + +import paddle +import paddle.nn.functional as F +sys.path.append("/home/work/pdaudoio") +from audiotools.core import fft_conv1d, FFTConv1d + +TOLERANCE = 1e-4 # as relative delta in percentage + + +class _BaseTest(unittest.TestCase): + def setUp(self): + paddle.seed(1234) + random.seed(1234) + + def assertSimilar(self, a, b, msg=None, tol=TOLERANCE): + delta = 100 * paddle.norm(a - b, p=2) / paddle.norm(b, p=2) + self.assertLessEqual(delta.numpy(), tol, msg) + + def compare_paddle(self, + *args, + block_ratio=10, + msg=None, + tol=TOLERANCE, + **kwargs): + y_ref = F.conv1d(*args, **kwargs) + y = fft_conv1d(*args, block_ratio=block_ratio, **kwargs) + self.assertEqual(list(y.shape), list(y_ref.shape), msg) + self.assertSimilar(y, y_ref, msg, tol) + + +class TestFFTConv1d(_BaseTest): + def test_same_as_paddle(self): + for _ in range(5): + kernel_size = random.randrange(4, 128) + batch_size = random.randrange(1, 6) + length = random.randrange(kernel_size, 1024) + chin = random.randrange(1, 12) + chout = random.randrange(1, 12) + block_ratio = random.choice([5, 10, 20]) + bias = random.random() < 0.5 + if random.random() < 0.5: + padding = 0 + else: + padding = random.randrange(kernel_size // 2, 2 * kernel_size) + x = paddle.randn([batch_size, chin, length]) + w = paddle.randn([chout, chin, kernel_size]) + keys = [ + "length", "kernel_size", "chin", "chout", "block_ratio", "bias" + ] + loc = locals() + state = {key: loc[key] for key in keys} + if bias: + bias = paddle.randn([chout]) + else: + bias = None + for stride in [1, 2, 5]: + state["stride"] = stride + self.compare_paddle( + x, + w, + bias, + stride, + padding, + block_ratio=block_ratio, + msg=repr(state)) + + def 
test_small_input(self): + x = paddle.randn([1, 5, 19]) + w = paddle.randn([10, 5, 32]) + with self.assertRaises(RuntimeError): + fft_conv1d(x, w) + + x = paddle.randn([1, 5, 19]) + w = paddle.randn([10, 5, 19]) + self.assertEqual(list(fft_conv1d(x, w).shape), [1, 10, 1]) + + def test_block_ratio(self): + x = paddle.randn([1, 5, 1024]) + w = paddle.randn([10, 5, 19]) + ref = fft_conv1d(x, w) + for block_ratio in [1, 5, 10, 20]: + y = fft_conv1d(x, w, block_ratio=block_ratio) + self.assertSimilar(y, ref, msg=str(block_ratio)) + + with self.assertRaises(RuntimeError): + y = fft_conv1d(x, w, block_ratio=0.9) + + def test_module(self): + x = paddle.randn([16, 4, 1024]) + mod = FFTConv1d(4, 5, 8, bias=True) + mod(x) + mod = FFTConv1d(4, 5, 8, bias=False) + mod(x) + + def test_dynamic_graph(self): + x = paddle.randn([16, 4, 1024]) + mod = FFTConv1d(4, 5, 8, bias=True) + self.assertEqual(list(mod(x).shape), [16, 5, 1024 - 8 + 1]) + + +if __name__ == "__main__": + unittest.main() diff --git a/audio/tests_at/core/test_highpass✅.py b/audio/tests_at/core/test_highpass✅.py new file mode 100644 index 000000000..8ad302abf --- /dev/null +++ b/audio/tests_at/core/test_highpass✅.py @@ -0,0 +1,171 @@ +# File under the MIT license, see https://github.com/adefossez/julius/LICENSE for details. +# Author: adefossez, 2020 +import math +import random +import sys +import unittest + +import paddle +sys.path.append("/home/work/pdaudoio") +from audiotools.core import highpass_filter, highpass_filters + + +def pure_tone(freq: float, sr: float=128, dur: float=4, device=None): + """ + Return a pure tone, i.e. cosine. + + Args: + freq (float): frequency (in Hz) + sr (float): sample rate (in Hz) + dur (float): duration (in seconds) + """ + time = paddle.arange(int(sr * dur), dtype="float32") / sr + return paddle.cos(2 * math.pi * freq * time) + + +def delta(a, b, ref, fraction=0.9): + length = a.shape[-1] + compare_length = int(length * fraction) + offset = (length - compare_length) // 2 + a = a[..., offset:offset + length] + b = b[..., offset:offset + length] + # 计算绝对差值,均值,然后除以ref的标准差,乘以100 + return 100 * paddle.mean(paddle.abs(a - b)) / paddle.std(ref) + + +TOLERANCE = 1 # Tolerance to errors as percentage of the std of the input signal + + +class _BaseTest(unittest.TestCase): + def assertSimilar(self, a, b, ref, msg=None, tol=TOLERANCE): + self.assertLessEqual(delta(a, b, ref), tol, msg) + + +class TestHighPassFilters(_BaseTest): + def setUp(self): + paddle.seed(1234) + random.seed(1234) + + def test_keep_or_kill(self): + for _ in range(10): + freq = random.uniform(0.01, 0.4) + sr = 1024 + tone = pure_tone(freq * sr, sr=sr, dur=10) + + # For this test we accept 5% tolerance in amplitude, or -26dB in power. 
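+            # (A 5% amplitude error is 20 * log10(0.05) ≈ -26 dB when expressed as power.)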
+ tol = 5 + zeros = 16 + + # If cutoff frequency is under freq, output should be input + y_pass = highpass_filter(tone, 0.9 * freq, zeros=zeros) + self.assertSimilar( + y_pass, tone, tone, f"freq={freq}, pass", tol=tol) + + # If cutoff frequency is over freq, output should be zero + y_killed = highpass_filter(tone, 1.1 * freq, zeros=zeros) + self.assertSimilar( + y_killed, 0 * tone, tone, f"freq={freq}, kill", tol=tol) + + def test_fft_nofft(self): + for _ in range(10): + x = paddle.randn([1024]) + freq = random.uniform(0.01, 0.5) + y_fft = highpass_filter(x, freq, fft=True) + y_ref = highpass_filter(x, freq, fft=False) + self.assertSimilar(y_fft, y_ref, x, f"freq={freq}", tol=0.01) + + def test_constant(self): + x = paddle.ones([2048]) + for zeros in [4, 10]: + for freq in [0.01, 0.1]: + y_high = highpass_filter(x, freq, zeros=zeros) + self.assertLessEqual(y_high.abs().mean(), 1e-6, (zeros, freq)) + + def test_stride(self): + x = paddle.randn([1024]) + + y = highpass_filters(x, [0.1, 0.2], stride=1)[:, ::3] + y2 = highpass_filters(x, [0.1, 0.2], stride=3) + + self.assertEqual(y.shape, y2.shape) + self.assertSimilar(y, y2, x) + + y = highpass_filters(x, [0.1, 0.2], stride=1, pad=False)[:, ::3] + y2 = highpass_filters(x, [0.1, 0.2], stride=3, pad=False) + + self.assertEqual(y.shape, y2.shape) + self.assertSimilar(y, y2, x) + + +# class TestBandPassFilters(_BaseTest): + +# def setUp(self): +# paddle.seed(1234) +# random.seed(1234) + +# def test_keep_or_kill(self): +# for _ in range(10): +# freq = random.uniform(0.01, 0.4) +# sr = 1024 +# tone = pure_tone(freq * sr, sr=sr, dur=10) + +# # For this test we accept 5% tolerance in amplitude, or -26dB in power. +# tol = 5 +# zeros = 16 + +# y_pass = filters.bandpass_filter(tone, 0.9 * freq, 1.1 * freq, zeros=zeros) +# self.assertSimilar(y_pass, tone, tone, f"freq={freq}, pass", tol=tol) + +# y_killed = filters.bandpass_filter(tone, 1.1 * freq, 1.2 * freq, zeros=zeros) +# self.assertSimilar(y_killed, 0 * tone, tone, f"freq={freq}, kill", tol=tol) + +# y_killed = filters.bandpass_filter(tone, 0.8 * freq, 0.9 * freq, zeros=zeros) +# self.assertSimilar(y_killed, 0 * tone, tone, f"freq={freq}, kill", tol=tol) + +# def test_fft_nofft(self): +# for _ in range(10): +# x = paddle.randn([1024]) +# freq = random.uniform(0.01, 0.5) +# freq2 = random.uniform(freq, 0.5) +# y_fft = filters.bandpass_filter(x, freq, freq2, fft=True) +# y_ref = filters.bandpass_filter(x, freq, freq2, fft=False) +# self.assertSimilar(y_fft, y_ref, x, f"freq={freq}", tol=0.01) + +# def test_constant(self): +# x = paddle.ones([2048]) +# for zeros in [4, 10]: +# for freq in [0.01, 0.1]: +# y = filters.bandpass_filter(x, freq, 1.2 * freq, zeros=zeros) +# self.assertLessEqual(y.abs().mean(), 1e-6, (zeros, freq)) + +# def test_stride(self): +# x = paddle.randn([1024]) + +# y = filters.bandpass_filter(x, 0.1, 0.2, stride=1)[::3] +# y2 = filters.bandpass_filter(x, 0.1, 0.2, stride=3) + +# self.assertEqual(y.shape, y2.shape) +# self.assertSimilar(y, y2, x) + +# y = filters.bandpass_filter(x, 0.1, 0.2, stride=1, pad=False)[::3] +# y2 = filters.bandpass_filter(x, 0.1, 0.2, stride=3, pad=False) + +# self.assertEqual(y.shape, y2.shape) +# self.assertSimilar(y, y2, x) + +# def test_same_as_highpass(self): +# x = paddle.randn([1024]) + +# y_ref = highpass_filter(x, 0.2) +# y = filters.bandpass_filter(x, 0.2, 0.5) +# self.assertSimilar(y, y_ref, x) + +# def test_same_as_lowpass(self): +# x = paddle.randn([1024]) + +# y_ref = filters.lowpass_filter(x, 0.2) +# y = filters.bandpass_filter(x, 0.0, 0.2) 
+# self.assertSimilar(y, y_ref, x) + +if __name__ == "__main__": + unittest.main() diff --git a/audio/tests_at/core/test_loudness✅.py b/audio/tests_at/core/test_loudness✅.py new file mode 100644 index 000000000..a25ad45cb --- /dev/null +++ b/audio/tests_at/core/test_loudness✅.py @@ -0,0 +1,283 @@ +import sys + +import numpy as np +import pyloudnorm +import soundfile as sf +sys.path.append("/home/work/pdaudoio") +from audiotools import AudioSignal +from audiotools import datasets +from audiotools import Meter +from audiotools import transforms + +ATOL = 1e-1 + + +def test_loudness_against_pyln(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + signal = AudioSignal(audio_path, offset=5, duration=10) + signal_loudness = signal.loudness() + + meter = pyloudnorm.Meter( + signal.sample_rate, filter_class="K-weighting", block_size=0.4) + py_loudness = meter.integrated_loudness(signal.numpy()[0].T) + assert np.allclose(signal_loudness, py_loudness) + + +def test_loudness_short(): + audio_path = "tests/audio/spk/f10_script4_produced.wav" + signal = AudioSignal(audio_path, offset=10, duration=0.25) + signal_loudness = signal.loudness() + + +def test_batch_loudness(): + np.random.seed(0) + array = np.random.randn(16, 2, 16000) + array /= np.abs(array).max() + + gains = np.random.rand(array.shape[0])[:, None, None] + array = array * gains + + meter = pyloudnorm.Meter(16000) + py_loudness = [ + meter.integrated_loudness(array[i].T) for i in range(array.shape[0]) + ] + + meter = Meter(16000) + meter.filter_class + at_loudness_iso = [ + meter.integrated_loudness(array[i].T).item() + for i in range(array.shape[0]) + ] + + assert np.allclose(py_loudness, at_loudness_iso, atol=1e-1) + + signal = AudioSignal(array, sample_rate=16000) + at_loudness_batch = signal.loudness() + assert np.allclose(py_loudness, at_loudness_batch, atol=1e-1) + + +# Tests below are copied from pyloudnorm +def test_integrated_loudness(): + data, rate = sf.read("tests/audio/loudness/sine_1000.wav") + meter = Meter(rate) + loudness = meter(data) + + targetLoudness = -3.0523438444331137 + assert np.allclose(loudness, targetLoudness) + + +def test_rel_gate_test(): + data, rate = sf.read("tests/audio/loudness/1770-2_Comp_RelGateTest.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -10.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_abs_gate_test(): + data, rate = sf.read("tests/audio/loudness/1770-2_Comp_AbsGateTest.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -69.5 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_24LKFS_25Hz_2ch(): + data, rate = sf.read("tests/audio/loudness/1770-2_Comp_24LKFS_25Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_24LKFS_100Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_24LKFS_100Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_24LKFS_500Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_24LKFS_500Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_24LKFS_1000Hz_2ch(): + data, rate = sf.read( + 
"tests/audio/loudness/1770-2_Comp_24LKFS_1000Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_24LKFS_2000Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_24LKFS_2000Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_24LKFS_10000Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_24LKFS_10000Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_23LKFS_25Hz_2ch(): + data, rate = sf.read("tests/audio/loudness/1770-2_Comp_23LKFS_25Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_23LKFS_100Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_23LKFS_100Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_23LKFS_500Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_23LKFS_500Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_23LKFS_1000Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_23LKFS_1000Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_23LKFS_2000Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_23LKFS_2000Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_23LKFS_10000Hz_2ch(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_23LKFS_10000Hz_2ch.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_18LKFS_frequency_sweep(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Comp_18LKFS_FrequencySweep.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -18.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_conf_stereo_vinL_R_23LKFS(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Conf_Stereo_VinL+R-23LKFS.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_conf_monovoice_music_24LKFS(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Conf_Mono_Voice+Music-24LKFS.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def conf_monovoice_music_24LKFS(): + data, rate = sf.read( + "tests/audio/loudness/1770-2_Conf_Mono_Voice+Music-24LKFS.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -24.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_conf_monovoice_music_23LKFS(): + data, 
rate = sf.read( + "tests/audio/loudness/1770-2_Conf_Mono_Voice+Music-23LKFS.wav") + meter = Meter(rate) + loudness = meter.integrated_loudness(data) + + targetLoudness = -23.0 + assert np.allclose(loudness, targetLoudness, atol=ATOL) + + +def test_fir_accuracy(): + transform = transforms.Compose( + transforms.ClippingDistortion(prob=0.5), + transforms.LowPass(prob=0.5), + transforms.HighPass(prob=0.5), + transforms.Equalizer(prob=0.5), + prob=0.5, ) + loader = datasets.AudioLoader(sources=["tests/audio/spk.csv"]) + dataset = datasets.AudioDataset( + loader, + 44100, + 10, + 5.0, + transform=transform, ) + + for i in range(20): + item = dataset[i] + kwargs = item["transform_args"] + signal = item["signal"] + signal = transform(signal, **kwargs) + + signal._loudness = None + iir_db = signal.clone().loudness() + fir_db = signal.clone().loudness(use_fir=True) + + assert np.allclose(iir_db, fir_db, atol=1e-2) + + +test_fir_accuracy() diff --git a/audio/tests_at/core/test_lowpass✅.py b/audio/tests_at/core/test_lowpass✅.py new file mode 100644 index 000000000..a35d64f9f --- /dev/null +++ b/audio/tests_at/core/test_lowpass✅.py @@ -0,0 +1,104 @@ +# File under the MIT license, see https://github.com/adefossez/julius/LICENSE for details. +# Author: adefossez, 2020 +import math +import random +import sys +import unittest + +import numpy as np +import paddle +sys.path.append("/home/work/pdaudoio") +from audiotools.core import LowPassFilter, LowPassFilters, lowpass_filter, resample_frac + + +def pure_tone(freq: float, sr: float=128, dur: float=4, device=None): + """ + Return a pure tone, i.e. cosine. + + Args: + freq (float): frequency (in Hz) + sr (float): sample rate (in Hz) + dur (float): duration (in seconds) + """ + time = paddle.arange(int(sr * dur), dtype="float32") / sr + return paddle.cos(2 * math.pi * freq * time) + + +def delta(a, b, ref, fraction=0.9): + length = a.shape[-1] + compare_length = int(length * fraction) + offset = (length - compare_length) // 2 + a = a[..., offset:offset + length] + b = b[..., offset:offset + length] + # 计算绝对差值,均值,然后除以ref的标准差,乘以100 + return 100 * paddle.mean(paddle.abs(a - b)) / paddle.std(ref) + + +TOLERANCE = 1 # Tolerance to errors as percentage of the std of the input signal + + +class _BaseTest(unittest.TestCase): + def assertSimilar(self, a, b, ref, msg=None, tol=TOLERANCE): + self.assertLessEqual(delta(a, b, ref), tol, msg) + + +class TestLowPassFilters(_BaseTest): + def setUp(self): + paddle.seed(1234) + random.seed(1234) + + def test_keep_or_kill(self): + for _ in range(10): + freq = random.uniform(0.01, 0.4) + sr = 1024 + tone = pure_tone(freq * sr, sr=sr, dur=10) + + # For this test we accept 5% tolerance in amplitude, or -26dB in power. 
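+            # (A 5% amplitude error is 20 * log10(0.05) ≈ -26 dB when expressed as power.)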
+ tol = 5 + zeros = 16 + + # If cutoff frequency is under freq, output should be zero + y_killed = lowpass_filter(tone, 0.9 * freq, zeros=zeros) + self.assertSimilar( + y_killed, 0 * y_killed, tone, f"freq={freq}, kill", tol=tol) + + # If cutoff frequency is under freq, output should be input + y_pass = lowpass_filter(tone, 1.1 * freq, zeros=zeros) + self.assertSimilar( + y_pass, tone, tone, f"freq={freq}, pass", tol=tol) + + def test_same_as_downsample(self): + for _ in range(10): + x = paddle.randn([2 * 3 * 4 * 100]) + x = paddle.ones_like(x) + np.random.seed(1234) + x = paddle.to_tensor( + np.random.randn(2 * 3 * 4 * 100), dtype="float32") + rolloff = 0.945 + for old_sr in [2, 3, 4]: + y_resampled = resample_frac( + x, old_sr, 1, rolloff=rolloff, zeros=16) + y_lowpass = lowpass_filter( + x, rolloff / old_sr / 2, stride=old_sr, zeros=16) + self.assertSimilar(y_resampled, y_lowpass, x, + f"old_sr={old_sr}") + + def test_fft_nofft(self): + for _ in range(10): + x = paddle.randn([1024]) + freq = random.uniform(0.01, 0.5) + y_fft = lowpass_filter(x, freq, fft=True) + y_ref = lowpass_filter(x, freq, fft=False) + self.assertSimilar(y_fft, y_ref, x, f"freq={freq}", tol=0.01) + + def test_constant(self): + x = paddle.ones([2048]) + for zeros in [4, 10]: + for freq in [0.01, 0.1]: + y_low = lowpass_filter(x, freq, zeros=zeros) + self.assertLessEqual((y_low - 1).abs().mean(), 1e-6, + (zeros, freq)) + + +if __name__ == "__main__": + unittest.main() diff --git a/audio/tests_at/core/test_util✅.py b/audio/tests_at/core/test_util✅.py new file mode 100644 index 000000000..dc333d2d7 --- /dev/null +++ b/audio/tests_at/core/test_util✅.py @@ -0,0 +1,153 @@ +import os +import random +import sys +import tempfile + +import numpy as np +import paddle +import pytest + +sys.path.append("/home/work/pdaudoio") +from audiotools import util +from audiotools.core.audio_signal import AudioSignal + + +def test_check_random_state(): + # seed is None + rng_type = type(np.random.RandomState(10)) + rng = util.random_state(None) + assert type(rng) == rng_type + + # seed is int + rng = util.random_state(10) + assert type(rng) == rng_type + + # seed is RandomState + rng_test = np.random.RandomState(10) + rng = util.random_state(rng_test) + assert type(rng) == rng_type + + # seed is none of the above : error + pytest.raises(ValueError, util.random_state, "random") + + +def test_seed(): + util.seed(0) + paddle_result_a = paddle.randn([1]) + np_result_a = np.random.randn(1) + py_result_a = random.random() + + util.seed(0, set_cudnn=True) + paddle_result_b = paddle.randn([1]) + np_result_b = np.random.randn(1) + py_result_b = random.random() + + assert paddle_result_a == paddle_result_b + assert np_result_a == np_result_b + assert py_result_a == py_result_b + + +def test_hz_to_bin(): + hz = paddle.to_tensor(np.array([100, 200, 300])) + sr = 1000 + n_fft = 2048 + + bins = util.hz_to_bin(hz, n_fft, sr) + + assert (((bins / n_fft) * sr) - hz).abs().max() < 1 + + +def test_find_audio(): + wav_files = util.find_audio("tests/", ["wav"]) + for a in wav_files: + assert "wav" in str(a) + + audio_files = util.find_audio("tests/", ["flac"]) + assert not audio_files + + # Make sure it works with single audio files + audio_files = util.find_audio("tests/audio/spk//f10_script4_produced.wav") + + # Make sure it works with globs + audio_files = util.find_audio("tests/**/*.wav") + assert len(audio_files) == len(wav_files) + + +def test_chdir(): + with tempfile.TemporaryDirectory(suffix="tmp") as d: + with util.chdir(d): + assert 
os.path.samefile(d, os.path.realpath(".")) + + +def test_prepare_batch(): + batch = {"tensor": paddle.randn([1]), "non_tensor": np.random.randn(1)} + util.prepare_batch(batch) + + batch = paddle.randn([1]) + util.prepare_batch(batch) + + batch = [paddle.randn([1]), np.random.randn(1)] + util.prepare_batch(batch) + + +def test_sample_dist(): + state = util.random_state(0) + v1 = state.uniform(0.0, 1.0) + v2 = util.sample_from_dist(("uniform", 0.0, 1.0), 0) + assert v1 == v2 + + assert util.sample_from_dist(("const", 1.0)) == 1.0 + + dist_tuple = ("choice", [8, 16, 32]) + assert util.sample_from_dist(dist_tuple) in [8, 16, 32] + + +def test_collate(): + batch_size = 16 + + def _one_item(): + return { + "signal": AudioSignal(paddle.randn([1, 1, 44100]), 44100), + "tensor": paddle.randn([1]), + "string": "Testing", + "dict": { + "nested_signal": + AudioSignal(paddle.randn([1, 1, 44100]), 44100), + }, + } + + items = [_one_item() for _ in range(batch_size)] + collated = util.collate(items) + + assert collated["signal"].batch_size == batch_size + assert collated["tensor"].shape[0] == batch_size + assert len(collated["string"]) == batch_size + assert collated["dict"]["nested_signal"].batch_size == batch_size + + # test collate with splitting (evenly) + batch_size = 16 + n_splits = 4 + + items = [_one_item() for _ in range(batch_size)] + collated = util.collate(items, n_splits=n_splits) + + for x in collated: + assert x["signal"].batch_size == batch_size // n_splits + assert x["tensor"].shape[0] == batch_size // n_splits + assert len(x["string"]) == batch_size // n_splits + assert x["dict"]["nested_signal"].batch_size == batch_size // n_splits + + # test collate with splitting (unevenly) + batch_size = 15 + n_splits = 4 + + items = [_one_item() for _ in range(batch_size)] + collated = util.collate(items, n_splits=n_splits) + + tlen = [4, 4, 4, 3] + + for x, t in zip(collated, tlen): + assert x["signal"].batch_size == t + assert x["tensor"].shape[0] == t + assert len(x["string"]) == t + assert x["dict"]["nested_signal"].batch_size == t diff --git a/audio/tests_at/data/test_datasets✅.py b/audio/tests_at/data/test_datasets✅.py new file mode 100644 index 000000000..995f8004b --- /dev/null +++ b/audio/tests_at/data/test_datasets✅.py @@ -0,0 +1,206 @@ +import sys +import tempfile +from pathlib import Path + +import numpy as np +import pytest + +sys.path.append("/home/work/pdaudoio") +import paddle +import audiotools +from audiotools.data import transforms as tfm + + +def test_align_lists(): + input_lists = [ + ["a/1.wav", "b/1.wav", "c/1.wav", "d/1.wav"], + ["a/2.wav", "c/2.wav"], + ["c/3.wav"], + ] + target_lists = [ + ["a/1.wav", "b/1.wav", "c/1.wav", "d/1.wav"], + ["a/2.wav", "none", "c/2.wav", "none"], + ["none", "none", "c/3.wav", "none"], + ] + + def _preprocess(lists): + output = [] + for x in lists: + output.append([]) + for y in x: + output[-1].append({"path": y}) + return output + + input_lists = _preprocess(input_lists) + target_lists = _preprocess(target_lists) + + aligned_lists = audiotools.datasets.align_lists(input_lists) + assert target_lists == aligned_lists + + +def test_audio_dataset(): + transform = tfm.Compose( + [ + tfm.VolumeNorm(), + tfm.Silence(prob=0.5), + ], ) + loader = audiotools.data.datasets.AudioLoader( + sources=["tests/audio/spk.csv"], + transform=transform, ) + dataset = audiotools.data.datasets.AudioDataset( + loader, + 44100, + n_examples=100, + transform=transform, ) + dataloader = paddle.io.DataLoader( + dataset, + batch_size=16, + num_workers=0, + 
collate_fn=dataset.collate, ) + for batch in dataloader: + kwargs = batch["transform_args"] + signal = batch["signal"] + original = signal.clone() + + signal = dataset.transform(signal, **kwargs) + original = dataset.transform(original, **kwargs) + + mask = kwargs["Compose"]["1.Silence"]["mask"] + + zeros_ = paddle.zeros_like(signal[mask].audio_data) + original_ = original[~mask].audio_data + + assert paddle.allclose(signal[mask].audio_data, zeros_) + assert paddle.allclose(signal[~mask].audio_data, original_) + + +def test_aligned_audio_dataset(): + with tempfile.TemporaryDirectory() as d: + dataset_dir = Path(d) + audiotools.util.generate_chord_dataset( + max_voices=8, num_items=3, output_dir=dataset_dir) + loaders = [ + audiotools.data.datasets.AudioLoader([dataset_dir / f"track_{i}"]) + for i in range(3) + ] + dataset = audiotools.data.datasets.AudioDataset( + loaders, 44100, n_examples=1000, aligned=True, shuffle_loaders=True) + dataloader = paddle.io.DataLoader( + dataset, + batch_size=16, + num_workers=0, + collate_fn=dataset.collate, ) + + # Make sure the voice tracks are aligned. + for batch in dataloader: + paths = [] + for i in range(len(loaders)): + _paths = [p.split("/")[-1] for p in batch[i]["path"]] + paths.append(_paths) + paths = np.array(paths) + for i in range(paths.shape[1]): + col = paths[:, i] + col = col[col != "none"] + assert np.all(col == col[0]) + + +def test_loader_without_replacement(): + with tempfile.TemporaryDirectory() as d: + dataset_dir = Path(d) + num_items = 100 + audiotools.util.generate_chord_dataset( + max_voices=1, + num_items=num_items, + output_dir=dataset_dir, + duration=0.01, ) + loader = audiotools.data.datasets.AudioLoader( + [dataset_dir], shuffle=False) + dataset = audiotools.data.datasets.AudioDataset(loader, 44100) + + for idx in range(num_items): + item = dataset[idx] + assert item["item_idx"] == idx + + +def test_loader_with_replacement(): + with tempfile.TemporaryDirectory() as d: + dataset_dir = Path(d) + num_items = 100 + audiotools.util.generate_chord_dataset( + max_voices=1, + num_items=num_items, + output_dir=dataset_dir, + duration=0.01, ) + loader = audiotools.data.datasets.AudioLoader([dataset_dir]) + dataset = audiotools.data.datasets.AudioDataset( + loader, 44100, without_replacement=False) + + for idx in range(num_items): + item = dataset[idx] + + +def test_loader_out_of_range(): + with tempfile.TemporaryDirectory() as d: + dataset_dir = Path(d) + num_items = 100 + audiotools.util.generate_chord_dataset( + max_voices=1, + num_items=num_items, + output_dir=dataset_dir, + duration=0.01, ) + loader = audiotools.data.datasets.AudioLoader([dataset_dir]) + + item = loader( + sample_rate=44100, + duration=0.01, + state=audiotools.util.random_state(0), + source_idx=0, + item_idx=101, ) + assert item["path"] == "none" + + +def test_dataset_pipeline(): + transform = tfm.Compose([ + tfm.RoomImpulseResponse(sources=["tests/audio/irs.csv"]), + tfm.BackgroundNoise(sources=["tests/audio/noises.csv"]), + ]) + loader = audiotools.data.datasets.AudioLoader( + sources=["tests/audio/spk.csv"]) + dataset = audiotools.data.datasets.AudioDataset( + loader, + 44100, + n_examples=10, + transform=transform, ) + dataloader = paddle.io.DataLoader( + dataset, num_workers=0, batch_size=1, collate_fn=dataset.collate) + for batch in dataloader: + batch = audiotools.core.util.prepare_batch(batch, device="cpu") + kwargs = batch["transform_args"] + signal = batch["signal"] + batch = dataset.transform(signal, **kwargs) + + +class NumberDataset: + def 
__init__(self): + pass + + def __len__(self): + return 10 + + def __getitem__(self, idx): + return {"idx": idx} + + +def test_concat_dataset(): + d1 = NumberDataset() + d2 = NumberDataset() + d3 = NumberDataset() + + d = audiotools.datasets.ConcatDataset([d1, d2, d3]) + x = d.collate([d[i] for i in range(len(d))])["idx"].tolist() + + t = [] + for i in range(10): + t += [i, i, i] + + assert x == t diff --git a/audio/tests_at/data/test_preprocess✅.py b/audio/tests_at/data/test_preprocess✅.py new file mode 100644 index 000000000..b344fa6f4 --- /dev/null +++ b/audio/tests_at/data/test_preprocess✅.py @@ -0,0 +1,29 @@ +import sys +import tempfile +from pathlib import Path + +import paddle +sys.path.append("/home/work/pdaudoio") +from audiotools.core.util import find_audio +from audiotools.core.util import read_sources +from audiotools.data import preprocess + + +def test_create_csv(): + with tempfile.NamedTemporaryFile(suffix=".csv") as f: + preprocess.create_csv( + find_audio("./tests/audio/spk", ext=["wav"]), f.name, loudness=True) + + +def test_create_csv_with_empty_rows(): + audio_files = find_audio("./tests/audio/spk", ext=["wav"]) + audio_files.insert(0, "") + audio_files.insert(2, "") + + with tempfile.NamedTemporaryFile(suffix=".csv") as f: + preprocess.create_csv(audio_files, f.name, loudness=True) + + audio_files = read_sources([f.name], remove_empty=True) + assert len(audio_files[0]) == 1 + audio_files = read_sources([f.name], remove_empty=False) + assert len(audio_files[0]) == 3 diff --git a/audio/tests_at/ml/test_decorators✅.py b/audio/tests_at/ml/test_decorators✅.py new file mode 100644 index 000000000..40fa23616 --- /dev/null +++ b/audio/tests_at/ml/test_decorators✅.py @@ -0,0 +1,96 @@ +import sys +import time +sys.path.append("/home/work/pdaudoio") +import paddle +from visualdl import LogWriter + +from audiotools.ml.decorators import timer +from audiotools.ml.decorators import Tracker +from audiotools.ml.decorators import when + + +def test_all_decorators(): + rank = 0 + max_iters = 100 + + writer = LogWriter("/tmp/logs") + tracker = Tracker(writer, log_file="/tmp/log.txt") + + train_data = range(100) + val_data = range(100) + + @tracker.log("train", "value", history=False) + @tracker.track("train", max_iters, tracker.step) + @timer() + def train_loop(): + i = tracker.step + time.sleep(0.01) + return { + "loss": paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "mel": paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "stft": paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "waveform": + paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "not_scalar": paddle.arange(start=0, end=10, step=1, dtype="int64"), + } + + @tracker.track("val", len(val_data)) + @timer() + def val_loop(): + i = tracker.step + time.sleep(0.01) + return { + "loss": paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "mel": paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "stft": paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "waveform": + paddle.exp(paddle.to_tensor([-i / 100], dtype="float32")), + "not_scalar": paddle.arange(10, dtype="int64"), + "string": "string", + } + + @when(lambda: tracker.step % 1000 == 0 and rank == 0) + @paddle.no_grad() + def save_samples(): + tracker.print("Saving samples to TensorBoard.") + + @when(lambda: tracker.step % 100 == 0 and rank == 0) + def checkpoint(): + save_samples() + if tracker.is_best("val", "mel"): + tracker.print("Best model so far.") + tracker.print("Saving to /runs/exp1") + 
tracker.done("val", f"Iteration {tracker.step}") + + @when(lambda: tracker.step % 100 == 0) + @tracker.log("val", "mean") + @paddle.no_grad() + def validate(): + for _ in range(len(val_data)): + output = val_loop() + return output + + with tracker.live: + for tracker.step in range(max_iters): + validate() + checkpoint() + train_loop() + + state_dict = tracker.state_dict() + tracker.load_state_dict(state_dict) + + # If train loop returned not a dict + @tracker.track("train", max_iters, tracker.step) + def train_loop_2(): + i = tracker.step + time.sleep(0.01) + + with tracker.live: + for tracker.step in range(max_iters): + validate() + checkpoint() + train_loop_2() + + +if __name__ == "__main__": + test_all_decorators() diff --git a/audio/tests_at/ml/test_model✅.py b/audio/tests_at/ml/test_model✅.py new file mode 100644 index 000000000..d88c58365 --- /dev/null +++ b/audio/tests_at/ml/test_model✅.py @@ -0,0 +1,85 @@ +import sys +import tempfile + +import paddle +from paddle import nn +sys.path.append("/home/work/pdaudoio") +from audiotools import ml +from audiotools import util + +SEED = 0 + + +def seed_and_run(model, *args, **kwargs): + util.seed(SEED) + return model(*args, **kwargs) + + +class Model(ml.BaseModel): + def __init__(self, arg1: float=1.0): + super().__init__() + self.arg1 = arg1 + self.linear = nn.Linear(1, 1) + + def forward(self, x): + return self.linear(x) + + +class OtherModel(ml.BaseModel): + def __init__(self): + super().__init__() + self.linear = nn.Linear(1, 1) + + def forward(self, x): + return self.linear(x) + + +def test_base_model(): + # Save and load + # ml.BaseModel.EXTERN += ["test_model"] + + x = paddle.randn([10, 1]) + model1 = Model() + + assert str(model1.device) == 'Place(cpu)' + + out1 = seed_and_run(model1, x) + + with tempfile.NamedTemporaryFile(suffix=".pdparams") as f: + model1.save( + f.name, ) + model2 = Model.load(f.name) + out2 = seed_and_run(model2, x) + assert paddle.allclose(out1, out2) + + # test re-export + model2.save(f.name) + model3 = Model.load(f.name) + out3 = seed_and_run(model3, x) + assert paddle.allclose(out1, out3) + + # make sure legacy/save load works + model1.save(f.name, package=False) + model2 = Model.load(f.name) + out2 = seed_and_run(model2, x) + assert paddle.allclose(out1, out2) + + # make sure new way -> legacy save -> legacy load works + model1.save(f.name, package=False) + model2 = Model.load(f.name) + model2.save(f.name, package=False) + model3 = Model.load(f.name) + out3 = seed_and_run(model3, x) + + # save/load without package, but with model2 being a model + # without an argument of arg1 to its instantiation. + model1.save(f.name, package=False) + model2 = OtherModel.load(f.name) + out2 = seed_and_run(model2, x) + assert paddle.allclose(out1, out2) + + assert paddle.allclose(out1, out3) + + with tempfile.TemporaryDirectory() as d: + model1.save_to_folder(d, {"data": 1.0}) + Model.load_from_folder(d)
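Note on the repeated sys.path.append("/home/work/pdaudoio") lines above: when the suite is run under pytest, the same path setup can be performed once for every module from a shared conftest.py, since pytest imports conftest.py before collecting the test modules. A minimal sketch, assuming a hypothetical conftest.py at the tests_at root and the same hard-coded checkout location used in the tests above:

    # audio/tests_at/conftest.py -- illustrative sketch only, not part of this patch.
    # Puts the audiotools checkout on sys.path once, instead of per test module.
    import sys
    from pathlib import Path

    # Assumed location, matching the path hard-coded in the test modules above.
    AUDIOTOOLS_ROOT = Path("/home/work/pdaudoio")

    if str(AUDIOTOOLS_ROOT) not in sys.path:
        sys.path.insert(0, str(AUDIOTOOLS_ROOT))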