parent
1c4e4e7d9c
commit
12339daddb
@ -1 +0,0 @@
|
||||
../../../../speechx/speechx/kaldi/base
|
@ -1 +0,0 @@
|
||||
../../../../speechx/speechx/kaldi/feat
|
@ -1 +0,0 @@
|
||||
../../../../speechx/speechx/kaldi/matrix
|
@ -1 +0,0 @@
|
||||
../../../../speechx/speechx/kaldi/util
|
@ -0,0 +1 @@
|
||||
../../common_utils
|
@ -0,0 +1,136 @@
|
||||
import os.path
|
||||
from typing import Optional, Union
|
||||
|
||||
import paddle
|
||||
import json
|
||||
|
||||
from parameterized import param, parameterized
|
||||
# Code adapted from https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/common_utils/data_utils.py with modifications.
|
||||
|
||||
_TEST_DIR_PATH = os.path.realpath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
|
||||
def get_asset_path(*paths):
    """Build the full path of a test asset.

    Args:
        *paths: Path components appended below the ``assets`` directory.

    Returns:
        str: ``_TEST_DIR_PATH``/``assets`` joined with the given components.
    """
    assets_dir = os.path.join(_TEST_DIR_PATH, "assets")
    return os.path.join(assets_dir, *paths)
|
||||
|
||||
def load_params(*paths):
    """Load test parameters from an asset file holding one JSON document per line.

    Args:
        *paths: Path components of the file, resolved with ``get_asset_path``.

    Returns:
        list: One ``param`` per line, wrapping the decoded JSON value.
    """
    cases = []
    with open(get_asset_path(*paths), "r") as file:
        for line in file:
            cases.append(param(json.loads(line)))
    return cases
|
||||
|
||||
def load_effects_params(*paths):
    """Load sox-effect test parameters from a file with one JSON object per line.

    Every object must have an ``"effects"`` entry: a list of effects, each a
    list of string arguments. Arguments starting with ``<ASSET_DIR>`` get that
    placeholder replaced by the asset directory returned by ``get_asset_path()``.

    Unlike ``load_params``, the file location is NOT resolved relative to the
    asset directory; the given components are joined and opened as they are.

    Args:
        *paths: One or more path components of the parameter file.

    Returns:
        list: One ``param`` per line, wrapping the decoded (and substituted) object.
    """
    # Join the components explicitly: splatting them into ``open`` would pass
    # any extra component as ``mode``/``buffering`` instead of as part of the path.
    file_path = os.path.join(*paths)
    asset_dir = get_asset_path()  # loop-invariant, resolve once
    params = []
    with open(file_path, "r") as file:
        for line in file:
            data = json.loads(line)
            for effect in data["effects"]:
                for i, arg in enumerate(effect):
                    if arg.startswith("<ASSET_DIR>"):
                        effect[i] = arg.replace("<ASSET_DIR>", asset_dir)
            params.append(param(data))
    return params
|
||||
|
||||
def convert_tensor_encoding(
    tensor: paddle.tensor,
    dtype: paddle.dtype,
):
    """Scale a tensor of values in [-1, 1] to an integer encoding and cast it.

    For ``paddle.int32``, ``paddle.int16`` and ``paddle.uint8`` positive and
    negative values are multiplied by the positive and negative full-scale
    magnitude respectively; ``uint8`` is additionally offset by 128. For any
    other dtype the values are only cast.

    NOTE(review): the scaling uses augmented assignment (``*=`` / ``+=``) on
    the argument, which presumably modifies the caller's tensor - confirm.

    Args:
        tensor: input tensor, assumed between -1 and 1
        dtype: desired output tensor dtype

    Returns:
        Tensor: result of ``paddle.to_tensor(tensor, dtype)`` after scaling
    """
    if dtype == paddle.int32:
        scale = (tensor > 0) * 2147483647 + (tensor < 0) * 2147483648
        tensor *= scale
    elif dtype == paddle.int16:
        scale = (tensor > 0) * 32767 + (tensor < 0) * 32768
        tensor *= scale
    elif dtype == paddle.uint8:
        scale = (tensor > 0) * 127 + (tensor < 0) * 128
        tensor *= scale
        # shift the signed range into the unsigned one
        tensor += 128
    return paddle.to_tensor(tensor, dtype)
