You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
326 lines
11 KiB
326 lines
11 KiB
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import os
|
|
import warnings
|
|
from typing import Optional
|
|
from typing import Tuple
|
|
from typing import Union
|
|
|
|
import numpy as np
|
|
import resampy
|
|
import soundfile as sf
|
|
from scipy.io import wavfile
|
|
|
|
from ..utils import ParameterError
|
|
|
|
__all__ = [
|
|
'resample',
|
|
'to_mono',
|
|
'depth_convert',
|
|
'normalize',
|
|
'save',
|
|
'load',
|
|
]
|
|
# Normalization strategies accepted by `normalize`.
NORMALMIZE_TYPES = ['linear', 'gaussian']
# Correctly spelled alias; the misspelled name above is kept so existing
# references keep working.
NORMALIZE_TYPES = NORMALMIZE_TYPES
# Channel merge strategies accepted by `to_mono`.
MERGE_TYPES = ['ch0', 'ch1', 'random', 'average']
# Filter names accepted by `resample`.
RESAMPLE_MODES = ['kaiser_best', 'kaiser_fast']
# Small constant used to avoid division by zero.
EPS = 1e-8
|
|
|
|
|
|
def resample(y: np.ndarray,
             src_sr: int,
             target_sr: int,
             mode: str='kaiser_fast') -> np.ndarray:
    """Audio resampling.

    Args:
        y (np.ndarray): Input waveform array in 1D or 2D.
        src_sr (int): Source sample rate.
        target_sr (int): Target sample rate.
        mode (str, optional): The resampling filter to use. Defaults to 'kaiser_fast'.

    Returns:
        np.ndarray: `y` resampled to `target_sr`

    Raises:
        ParameterError: If `y` is not a numpy array or `mode` is not one of
            `RESAMPLE_MODES`.
    """
    # Validate the arguments first so bad input fails without a warning.
    if not isinstance(y, np.ndarray):
        raise ParameterError(
            f'Only support numpy np.ndarray, but received y in {type(y)}')

    if mode not in RESAMPLE_MODES:
        raise ParameterError(f'resample mode must in {RESAMPLE_MODES}')

    if mode == 'kaiser_best':
        # Adjacent literals keep source indentation out of the message.
        warnings.warn(
            f'Using resampy in kaiser_best to {src_sr}=>{target_sr}. '
            'This function is pretty slow, '
            'we recommend the mode kaiser_fast in large scale audio trainning')

    return resampy.resample(y, src_sr, target_sr, filter=mode)
|
|
|
|
|
|
def to_mono(y: np.ndarray, merge_type: str='average') -> np.ndarray:
    """Convert stereo audio to mono.

    Args:
        y (np.ndarray): Input waveform array in 1D or 2D. A 2D array is
            indexed as `y[channel]`.
        merge_type (str, optional): Merge type to generate mono waveform. Defaults to 'average'.

    Returns:
        np.ndarray: `y` with mono channel. A 1D input is returned unchanged.

    Raises:
        ParameterError: If `merge_type` is not in `MERGE_TYPES`, if `y` has
            more than two dimensions, or if averaging is requested for a
            dtype other than a floating type, int16 or int8.
    """
    if merge_type not in MERGE_TYPES:
        raise ParameterError(
            f'Unsupported merge type {merge_type}, available types are {MERGE_TYPES}'
        )
    if y.ndim > 2:
        raise ParameterError(
            f'Unsupported audio array, y.ndim > 2, the shape is {y.shape}')
    if y.ndim == 1:  # nothing to merge
        return y

    if merge_type == 'ch0':
        return y[0]
    if merge_type == 'ch1':
        return y[1]
    if merge_type == 'random':
        return y[np.random.randint(0, 2)]

    # Averaging depends on the dtype: floats can be averaged directly, while
    # integers must be widened first so the sum of two samples cannot wrap.
    if np.issubdtype(y.dtype, np.floating):
        return (y[0] + y[1]) * 0.5

    if y.dtype == 'int16' or y.dtype == 'int8':
        widened = y.astype('int32')
        averaged = (widened[0] + widened[1]) // 2
        limits = np.iinfo(y.dtype)
        return np.clip(averaged, limits.min, limits.max).astype(y.dtype)

    raise ParameterError(f'Unsupported dtype: {y.dtype}')