|
||||
|
||||
|
||||
#def get_whitenoise(
|
||||
#*,
|
||||
#sample_rate: int = 16000,
|
||||
#duration: float = 1, # seconds
|
||||
#n_channels: int = 1,
|
||||
#seed: int = 0,
|
||||
#dtype: Union[str, paddle.dtype] = "float32",
|
||||
#device: Union[str, paddle.device] = "cpu",
|
||||
#channels_first=True,
|
||||
#scale_factor: float = 1,
|
||||
#):
|
||||
#"""Generate pseudo audio data with whitenoise
|
||||
#Args:
|
||||
#sample_rate: Sampling rate
|
||||
#duration: Length of the resulting Tensor in seconds.
|
||||
#n_channels: Number of channels
|
||||
#seed: Seed value used for random number generation.
|
||||
#Note that this function does not modify global random generator state.
|
||||
#dtype: Torch dtype
|
||||
#device: device
|
||||
#channels_first: whether first dimension is n_channels
|
||||
#scale_factor: scale the Tensor before clamping and quantization
|
||||
#Returns:
|
||||
#Tensor: shape of (n_channels, sample_rate * duration)
|
||||
#"""
|
||||
#if isinstance(dtype, str):
|
||||
#dtype = getattr(paddle, dtype)
|
||||
#if dtype not in [paddle.float64, paddle.float32, paddle.int32, paddle.int16, paddle.uint8]:
|
||||
#raise NotImplementedError(f"dtype {dtype} is not supported.")
|
||||
## According to the doc, forking rng on all CUDA devices is slow when there are many CUDA devices,
|
||||
## so we only fork on CPU, generate values and move the data to the given device
|
||||
#with paddle.random.fork_rng([]):
|
||||
#paddle.random.manual_seed(seed)
|
||||
#tensor = paddle.randn([n_channels, int(sample_rate * duration)], dtype=paddle.float32, device="cpu")
|
||||
#tensor /= 2.0
|
||||
#tensor *= scale_factor
|
||||
#tensor.clamp_(-1.0, 1.0)
|
||||
#if not channels_first:
|
||||
#tensor = tensor.t()
|
||||
|
||||
#tensor = tensor.to(device)
|
||||
|
||||
#return convert_tensor_encoding(tensor, dtype)
|
||||
|
||||
|
||||
def get_sinusoid(
    *,
    frequency: float = 300,
    sample_rate: int = 16000,
    duration: float = 1,  # seconds
    n_channels: int = 1,
    dtype: str = "float32",
    device: str = "cpu",
    channels_first: bool = True,
):
    """Synthesize sine-wave pseudo audio data.

    Every channel except the first one starts at a random phase.

    Args:
        frequency: Frequency of sine wave
        sample_rate: Sampling rate
        duration: Length of the resulting Tensor in seconds.
        n_channels: Number of channels
        dtype: Output dtype; a string is looked up as an attribute of ``paddle``
        device: Not used by this implementation
        channels_first: When False, the result is transposed with ``paddle.t``
            before it is encoded

    Returns:
        Tensor: shape of (n_channels, sample_rate * duration), transposed
        when ``channels_first`` is False
    """
    if isinstance(dtype, str):
        dtype = getattr(paddle, dtype)

    two_pi = 2 * 3.141592653589793
    last_angle = two_pi * frequency * duration
    n_samples = int(sample_rate * duration)

    # Randomize the initial phase. (except the first channel)
    phase = two_pi * paddle.randn([n_channels, 1], dtype=paddle.float32)
    phase[0, :] = 0

    # One angle per output frame; adding the per-channel phase column
    # broadcasts it over all channels.
    ramp = paddle.linspace(0, last_angle, n_samples, dtype=paddle.float32)
    wave = paddle.sin(phase + ramp)
    if not channels_first:
        wave = paddle.t(wave)
    return convert_tensor_encoding(wave, dtype)
|
@ -0,0 +1,116 @@
|
||||
import subprocess
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
|
||||
def get_encoding(dtype):
    """Return the encoding name that corresponds to a dtype name.

    Args:
        dtype: One of ``"float32"``, ``"int32"``, ``"int16"`` or ``"uint8"``.

    Returns:
        str: ``"floating-point"``, ``"signed-integer"`` or ``"unsigned-integer"``.

    Raises:
        KeyError: If ``dtype`` is not one of the names above.
    """
    signed = "signed-integer"
    lookup = dict(
        float32="floating-point",
        int32=signed,
        int16=signed,
        uint8="unsigned-integer",
    )
    return lookup[dtype]
|
||||
|
||||
|
||||
def get_bit_depth(dtype):
    """Return the number of bits per sample for a dtype name.

    Args:
        dtype: One of ``"float32"``, ``"int32"``, ``"int16"`` or ``"uint8"``.

    Returns:
        int: 32, 32, 16 or 8 respectively.

    Raises:
        KeyError: If ``dtype`` is not one of the names above.
    """
    lookup = dict(float32=32, int32=32, int16=16, uint8=8)
    return lookup[dtype]