|
|
|
|
|
|
def _safe_cast(y: np.ndarray, dtype: Union[type, str]) -> np.ndarray:
|
|
"""Data type casting in a safe way, i.e., prevent overflow or underflow.
|
|
|
|
Args:
|
|
y (np.ndarray): Input waveform array in 1D or 2D.
|
|
dtype (Union[type, str]): Data type of waveform.
|
|
|
|
Returns:
|
|
np.ndarray: `y` after safe casting.
|
|
"""
|
|
if 'float' in str(y.dtype):
|
|
return np.clip(y, np.finfo(dtype).min,
|
|
np.finfo(dtype).max).astype(dtype)
|
|
else:
|
|
return np.clip(y, np.iinfo(dtype).min,
|
|
np.iinfo(dtype).max).astype(dtype)
|
|
|
|
|
|
def depth_convert(y: np.ndarray, dtype: Union[type, str]) -> np.ndarray:
    """Convert audio array to target dtype safely. This function convert audio waveform to a target dtype, with addition steps of
    preventing overflow/underflow and preserving audio range.

    Float inputs are scaled by the integer maximum when converted to an
    integer type; integer inputs are divided by their integer maximum when
    converted to a float type.

    Args:
        y (np.ndarray): Input waveform array in 1D or 2D.
        dtype (Union[type, str]): Data type of waveform. Either a dtype name
            or an object `np.dtype` can interpret (e.g. `np.int16`).

    Returns:
        np.ndarray: `y` after safe casting. `y` itself is returned when it
        already has the target dtype.

    Raises:
        ParameterError: If the dtype of `y` or the target dtype is not one of
            int16, int8, float32, float64.
    """
    SUPPORT_DTYPE = ['int16', 'int8', 'float32', 'float64']
    if y.dtype not in SUPPORT_DTYPE:
        raise ParameterError(
            'Unsupported audio dtype, '
            f'y.dtype is {y.dtype}, supported dtypes are {SUPPORT_DTYPE}')

    # Normalise the target to its canonical name so both 'int16' and np.int16
    # are accepted. None is rejected explicitly because np.dtype(None) would
    # otherwise resolve to float64.
    try:
        target = None if dtype is None else np.dtype(dtype).name
    except TypeError:
        target = None
    if target not in SUPPORT_DTYPE:
        raise ParameterError(
            'Unsupported audio dtype, '
            f'target dtype is {dtype}, supported dtypes are {SUPPORT_DTYPE}')

    if y.dtype == target:
        return y

    source_is_float = y.dtype in ['float64', 'float32']

    if target in ['float32', 'float64']:
        if source_is_float:
            # float <-> float: only the representable range has to be guarded.
            return _safe_cast(y, target)
        # int -> float: divide by the integer maximum of the source dtype.
        return y.astype(target) / np.iinfo(y.dtype).max

    target_info = np.iinfo(target)

    if source_is_float:
        # float -> int: scale by the integer maximum, then clip so values
        # outside the target range saturate instead of wrapping.
        scaled = y * target_info.max
        return np.clip(scaled, target_info.min, target_info.max).astype(target)

    if target == 'int16':
        # int8 -> int16: -128 scales below the int16 minimum, so clip before
        # the cast to avoid overflow.
        factor = np.iinfo('int16').max / np.iinfo('int8').max - EPS
        scaled = y.astype('float32') * factor
        return np.clip(scaled, target_info.min, target_info.max).astype('int16')

    # int16 -> int8: the scaled magnitude never exceeds 128, so no clip needed.
    scaled = y.astype('int32') * np.iinfo('int8').max / np.iinfo('int16').max
    return scaled.astype('int8')
|
|
|
|
|
|
def sound_file_load(file: os.PathLike,
                    offset: Optional[float]=None,
                    dtype: str='int16',
                    duration: Optional[int]=None) -> Tuple[np.ndarray, int]:
    """Load audio using soundfile library. This function load audio file using libsndfile.

    Args:
        file (os.PathLike): File of waveform.
        offset (Optional[float], optional): Offset to the start of waveform,
            multiplied by the sample rate to get a frame index. Defaults to None.
        dtype (str, optional): Data type of waveform. Defaults to 'int16'.
        duration (Optional[int], optional): Duration of waveform to read,
            multiplied by the sample rate to get a frame count. Defaults to None.

    Returns:
        Tuple[np.ndarray, int]: Waveform in ndarray and its samplerate. The
        array is the transpose of what soundfile returns.
    """
    with sf.SoundFile(file) as sf_desc:
        sr_native = sf_desc.samplerate
        if offset:
            sf_desc.seek(int(offset * sr_native))
        # -1 asks soundfile to read through to the end of the file.
        frame_duration = -1 if duration is None else int(duration * sr_native)
        y = sf_desc.read(
            frames=frame_duration, dtype=dtype, always_2d=False).T

    # Use the cached rate rather than touching the already-closed handle.
    return y, sr_native