|
||||
|
||||
|
||||
def gen_audio_file(
    path,
    sample_rate,
    num_channels,
    *,
    encoding=None,
    bit_depth=None,
    compression=None,
    attenuation=None,
    duration=1,
    comment_file=None,
):
    """Generate synthetic audio file with `sox` command.

    The file contains a sawtooth wave synthesized by sox. The executed command
    line is echoed to stderr before it is run.

    Args:
        path: Output file path (``str`` or path-like object).
        sample_rate: Value passed to ``--rate``.
        num_channels: Value passed to ``--channels``.
        encoding: Optional value for ``--encoding``.
        bit_depth: Optional value for ``--bits``.
        compression: Optional value for ``--compression``.
        attenuation: Optional attenuation in dB, applied as ``vol -<attenuation>dB``.
        duration: Length passed to the ``synth`` effect [sec].
        comment_file: Optional value for ``--comment-file``.

    Raises:
        subprocess.CalledProcessError: If sox exits with a non-zero status.
    """
    # Normalize once so that path-like objects work for the suffix check too.
    path = str(path)
    if path.endswith(".wav"):
        warnings.warn("Use get_wav_data and save_wav to generate wav file for accurate result.")
    command = [
        "sox",
        "-V3",  # verbose
        "--no-dither",  # disable automatic dithering
        "-R",
        # -R is supposed to be repeatable, though the implementation looks suspicious
        # and not setting the seed to a fixed value.
        # https://fossies.org/dox/sox-14.4.2/sox_8c_source.html
        # search "sox_globals.repeatable"
    ]
    # NOTE(review): ``--bits`` is emitted both before the ``--null`` input and
    # before the output path. sox applies format options to the file that
    # follows them, so this presumably sets the depth of both - confirm
    # before de-duplicating.
    if bit_depth is not None:
        command += ["--bits", str(bit_depth)]
    command += [
        "--rate",
        str(sample_rate),
        "--null",  # no input
        "--channels",
        str(num_channels),
    ]
    if compression is not None:
        command += ["--compression", str(compression)]
    if bit_depth is not None:
        command += ["--bits", str(bit_depth)]
    if encoding is not None:
        command += ["--encoding", str(encoding)]
    if comment_file is not None:
        command += ["--comment-file", str(comment_file)]
    command += [
        path,
        "synth",
        str(duration),  # synthesizes for the given duration [sec]
        "sawtooth",
        "1",
        # saw tooth covers the both ends of value range, which is a good property for test.
        # similar to linspace(-1., 1.)
        # this introduces bigger boundary effect than sine when converted to mp3
    ]
    if attenuation is not None:
        command += ["vol", f"-{attenuation}dB"]
    print(" ".join(command), file=sys.stderr)
    subprocess.run(command, check=True)
|
||||
|
||||
|
||||
def convert_audio_file(src_path, dst_path, *, encoding=None, bit_depth=None, compression=None):
    """Convert audio file with `sox` command.

    The executed command line is echoed to stderr before it is run.

    Args:
        src_path: Input file path (``str`` or path-like object).
        dst_path: Output file path (``str`` or path-like object).
        encoding: Optional value for ``--encoding``.
        bit_depth: Optional value for ``--bits``.
        compression: Optional value for ``--compression``.

    Raises:
        subprocess.CalledProcessError: If sox exits with a non-zero status.
    """
    command = ["sox", "-V3", "--no-dither", "-R", str(src_path)]
    if encoding is not None:
        command += ["--encoding", str(encoding)]
    if bit_depth is not None:
        command += ["--bits", str(bit_depth)]
    if compression is not None:
        command += ["--compression", str(compression)]
    # Stringify like src_path; a path-like object would break the join below.
    command += [str(dst_path)]
    print(" ".join(command), file=sys.stderr)
    subprocess.run(command, check=True)
|
||||
|
||||
|
||||
def _flattern(effects):
    """Flatten a sequence of effect argument lists into one flat list.

    Args:
        effects: Either a flat sequence of strings or a sequence of sequences.

    Returns:
        ``effects`` itself when it is empty/falsy or its first element is a
        string; otherwise a new list holding the items of every sub-sequence
        in order.
    """
    if not effects or isinstance(effects[0], str):
        return effects
    flat = []
    for group in effects:
        flat.extend(group)
    return flat
|
||||
|
||||
|
||||
def run_sox_effect(input_file, output_file, effect, *, output_sample_rate=None, output_bitdepth=None):
    """Run sox effects

    The executed command line is printed to stdout before it is run.

    Args:
        input_file: Input file path (``str`` or path-like object).
        output_file: Output file path (``str`` or path-like object).
        effect: Effect arguments, either a flat list of strings or a list of
            such lists (flattened with ``_flattern``).
        output_sample_rate: When truthy, a trailing ``rate`` effect with this
            value is appended.
        output_bitdepth: When truthy, passed as ``--bits`` for the output file.

    Raises:
        subprocess.CalledProcessError: If sox exits with a non-zero status.
    """
    effect = _flattern(effect)
    # Stringify the paths, as the other sox helpers do, so that path-like
    # objects do not break the join below.
    command = ["sox", "-V", "--no-dither", str(input_file)]
    if output_bitdepth:
        command += ["--bits", str(output_bitdepth)]
    command += [str(output_file)] + effect
    if output_sample_rate:
        command += ["rate", str(output_sample_rate)]
    print(" ".join(command))
    subprocess.run(command, check=True)
|
Loading…
Reference in new issue