|
|
|
|
|
|
def normalize(y: np.ndarray, norm_type: str='linear',
              mul_factor: float=1.0) -> np.ndarray:
    """Normalize an input audio with additional multiplier.

    Args:
        y (np.ndarray): Input waveform array in 1D or 2D.
        norm_type (str, optional): Type of normalization. 'linear' divides by
            the peak absolute value; 'gaussian' subtracts the mean and divides
            by the standard deviation. Defaults to 'linear'.
        mul_factor (float, optional): Scaling factor. Defaults to 1.0.

    Returns:
        np.ndarray: `y` after normalization.

    Raises:
        NotImplementedError: If `norm_type` is not a supported type.
    """
    if norm_type == 'linear':
        samples = np.asarray(y)
        if np.issubdtype(samples.dtype, np.signedinteger):
            # abs() of the most negative value wraps in fixed-width integers
            # (e.g. int16 -32768 stays negative), so widen before taking it.
            samples = samples.astype('int64')
        amax = np.max(np.abs(samples))
        factor = 1.0 / (amax + EPS)
        y = y * factor * mul_factor
    elif norm_type == 'gaussian':
        amean = np.mean(y)
        # Floor the deviation so a constant signal does not divide by zero.
        astd = max(np.std(y), EPS)
        y = mul_factor * (y - amean) / astd
    else:
        raise NotImplementedError(f'norm_type should be in {NORMALMIZE_TYPES}')

    return y
|
|
|
|
|
|
def save(y: np.ndarray, sr: int, file: os.PathLike) -> None:
    """Save audio file to disk. This function saves audio to disk using scipy.io.wavfile, with additional step to convert input waveform to int16.

    Args:
        y (np.ndarray): Input waveform array in 1D or 2D.
        sr (int): Sample rate.
        file (os.PathLike): Path of audio file to save. Must end with '.wav'.

    Raises:
        ParameterError: If the file name does not end with '.wav' or `sr` is
            not positive.
    """
    # os.fsdecode accepts str, bytes and os.PathLike, so pathlib.Path targets
    # pass the suffix check instead of failing on a missing `endswith`.
    if not os.fsdecode(file).endswith('.wav'):
        raise ParameterError(
            f'only .wav file supported, but dst file name is: {file}')

    if sr <= 0:
        raise ParameterError(
            f'Sample rate should be larger than 0, recieved sr = {sr}')

    if y.dtype not in ['int16', 'int8']:
        warnings.warn(
            f'input data type is {y.dtype}, will convert data to int16 format before saving'
        )
        y_out = depth_convert(y, 'int16')
    else:
        y_out = y

    wavfile.write(file, sr, y_out)
|
|
|
|
|
|
def load(
        file: os.PathLike,
        sr: Optional[int]=None,
        mono: bool=True,
        merge_type: str='average',  # ch0,ch1,random,average
        normal: bool=True,
        norm_type: str='linear',
        norm_mul_factor: float=1.0,
        offset: float=0.0,
        duration: Optional[int]=None,
        dtype: str='float32',
        resample_mode: str='kaiser_fast') -> Tuple[np.ndarray, int]:
    """Load audio file from disk. This function loads audio from disk using the audio backend.

    The waveform is read, optionally merged to mono, optionally resampled,
    normalized and finally converted to `dtype`.

    Args:
        file (os.PathLike): Path of audio file to load.
        sr (Optional[int], optional): Sample rate of loaded waveform. Defaults to None.
        mono (bool, optional): Return waveform with mono channel. Defaults to True.
        merge_type (str, optional): Merge type of multi-channels waveform. Defaults to 'average'.
        normal (bool, optional): Waveform normalization. Defaults to True.
        norm_type (str, optional): Type of normalization. Defaults to 'linear'.
        norm_mul_factor (float, optional): Scaling factor. Defaults to 1.0.
        offset (float, optional): Offset to the start of waveform. Defaults to 0.0.
        duration (Optional[int], optional): Duration of waveform to read. Defaults to None.
        dtype (str, optional): Data type of waveform. Defaults to 'float32'.
        resample_mode (str, optional): The resampling filter to use. Defaults to 'kaiser_fast'.

    Returns:
        Tuple[np.ndarray, int]: Waveform in ndarray and its samplerate.

    Raises:
        ParameterError: If the decoded waveform is empty.
    """
    # soundfile's documented read dtypes do not include int8, so decode as
    # int16 and let the final depth_convert narrow the result.
    read_dtype = 'int16' if dtype == 'int8' else dtype
    y, r = sound_file_load(
        file, offset=offset, dtype=read_dtype, duration=duration)

    if not ((y.ndim == 1 and len(y) > 0) or (y.ndim == 2 and len(y[0]) > 0)):
        raise ParameterError(f'audio file {file} looks empty')

    if mono:
        y = to_mono(y, merge_type)

    if sr is not None and sr != r:
        y = resample(y, r, sr, mode=resample_mode)
        r = sr

    if normal:
        y = normalize(y, norm_type, norm_mul_factor)
    elif dtype in ['int8', 'int16']:
        # still need to do normalization, before depth conversion
        y = normalize(y, 'linear', 1.0)

    y = depth_convert(y, dtype)
    return y, r
|