fix coding style && mv audio docs

pull/2405/head
YangZhou 3 years ago committed by Yang Zhou
parent e66d1b7d96
commit 750e0bdddd

@ -1,19 +0,0 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SOURCEDIR = source
BUILDDIR = build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

@ -1,24 +0,0 @@
# Build docs for PaddleAudio
Execute the following steps in **current directory**.
## 1. Install
`pip install Sphinx sphinx_rtd_theme`
## 2. Generate API docs
Generate API docs from docstrings.
`sphinx-apidoc -fMeT -o source ../paddleaudio ../paddleaudio/utils --templatedir source/_templates`
## 3. Build
`sphinx-build source _html`
## 4. Preview
Open `_html/index.html` for page preview.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.9 KiB

@ -1,35 +0,0 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=source
set BUILDDIR=build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
:end
popd

@ -1,5 +0,0 @@
.wy-nav-content {
max-width: 80%;
}
.table table{ background:#b9b9b9}
.table table td{ background:#FFF; }

@ -1,9 +0,0 @@
{%- if show_headings %}
{{- basename | e | heading }}
{% endif -%}
.. automodule:: {{ qualname }}
{%- for option in automodule_options %}
:{{ option }}:
{%- endfor %}

@ -1,57 +0,0 @@
{%- macro automodule(modname, options) -%}
.. automodule:: {{ modname }}
{%- for option in options %}
:{{ option }}:
{%- endfor %}
{%- endmacro %}
{%- macro toctree(docnames) -%}
.. toctree::
:maxdepth: {{ maxdepth }}
{% for docname in docnames %}
{{ docname }}
{%- endfor %}
{%- endmacro %}
{%- if is_namespace %}
{{- [pkgname, "namespace"] | join(" ") | e | heading }}
{% else %}
{{- pkgname | e | heading }}
{% endif %}
{%- if is_namespace %}
.. py:module:: {{ pkgname }}
{% endif %}
{%- if modulefirst and not is_namespace %}
{{ automodule(pkgname, automodule_options) }}
{% endif %}
{%- if subpackages %}
Subpackages
-----------
{{ toctree(subpackages) }}
{% endif %}
{%- if submodules %}
Submodules
----------
{% if separatemodules %}
{{ toctree(submodules) }}
{% else %}
{%- for submodule in submodules %}
{% if show_headings %}
{{- submodule | e | heading(2) }}
{% endif %}
{{ automodule(submodule, automodule_options) }}
{% endfor %}
{%- endif %}
{%- endif %}
{%- if not modulefirst and not is_namespace %}
Module contents
---------------
{{ automodule(pkgname, automodule_options) }}
{% endif %}

@ -1,8 +0,0 @@
{{ header | heading }}
.. toctree::
:maxdepth: {{ maxdepth }}
{% for docname in docnames %}
{{ docname }}
{%- endfor %}

@ -1,181 +0,0 @@
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
import os
import sys
sys.path.insert(0, os.path.abspath('../..'))
# -- Project information -----------------------------------------------------
project = 'PaddleAudio'
copyright = '2022, PaddlePaddle'
author = 'PaddlePaddle'
# The short X.Y version
version = ''
# The full version, including alpha/beta/rc tags
release = '0.2.0'
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.mathjax',
'sphinx.ext.viewcode',
'sphinx.ext.napoleon',
]
napoleon_google_docstring = True
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffixes as a list of strings:
#
# source_suffix = ['.rst', '.md']
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
#
# Use an explicit language code: Sphinx 5+ reports ``language = None`` as an
# invalid configuration value and falls back to 'en', so spell out English.
language = 'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = []
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = None
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
# The Read the Docs theme is selected by name only. The previous
# ``html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]`` line is
# dropped: the theme deprecated that helper (calling it only logs that the
# ``html_theme_path`` code is safe to remove), and newer theme releases no
# longer provide it -- TODO confirm against the installed theme version.
html_theme = 'sphinx_rtd_theme'
# Keep quotes and dashes exactly as written in the sources.
smartquotes = False
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_logo = '../images/paddle.png'
html_css_files = [
'custom.css',
]
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# The default sidebars (for documents that don't match any pattern) are
# defined by theme itself. Builtin themes are using these templates by
# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
# 'searchbox.html']``.
#
# html_sidebars = {}
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'PaddleAudiodoc'
# -- Options for LaTeX output ------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#
# 'preamble': '',
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'PaddleAudio.tex', 'PaddleAudio Documentation', 'PaddlePaddle',
'manual'),
]
# -- Options for manual page output ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(master_doc, 'paddleaudio', 'PaddleAudio Documentation', [author],
1)]
# -- Options for Texinfo output ----------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'PaddleAudio', 'PaddleAudio Documentation', author,
'PaddleAudio', 'One line description of project.', 'Miscellaneous'),
]
# -- Options for Epub output -------------------------------------------------
# Bibliographic Dublin Core info.
epub_title = project
# The unique identifier of the text. This can be an ISBN number
# or the project homepage.
#
# epub_identifier = ''
# A unique identification for the text.
#
# epub_uid = ''
# A list of files that should not be packed into the epub file.
epub_exclude_files = ['search.html']
# -- Extension configuration -------------------------------------------------
# -- Options for intersphinx extension ---------------------------------------
# Cross-reference the Python standard library documentation.
# Each entry maps a project name to ``(base URL, inventory location)``; an
# inventory of ``None`` means "fetch objects.inv from the base URL". The
# unnamed ``{url: None}`` form used previously is the legacy format.
intersphinx_mapping = {'python': ('https://docs.python.org/3', None)}

@ -1,22 +0,0 @@
.. PaddleAudio documentation master file, created by
sphinx-quickstart on Tue Mar 22 15:57:16 2022.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to PaddleAudio's documentation!
=======================================
.. toctree::
:maxdepth: 1
Index <self>
API References
--------------
.. toctree::
:maxdepth: 2
:titlesonly:
paddleaudio

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import backends
from . import compliance
from . import datasets
from . import features
@ -18,4 +19,3 @@ from . import functional
from . import io
from . import metric
from . import sox_effects
from . import backends

@ -11,14 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import utils
from .soundfile_backend import depth_convert
from .soundfile_backend import soundfile_load
from .soundfile_backend import normalize
from .soundfile_backend import resample
from .soundfile_backend import soundfile_load
from .soundfile_backend import soundfile_save
from .soundfile_backend import to_mono
from . import utils
from .utils import get_audio_backend
from .utils import list_audio_backends
from .utils import set_audio_backend

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import warnings
from typing import Optional
@ -204,6 +203,7 @@ def soundfile_save(y: np.ndarray, sr: int, file: os.PathLike) -> None:
wavfile.write(file, sr, y_out)
def soundfile_load(
file: os.PathLike,
sr: Optional[int]=None,
@ -256,9 +256,13 @@ def soundfile_load(
y = depth_convert(y, dtype)
return y, r
# The code below is taken from: https://github.com/pytorch/audio/blob/main/torchaudio/backend/soundfile_backend.py with modification.
def _get_subtype_for_wav(dtype: paddle.dtype, encoding: str, bits_per_sample: int):
def _get_subtype_for_wav(dtype: paddle.dtype,
encoding: str,
bits_per_sample: int):
if not encoding:
if not bits_per_sample:
subtype = {
@ -315,7 +319,10 @@ def _get_subtype_for_sphere(encoding: str, bits_per_sample: int):
raise ValueError(f"sph does not support {encoding}.")
def _get_subtype(dtype: paddle.dtype, format: str, encoding: str, bits_per_sample: int):
def _get_subtype(dtype: paddle.dtype,
format: str,
encoding: str,
bits_per_sample: int):
if format == "wav":
return _get_subtype_for_wav(dtype, encoding, bits_per_sample)
if format == "flac":
@ -328,7 +335,8 @@ def _get_subtype(dtype: paddle.dtype, format: str, encoding: str, bits_per_sampl
return "PCM_S8" if bits_per_sample == 8 else f"PCM_{bits_per_sample}"
if format in ("ogg", "vorbis"):
if encoding or bits_per_sample:
raise ValueError("ogg/vorbis does not support encoding/bits_per_sample.")
raise ValueError(
"ogg/vorbis does not support encoding/bits_per_sample.")
return "VORBIS"
if format == "sph":
return _get_subtype_for_sphere(encoding, bits_per_sample)
@ -336,16 +344,16 @@ def _get_subtype(dtype: paddle.dtype, format: str, encoding: str, bits_per_sampl
return "PCM_16"
raise ValueError(f"Unsupported format: {format}")
def save(
filepath: str,
src: paddle.Tensor,
sample_rate: int,
channels_first: bool = True,
compression: Optional[float] = None,
format: Optional[str] = None,
encoding: Optional[str] = None,
bits_per_sample: Optional[int] = None,
):
channels_first: bool=True,
compression: Optional[float]=None,
format: Optional[str]=None,
encoding: Optional[str]=None,
bits_per_sample: Optional[int]=None, ):
"""Save audio data to file.
Note:
@ -441,11 +449,11 @@ def save(
if compression is not None:
warnings.warn(
'`save` function of "soundfile" backend does not support "compression" parameter. '
"The argument is silently ignored."
)
"The argument is silently ignored.")
if hasattr(filepath, "write"):
if format is None:
raise RuntimeError("`format` is required when saving to file object.")
raise RuntimeError(
"`format` is required when saving to file object.")
ext = format.lower()
else:
ext = str(filepath).split(".")[-1].lower()
@ -455,8 +463,7 @@ def save(
if bits_per_sample == 24:
warnings.warn(
"Saving audio with 24 bits per sample might warp samples near -1. "
"Using 16 bits per sample might be able to avoid this."
)
"Using 16 bits per sample might be able to avoid this.")
subtype = _get_subtype(src.dtype, ext, encoding, bits_per_sample)
# sph is an extension used in TED-LIUM but soundfile does not recognize it as NIST format,
@ -467,7 +474,13 @@ def save(
if channels_first:
src = src.t()
soundfile.write(file=filepath, data=src, samplerate=sample_rate, subtype=subtype, format=format)
soundfile.write(
file=filepath,
data=src,
samplerate=sample_rate,
subtype=subtype,
format=format)
_SUBTYPE2DTYPE = {
"PCM_S8": "int8",
@ -478,14 +491,14 @@ _SUBTYPE2DTYPE = {
"DOUBLE": "float64",
}
def load(
filepath: str,
frame_offset: int = 0,
num_frames: int = -1,
normalize: bool = True,
channels_first: bool = True,
format: Optional[str] = None,
) -> Tuple[paddle.Tensor, int]:
frame_offset: int=0,
num_frames: int=-1,
normalize: bool=True,
channels_first: bool=True,
format: Optional[str]=None, ) -> Tuple[paddle.Tensor, int]:
"""Load audio data from file.
Note:
@ -564,7 +577,7 @@ def load(
waveform = paddle.to_tensor(waveform)
if channels_first:
waveform = paddle.transpose(waveform, perm=[1,0])
waveform = paddle.transpose(waveform, perm=[1, 0])
return waveform, sample_rate
@ -588,7 +601,8 @@ _SUBTYPE_TO_BITS_PER_SAMPLE = {
"ALAW": 8, # A-Law encoded. See https://en.wikipedia.org/wiki/G.711#Types
"IMA_ADPCM": 0, # IMA ADPCM.
"MS_ADPCM": 0, # Microsoft ADPCM.
"GSM610": 0, # GSM 6.10 encoding. (Wikipedia says 1.625 bit depth?? https://en.wikipedia.org/wiki/Full_Rate)
"GSM610":
0, # GSM 6.10 encoding. (Wikipedia says 1.625 bit depth?? https://en.wikipedia.org/wiki/Full_Rate)
"VOX_ADPCM": 0, # OKI / Dialogix ADPCM
"G721_32": 0, # 32kbs G721 ADPCM encoding.
"G723_24": 0, # 24kbs G723 ADPCM encoding.
@ -606,16 +620,17 @@ _SUBTYPE_TO_BITS_PER_SAMPLE = {
"ALAC_32": 32, # Apple Lossless Audio Codec (32 bit).
}
def _get_bit_depth(subtype):
if subtype not in _SUBTYPE_TO_BITS_PER_SAMPLE:
warnings.warn(
f"The {subtype} subtype is unknown to PaddleAudio. As a result, the bits_per_sample "
"attribute will be set to 0. If you are seeing this warning, please "
"report by opening an issue on github (after checking for existing/closed ones). "
"You may otherwise ignore this warning."
)
"You may otherwise ignore this warning.")
return _SUBTYPE_TO_BITS_PER_SAMPLE.get(subtype, 0)
_SUBTYPE_TO_ENCODING = {
"PCM_S8": "PCM_S",
"PCM_16": "PCM_S",
@ -629,12 +644,14 @@ _SUBTYPE_TO_ENCODING = {
"VORBIS": "VORBIS",
}
def _get_encoding(format: str, subtype: str):
    """Return the encoding name for an audio ``format``/``subtype`` pair.

    A ``format`` of exactly "FLAC" always yields "FLAC". Any other format is
    resolved by looking ``subtype`` up in ``_SUBTYPE_TO_ENCODING``; subtypes
    missing from that table yield "UNKNOWN".
    """
    if format != "FLAC":
        return _SUBTYPE_TO_ENCODING.get(subtype, "UNKNOWN")
    return "FLAC"
def info(filepath: str, format: Optional[str] = None) -> AudioInfo:
def info(filepath: str, format: Optional[str]=None) -> AudioInfo:
"""Get signal information of an audio file.
Note:
@ -657,5 +674,4 @@ def info(filepath: str, format: Optional[str] = None) -> AudioInfo:
sinfo.frames,
sinfo.channels,
bits_per_sample=_get_bit_depth(sinfo.subtype),
encoding=_get_encoding(sinfo.format, sinfo.subtype),
)
encoding=_get_encoding(sinfo.format, sinfo.subtype), )

@ -1,17 +1,17 @@
from pathlib import Path
from typing import Callable
from typing import Optional, Tuple, Union
import os
from typing import Optional
from typing import Tuple
import paddle
import paddleaudio
from paddle import Tensor
from .common import AudioInfo
import os
from paddleaudio._internal import module_utils as _mod_utils
from .common import AudioInfo
#https://github.com/pytorch/audio/blob/main/torchaudio/backend/sox_io_backend.py
def _fail_info(filepath: str, format: Optional[str]) -> AudioInfo:
    """Always raise ``RuntimeError`` saying metadata could not be fetched.

    Args:
        filepath: Path reported in the error message.
        format: Accepted but not used by this function.

    Raises:
        RuntimeError: Unconditionally.
    """
    message = "Failed to fetch metadata from {}".format(filepath)
    raise RuntimeError(message)
@ -23,72 +23,77 @@ def _fail_info_fileobj(fileobj, format: Optional[str]) -> AudioInfo:
# Note: needs to comply with TorchScript syntax -- requires annotations and no f-strings
def _fail_load(
filepath: str,
frame_offset: int = 0,
num_frames: int = -1,
normalize: bool = True,
channels_first: bool = True,
format: Optional[str] = None,
) -> Tuple[Tensor, int]:
frame_offset: int=0,
num_frames: int=-1,
normalize: bool=True,
channels_first: bool=True,
format: Optional[str]=None, ) -> Tuple[Tensor, int]:
raise RuntimeError("Failed to load audio from {}".format(filepath))
def _fail_load_fileobj(fileobj, *args, **kwargs):
    """Always raise ``RuntimeError`` saying ``fileobj`` could not be loaded.

    Extra positional and keyword arguments are accepted and ignored.

    Raises:
        RuntimeError: Unconditionally.
    """
    raise RuntimeError(f"Failed to load audio from {fileobj}")


# Fallback handlers, all of which raise.
_fallback_info = _fail_info
_fallback_info_fileobj = _fail_info_fileobj
_fallback_load = _fail_load
# ``load`` calls ``_fallback_load_fileobj``; previously only the misspelled
# ``_fallback_load_filebj`` was defined, so that call would hit a NameError
# instead of raising the intended RuntimeError.
_fallback_load_fileobj = _fail_load_fileobj
# Keep the old misspelled name as an alias for any existing references.
_fallback_load_filebj = _fallback_load_fileobj
@_mod_utils.requires_sox()
def load(
filepath: str,
frame_offset: int = 0,
frame_offset: int=0,
num_frames: int=-1,
normalize: bool = True,
channels_first: bool = True,
normalize: bool=True,
channels_first: bool=True,
format: Optional[str]=None, ) -> Tuple[Tensor, int]:
if hasattr(filepath, "read"):
ret = paddleaudio._paddleaudio.load_audio_fileobj(
filepath, frame_offset, num_frames, normalize, channels_first, format
)
filepath, frame_offset, num_frames, normalize, channels_first,
format)
if ret is not None:
audio_tensor = paddle.to_tensor(ret[0])
return (audio_tensor, ret[1])
return _fallback_load_fileobj(filepath, frame_offset, num_frames, normalize, channels_first, format)
return _fallback_load_fileobj(filepath, frame_offset, num_frames,
normalize, channels_first, format)
filepath = os.fspath(filepath)
ret = paddleaudio._paddleaudio.sox_io_load_audio_file(
filepath, frame_offset, num_frames, normalize, channels_first, format
)
filepath, frame_offset, num_frames, normalize, channels_first, format)
if ret is not None:
audio_tensor = paddle.to_tensor(ret[0])
return (audio_tensor, ret[1])
return _fallback_load(filepath, frame_offset, num_frames, normalize, channels_first, format)
return _fallback_load(filepath, frame_offset, num_frames, normalize,
channels_first, format)
@_mod_utils.requires_sox()
def save(filepath: str,
def save(
filepath: str,
src: Tensor,
sample_rate: int,
channels_first: bool = True,
compression: Optional[float] = None,
format: Optional[str] = None,
encoding: Optional[str] = None,
bits_per_sample: Optional[int] = None,
):
channels_first: bool=True,
compression: Optional[float]=None,
format: Optional[str]=None,
encoding: Optional[str]=None,
bits_per_sample: Optional[int]=None, ):
src_arr = src.numpy()
if hasattr(filepath, "write"):
paddleaudio._paddleaudio.save_audio_fileobj(
filepath, src_arr, sample_rate, channels_first, compression, format, encoding, bits_per_sample
)
filepath, src_arr, sample_rate, channels_first, compression, format,
encoding, bits_per_sample)
return
filepath = os.fspath(filepath)
paddleaudio._paddleaudio.sox_io_save_audio_file(
filepath, src_arr, sample_rate, channels_first, compression, format, encoding, bits_per_sample
)
filepath, src_arr, sample_rate, channels_first, compression, format,
encoding, bits_per_sample)
@_mod_utils.requires_sox()
def info(filepath: str, format: Optional[str] = None,) -> AudioInfo:
def info(
filepath: str,
format: Optional[str]=None, ) -> AudioInfo:
if hasattr(filepath, "read"):
sinfo = paddleaudio._paddleaudio.get_info_fileobj(filepath, format)
if sinfo is not None:

@ -1,6 +1,5 @@
"""Defines utilities for switching audio backends"""
#code is from: https://github.com/pytorch/audio/blob/main/torchaudio/backend/utils.py
import warnings
from typing import List
from typing import Optional
@ -8,7 +7,9 @@ from typing import Optional
import paddleaudio
from paddleaudio._internal import module_utils as _mod_utils
from . import no_backend, soundfile_backend, sox_io_backend
from . import no_backend
from . import soundfile_backend
from . import sox_io_backend
__all__ = [
"list_audio_backends",
@ -55,6 +56,7 @@ def set_audio_backend(backend: Optional[str]):
for func in ["save", "load", "info"]:
setattr(paddleaudio, func, getattr(module, func))
def _init_audio_backend():
backends = list_audio_backends()
if "soundfile" in backends:

@ -21,7 +21,7 @@ from .env import USER_HOME
from .error import ParameterError
from .log import Logger
from .log import logger
from .time import seconds_to_hms
from .time import Timer
from .numeric import depth_convert
from .numeric import pcm16to32
from .time import seconds_to_hms
from .time import Timer

@ -1,8 +1,8 @@
import itertools
from unittest import skipIf
from parameterized import parameterized
from paddleaudio._internal.module_utils import is_module_available
from parameterized import parameterized
def name_func(func, _, params):
@ -31,7 +31,8 @@ def skipIfFormatNotSupported(fmt):
def parameterize(*params):
return parameterized.expand(list(itertools.product(*params)), name_func=name_func)
return parameterized.expand(
list(itertools.product(*params)), name_func=name_func)
def fetch_wav_subtype(dtype, encoding, bits_per_sample):
@ -54,4 +55,3 @@ def fetch_wav_subtype(dtype, encoding, bits_per_sample):
if subtype:
return subtype
raise ValueError(f"wav does not support ({encoding}, {bits_per_sample}).")

@ -1,37 +1,37 @@
#this code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/backend/soundfile/info_test.py
import tarfile
import warnings
import unittest
import warnings
from unittest.mock import patch
import paddle
from paddleaudio._internal import module_utils as _mod_utils
import soundfile
from common import parameterize
from common import skipIfFormatNotSupported
from paddleaudio.backends import soundfile_backend
from tests.backends.common import get_bits_per_sample, get_encoding
from tests.common_utils import (
get_wav_data,
nested_params,
save_wav,
TempDirMixin,
)
from common import parameterize, skipIfFormatNotSupported
import soundfile
from tests.backends.common import get_bits_per_sample
from tests.backends.common import get_encoding
from tests.common_utils import get_wav_data
from tests.common_utils import nested_params
from tests.common_utils import save_wav
from tests.common_utils import TempDirMixin
class TestInfo(TempDirMixin, unittest.TestCase):
@parameterize(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
[1, 2], )
def test_wav(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.info` can check wav file correctly"""
duration = 1
path = self.get_temp_path("data.wav")
data = get_wav_data(dtype, num_channels, normalize=False, num_frames=duration * sample_rate)
data = get_wav_data(
dtype,
num_channels,
normalize=False,
num_frames=duration * sample_rate)
save_wav(path, data, sample_rate)
info = soundfile_backend.info(path)
assert info.sample_rate == sample_rate
@ -86,8 +86,7 @@ class TestInfo(TempDirMixin, unittest.TestCase):
@nested_params(
[8000, 16000],
[1, 2],
[("PCM_24", 24), ("PCM_32", 32)],
)
[("PCM_24", 24), ("PCM_32", 32)], )
@skipIfFormatNotSupported("NIST")
def test_sphere(self, sample_rate, num_channels, subtype_and_bit_depth):
"""`soundfile_backend.info` can check sph file correctly"""
@ -127,7 +126,8 @@ class TestInfo(TempDirMixin, unittest.TestCase):
with warnings.catch_warnings(record=True) as w:
info = soundfile_backend.info("foo")
assert len(w) == 1
assert "UNSEEN_SUBTYPE subtype is unknown to PaddleAudio" in str(w[-1].message)
assert "UNSEEN_SUBTYPE subtype is unknown to PaddleAudio" in str(
w[-1].message)
assert info.bits_per_sample == 0
@ -195,5 +195,6 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
"""Query compressed audio via file-like object works"""
self._test_tarobj("flac", "PCM_16", 16)
if __name__ == '__main__':
unittest.main()

@ -1,28 +1,23 @@
#this code is from: https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/backend/soundfile/load_test.py
import os
import tarfile
import unittest
from unittest.mock import patch
import numpy as np
from parameterized import parameterized
import numpy as np
import paddle
from paddleaudio._internal import module_utils as _mod_utils
import soundfile
from common import dtype2subtype
from common import parameterize
from common import skipIfFormatNotSupported
from paddleaudio.backends import soundfile_backend
from tests.backends.common import get_bits_per_sample, get_encoding
from tests.common_utils import (
get_wav_data,
load_wav,
nested_params,
normalize_wav,
save_wav,
TempDirMixin,
)
from common import dtype2subtype, parameterize, skipIfFormatNotSupported
from parameterized import parameterized
import soundfile
from tests.common_utils import get_wav_data
from tests.common_utils import load_wav
from tests.common_utils import normalize_wav
from tests.common_utils import save_wav
from tests.common_utils import TempDirMixin
def _get_mock_path(
@ -30,8 +25,7 @@ def _get_mock_path(
dtype: str,
sample_rate: int,
num_channels: int,
num_frames: int,
):
num_frames: int, ):
return f"{dtype}_{sample_rate}_{num_channels}_{num_frames}.{ext}"
@ -87,9 +81,8 @@ class SoundFileMock:
self._params["num_channels"],
normalize=False,
num_frames=self._params["num_frames"],
channels_first=False,
).numpy()
return data[self._start : self._start + frames]
channels_first=False, ).numpy()
return data[self._start:self._start + frames]
def __enter__(self):
return self
@ -99,13 +92,17 @@ class SoundFileMock:
class MockedLoadTest(unittest.TestCase):
def assert_dtype(self, ext, dtype, sample_rate, num_channels, normalize, channels_first):
def assert_dtype(self, ext, dtype, sample_rate, num_channels, normalize,
channels_first):
"""When format is WAV or NIST, normalize=False will return the native dtype Tensor, otherwise float32"""
num_frames = 3 * sample_rate
path = _get_mock_path(ext, dtype, sample_rate, num_channels, num_frames)
expected_dtype = paddle.float32 if normalize or ext not in ["wav", "nist"] else getattr(paddle, dtype)
expected_dtype = paddle.float32 if normalize or ext not in [
"wav", "nist"
] else getattr(paddle, dtype)
with patch("soundfile.SoundFile", SoundFileMock):
found, sr = soundfile_backend.load(path, normalize=normalize, channels_first=channels_first)
found, sr = soundfile_backend.load(
path, normalize=normalize, channels_first=channels_first)
assert found.dtype == expected_dtype
assert sample_rate == sr
@ -114,32 +111,36 @@ class MockedLoadTest(unittest.TestCase):
[8000, 16000],
[1, 2],
[True, False],
[True, False],
)
def test_wav(self, dtype, sample_rate, num_channels, normalize, channels_first):
[True, False], )
def test_wav(self, dtype, sample_rate, num_channels, normalize,
channels_first):
"""Returns native dtype when normalize=False else float32"""
self.assert_dtype("wav", dtype, sample_rate, num_channels, normalize, channels_first)
self.assert_dtype("wav", dtype, sample_rate, num_channels, normalize,
channels_first)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
[True, False],
[True, False],
)
def test_sphere(self, dtype, sample_rate, num_channels, normalize, channels_first):
[True, False], )
def test_sphere(self, dtype, sample_rate, num_channels, normalize,
channels_first):
"""Returns float32 always"""
self.assert_dtype("sph", dtype, sample_rate, num_channels, normalize, channels_first)
self.assert_dtype("sph", dtype, sample_rate, num_channels, normalize,
channels_first)
@parameterize([8000, 16000], [1, 2], [True, False], [True, False])
def test_ogg(self, sample_rate, num_channels, normalize, channels_first):
"""Returns float32 always"""
self.assert_dtype("ogg", "int16", sample_rate, num_channels, normalize, channels_first)
self.assert_dtype("ogg", "int16", sample_rate, num_channels, normalize,
channels_first)
@parameterize([8000, 16000], [1, 2], [True, False], [True, False])
def test_flac(self, sample_rate, num_channels, normalize, channels_first):
"""`soundfile_backend.load` can load ogg format."""
self.assert_dtype("flac", "int16", sample_rate, num_channels, normalize, channels_first)
self.assert_dtype("flac", "int16", sample_rate, num_channels, normalize,
channels_first)
class LoadTestBase(TempDirMixin, unittest.TestCase):
@ -150,8 +151,7 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
num_channels,
normalize,
channels_first=True,
duration=1,
):
duration=1, ):
"""`soundfile_backend.load` can load wav format correctly.
Wav data loaded with soundfile backend should match those with scipy
@ -163,11 +163,12 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
num_channels,
normalize=normalize,
num_frames=num_frames,
channels_first=channels_first,
)
channels_first=channels_first, )
save_wav(path, data, sample_rate, channels_first=channels_first)
expected = load_wav(path, normalize=normalize, channels_first=channels_first)[0]
data, sr = soundfile_backend.load(path, normalize=normalize, channels_first=channels_first)
expected = load_wav(
path, normalize=normalize, channels_first=channels_first)[0]
data, sr = soundfile_backend.load(
path, normalize=normalize, channels_first=channels_first)
assert sr == sample_rate
np.testing.assert_array_almost_equal(data.numpy(), expected.numpy())
@ -177,8 +178,7 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
sample_rate,
num_channels,
channels_first=True,
duration=1,
):
duration=1, ):
"""`soundfile_backend.load` can load SPHERE format correctly."""
path = self.get_temp_path("reference.sph")
num_frames = duration * sample_rate
@ -187,9 +187,9 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
num_channels,
num_frames=num_frames,
normalize=False,
channels_first=False,
)
soundfile.write(path, raw, sample_rate, subtype=dtype2subtype(dtype), format="NIST")
channels_first=False, )
soundfile.write(
path, raw, sample_rate, subtype=dtype2subtype(dtype), format="NIST")
expected = normalize_wav(raw.t() if channels_first else raw)
data, sr = soundfile_backend.load(path, channels_first=channels_first)
assert sr == sample_rate
@ -202,8 +202,7 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
sample_rate,
num_channels,
channels_first=True,
duration=1,
):
duration=1, ):
"""`soundfile_backend.load` can load FLAC format correctly."""
path = self.get_temp_path("reference.flac")
num_frames = duration * sample_rate
@ -212,8 +211,7 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
num_channels,
num_frames=num_frames,
normalize=False,
channels_first=False,
)
channels_first=False, )
soundfile.write(path, raw, sample_rate)
expected = normalize_wav(raw.t() if channels_first else raw)
data, sr = soundfile_backend.load(path, channels_first=channels_first)
@ -222,7 +220,6 @@ class LoadTestBase(TempDirMixin, unittest.TestCase):
np.testing.assert_array_almost_equal(data.numpy(), expected.numpy())
class TestLoad(LoadTestBase):
"""Test the correctness of `soundfile_backend.load` for various formats"""
@ -231,29 +228,31 @@ class TestLoad(LoadTestBase):
[8000, 16000],
[1, 2],
[False, True],
[False, True],
)
def test_wav(self, dtype, sample_rate, num_channels, normalize, channels_first):
[False, True], )
def test_wav(self, dtype, sample_rate, num_channels, normalize,
channels_first):
"""`soundfile_backend.load` can load wav format correctly."""
self.assert_wav(dtype, sample_rate, num_channels, normalize, channels_first)
self.assert_wav(dtype, sample_rate, num_channels, normalize,
channels_first)
@parameterize(
["int32"],
[16000],
[2],
[False],
)
[False], )
def test_wav_large(self, dtype, sample_rate, num_channels, normalize):
"""`soundfile_backend.load` can load large wav file correctly."""
two_hours = 2 * 60 * 60
self.assert_wav(dtype, sample_rate, num_channels, normalize, duration=two_hours)
self.assert_wav(
dtype, sample_rate, num_channels, normalize, duration=two_hours)
@parameterize(["float32", "int32"], [4, 8, 16, 32], [False, True])
def test_multiple_channels(self, dtype, num_channels, channels_first):
"""`soundfile_backend.load` can load wav file with more than 2 channels."""
sample_rate = 8000
normalize = False
self.assert_wav(dtype, sample_rate, num_channels, normalize, channels_first)
self.assert_wav(dtype, sample_rate, num_channels, normalize,
channels_first)
#@parameterize(["int32"], [8000, 16000], [1, 2], [False, True])
#@skipIfFormatNotSupported("NIST")
@ -291,21 +290,17 @@ class TestLoadFormat(TempDirMixin, unittest.TestCase):
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found, expected)
@parameterized.expand(
[
("WAV",),
("wav",),
]
)
@parameterized.expand([
("WAV", ),
("wav", ),
])
def test_wav(self, format_):
self._test_format(format_)
@parameterized.expand(
[
("FLAC",),
("flac",),
]
)
@parameterized.expand([
("FLAC", ),
("flac", ),
])
@skipIfFormatNotSupported("FLAC")
def test_flac(self, format_):
self._test_format(format_)
@ -356,7 +351,6 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
#self.assertEqual(expected, found)
np.testing.assert_array_almost_equal(found.numpy(), expected)
def test_tarfile_wav(self):
"""Loading audio via file-like object works"""
self._test_tarfile("wav")
@ -365,5 +359,6 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
"""Loading audio via file-like object works"""
self._test_tarfile("flac")
if __name__ == '__main__':
unittest.main()

@ -2,23 +2,18 @@ import io
import unittest
from unittest.mock import patch
from paddleaudio._internal import module_utils as _mod_utils
from paddleaudio.backends import soundfile_backend
from tests.common_utils import (
get_wav_data,
load_wav,
nested_params,
normalize_wav,
save_wav,
TempDirMixin,
)
from common import fetch_wav_subtype, parameterize, skipIfFormatNotSupported
import paddle
import numpy as np
import paddle
import soundfile
from common import fetch_wav_subtype
from common import parameterize
from common import skipIfFormatNotSupported
from paddleaudio.backends import soundfile_backend
from tests.common_utils import get_wav_data
from tests.common_utils import load_wav
from tests.common_utils import nested_params
from tests.common_utils import TempDirMixin
class MockedSaveTest(unittest.TestCase):
@ -41,10 +36,10 @@ class MockedSaveTest(unittest.TestCase):
("ULAW", 8),
("ALAW", None),
("ALAW", 8),
],
)
], )
@patch("soundfile.write")
def test_wav(self, dtype, sample_rate, num_channels, channels_first, enc_params, mocked_write):
def test_wav(self, dtype, sample_rate, num_channels, channels_first,
enc_params, mocked_write):
"""soundfile_backend.save passes correct subtype to soundfile.write when WAV"""
filepath = "foo.wav"
input_tensor = get_wav_data(
@ -52,8 +47,7 @@ class MockedSaveTest(unittest.TestCase):
num_channels,
num_frames=3 * sample_rate,
normalize=dtype == "float32",
channels_first=channels_first,
)
channels_first=channels_first, )
input_tensor = paddle.transpose(input_tensor, [1, 0])
encoding, bits_per_sample = enc_params
@ -63,20 +57,20 @@ class MockedSaveTest(unittest.TestCase):
sample_rate,
channels_first=channels_first,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
bits_per_sample=bits_per_sample, )
# on Py3.8+, call_args.kwargs is more descriptive
args = mocked_write.call_args[1]
assert args["file"] == filepath
assert args["samplerate"] == sample_rate
assert args["subtype"] == fetch_wav_subtype(dtype, encoding, bits_per_sample)
assert args["subtype"] == fetch_wav_subtype(dtype, encoding,
bits_per_sample)
assert args["format"] is None
tensor_result = paddle.transpose(input_tensor, [1, 0]) if channels_first else input_tensor
tensor_result = paddle.transpose(
input_tensor, [1, 0]) if channels_first else input_tensor
#self.assertEqual(args["data"], tensor_result.numpy())
np.testing.assert_array_almost_equal(args["data"].numpy(), tensor_result.numpy())
np.testing.assert_array_almost_equal(args["data"].numpy(),
tensor_result.numpy())
@patch("soundfile.write")
def assert_non_wav(
@ -88,8 +82,7 @@ class MockedSaveTest(unittest.TestCase):
channels_first,
mocked_write,
encoding=None,
bits_per_sample=None,
):
bits_per_sample=None, ):
"""soundfile_backend.save passes correct subtype and format to soundfile.write when SPHERE"""
filepath = f"foo.{fmt}"
input_tensor = get_wav_data(
@ -97,11 +90,11 @@ class MockedSaveTest(unittest.TestCase):
num_channels,
num_frames=3 * sample_rate,
normalize=False,
channels_first=channels_first,
)
channels_first=channels_first, )
input_tensor = paddle.transpose(input_tensor, [1, 0])
expected_data = paddle.transpose(input_tensor, [1, 0]) if channels_first else input_tensor
expected_data = paddle.transpose(
input_tensor, [1, 0]) if channels_first else input_tensor
soundfile_backend.save(
filepath,
@ -109,8 +102,7 @@ class MockedSaveTest(unittest.TestCase):
sample_rate,
channels_first,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
bits_per_sample=bits_per_sample, )
# on Py3.8+, call_args.kwargs is more descriptive
args = mocked_write.call_args[1]
@ -120,7 +112,8 @@ class MockedSaveTest(unittest.TestCase):
assert args["format"] == "NIST"
else:
assert args["format"] is None
np.testing.assert_array_almost_equal(args["data"].numpy(), expected_data.numpy())
np.testing.assert_array_almost_equal(args["data"].numpy(),
expected_data.numpy())
#self.assertEqual(args["data"], expected_data)
@nested_params(
@ -139,45 +132,57 @@ class MockedSaveTest(unittest.TestCase):
("ALAW", 16),
("ALAW", 24),
("ALAW", 32),
],
)
def test_sph(self, fmt, dtype, sample_rate, num_channels, channels_first, enc_params):
], )
def test_sph(self, fmt, dtype, sample_rate, num_channels, channels_first,
enc_params):
"""soundfile_backend.save passes default format and subtype (None-s) to
soundfile.write when not WAV"""
encoding, bits_per_sample = enc_params
self.assert_non_wav(
fmt, dtype, sample_rate, num_channels, channels_first, encoding=encoding, bits_per_sample=bits_per_sample
)
fmt,
dtype,
sample_rate,
num_channels,
channels_first,
encoding=encoding,
bits_per_sample=bits_per_sample)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
[False, True],
[8, 16, 24],
)
def test_flac(self, dtype, sample_rate, num_channels, channels_first, bits_per_sample):
[8, 16, 24], )
def test_flac(self, dtype, sample_rate, num_channels, channels_first,
bits_per_sample):
"""soundfile_backend.save passes default format and subtype (None-s) to
soundfile.write when not WAV"""
self.assert_non_wav("flac", dtype, sample_rate, num_channels, channels_first, bits_per_sample=bits_per_sample)
self.assert_non_wav(
"flac",
dtype,
sample_rate,
num_channels,
channels_first,
bits_per_sample=bits_per_sample)
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
[False, True],
)
[False, True], )
def test_ogg(self, dtype, sample_rate, num_channels, channels_first):
"""soundfile_backend.save passes default format and subtype (None-s) to
soundfile.write when not WAV"""
self.assert_non_wav("ogg", dtype, sample_rate, num_channels, channels_first)
self.assert_non_wav("ogg", dtype, sample_rate, num_channels,
channels_first)
class SaveTestBase(TempDirMixin, unittest.TestCase):
def assert_wav(self, dtype, sample_rate, num_channels, num_frames):
"""`soundfile_backend.save` can save wav format."""
path = self.get_temp_path("data.wav")
expected = get_wav_data(dtype, num_channels, num_frames=num_frames, normalize=False)
expected = get_wav_data(
dtype, num_channels, num_frames=num_frames, normalize=False)
soundfile_backend.save(path, expected, sample_rate)
found, sr = load_wav(path, normalize=False)
assert sample_rate == sr
@ -192,7 +197,8 @@ class SaveTestBase(TempDirMixin, unittest.TestCase):
"""
num_frames = sample_rate * 3
path = self.get_temp_path(f"data.{fmt}")
expected = get_wav_data(dtype, num_channels, num_frames=num_frames, normalize=False)
expected = get_wav_data(
dtype, num_channels, num_frames=num_frames, normalize=False)
soundfile_backend.save(path, expected, sample_rate)
sinfo = soundfile.info(path)
assert sinfo.format == fmt.upper()
@ -220,16 +226,14 @@ class TestSave(SaveTestBase):
@parameterize(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
[1, 2], )
def test_wav(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save wav format."""
self.assert_wav(dtype, sample_rate, num_channels, num_frames=None)
@parameterize(
["float32", "int32"],
[4, 8, 16, 32],
)
[4, 8, 16, 32], )
def test_multiple_channels(self, dtype, num_channels):
"""`soundfile_backend.save` can save wav with more than 2 channels."""
sample_rate = 8000
@ -238,8 +242,7 @@ class TestSave(SaveTestBase):
@parameterize(
["int32"],
[8000, 16000],
[1, 2],
)
[1, 2], )
@skipIfFormatNotSupported("NIST")
def test_sphere(self, dtype, sample_rate, num_channels):
"""`soundfile_backend.save` can save sph format."""
@ -247,8 +250,7 @@ class TestSave(SaveTestBase):
@parameterize(
[8000, 16000],
[1, 2],
)
[1, 2], )
@skipIfFormatNotSupported("FLAC")
def test_flac(self, sample_rate, num_channels):
"""`soundfile_backend.save` can save flac format."""
@ -256,8 +258,7 @@ class TestSave(SaveTestBase):
@parameterize(
[8000, 16000],
[1, 2],
)
[1, 2], )
@skipIfFormatNotSupported("OGG")
def test_ogg(self, sample_rate, num_channels):
"""`soundfile_backend.save` can save ogg/vorbis format."""
@ -318,5 +319,6 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
"""Saving audio via file-like object works"""
self._test_fileobj("OGG")
if __name__ == '__main__':
unittest.main()

@ -1,17 +1,12 @@
from .wav_utils import get_wav_data, load_wav, save_wav, normalize_wav
from .case_utils import name_func
from .case_utils import TempDirMixin
from .parameterized_utils import nested_params
from .case_utils import (
TempDirMixin,
name_func
)
from .wav_utils import get_wav_data
from .wav_utils import load_wav
from .wav_utils import normalize_wav
from .wav_utils import save_wav
__all__ = [
"get_wav_data",
"load_wav",
"save_wav",
"normalize_wav",
"get_sinusoid",
"name_func",
"nested_params",
"TempDirMixin"
"get_wav_data", "load_wav", "save_wav", "normalize_wav", "get_sinusoid",
"name_func", "nested_params", "TempDirMixin"
]

@ -1,8 +1,8 @@
from typing import Optional
import scipy.io.wavfile
import paddle
import numpy as np
import scipy.io.wavfile
def normalize_wav(tensor: paddle.Tensor) -> paddle.Tensor:
if tensor.dtype == paddle.float32:
@ -26,10 +26,9 @@ def get_wav_data(
dtype: str,
num_channels: int,
*,
num_frames: Optional[int] = None,
normalize: bool = True,
channels_first: bool = True,
):
num_frames: Optional[int]=None,
normalize: bool=True,
channels_first: bool=True, ):
"""Generate linear signal of the given dtype and num_channels
Data range is
@ -66,7 +65,8 @@ def get_wav_data(
elif dtype == "float64":
base = paddle.linspace(-1.0, 1.0, num_frames, dtype=dtype_)
elif dtype == "int32":
base = paddle.linspace(-2147483648, 2147483647, num_frames, dtype=dtype_)
base = paddle.linspace(
-2147483648, 2147483647, num_frames, dtype=dtype_)
#elif dtype == "int16":
# base = paddle.linspace(-32768, 32767, num_frames, dtype=dtype_)
#dtype_np = getattr(np, dtype)

@ -14,9 +14,9 @@
import argparse
import paddle
from paddleaudio.datasets.voxceleb import VoxCeleb
from yacs.config import CfgNode
from paddleaudio.datasets.voxceleb import VoxCeleb
from paddlespeech.s2t.utils.log import Log
from paddlespeech.vector.io.augment import build_augment_pipeline
from paddlespeech.vector.training.seeding import seed_everything

@ -21,9 +21,9 @@ import os
from typing import List
import tqdm
from paddleaudio.backends import soundfile_load as load_audio
from yacs.config import CfgNode
from paddleaudio.backends import soundfile_load as load_audio
from paddlespeech.s2t.utils.log import Log
from paddlespeech.vector.utils.vector_utils import get_chunks

@ -22,9 +22,9 @@ import os
import random
import tqdm
from paddleaudio.backends import soundfile_load as load_audio
from yacs.config import CfgNode
from paddleaudio.backends import soundfile_load as load_audio
from paddlespeech.s2t.utils.log import Log
from paddlespeech.vector.utils.vector_utils import get_chunks

@ -11,17 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import _extension
from . import compliance
from . import datasets
from . import features
from . import functional
from . import io
from . import metric
from . import sox_effects
from . import streamdata
from . import text
from . import transform
from .backends import load
from .backends import save

@ -1,15 +1,12 @@
import os
import warnings
from pathlib import Path
from ._internal import module_utils as _mod_utils # noqa: F401
import contextlib
import ctypes
import os
import sys
import types
import warnings
from pathlib import Path
from ._internal import module_utils as _mod_utils # noqa: F401
# Query `hasattr` only once.
_SET_GLOBAL_FLAGS = hasattr(sys, 'getdlopenflags') and hasattr(sys,
@ -68,6 +65,7 @@ class _Ops(types.ModuleType):
_LIB_DIR = Path(__file__).parent / "lib"
def _get_lib_path(lib: str):
suffix = "pyd" if os.name == "nt" else "so"
path = _LIB_DIR / f"{lib}.{suffix}"

@ -1,17 +1,17 @@
from pathlib import Path
from typing import Callable
from typing import Optional, Tuple, Union
import os
from typing import Optional
from typing import Tuple
import paddle
from paddle import Tensor
from .common import AudioMetaData
import os
from paddlespeech.audio._internal import module_utils as _mod_utils
from .common import AudioMetaData
from paddlespeech.audio import _paddleaudio as paddleaudio
from paddlespeech.audio._internal import module_utils as _mod_utils
#https://github.com/pytorch/audio/blob/main/torchaudio/backend/sox_io_backend.py
def _fail_info(filepath: str, format: Optional[str]) -> AudioMetaData:
    """Unconditionally report that metadata could not be fetched.

    Args:
        filepath: Path that is reported in the error message.
        format: Not used by this function.

    Raises:
        RuntimeError: Always; the message names ``filepath``.
    """
    # Built with str.format (no f-string), matching the TorchScript note
    # that accompanies the sibling fallbacks in this module.
    message = "Failed to fetch metadata from {}".format(filepath)
    raise RuntimeError(message)
@ -23,72 +23,76 @@ def _fail_info_fileobj(fileobj, format: Optional[str]) -> AudioMetaData:
# Note: needs to comply with TorchScript syntax -- annotations required, no f-strings
def _fail_load(
filepath: str,
frame_offset: int = 0,
num_frames: int = -1,
normalize: bool = True,
channels_first: bool = True,
format: Optional[str] = None,
) -> Tuple[Tensor, int]:
frame_offset: int=0,
num_frames: int=-1,
normalize: bool=True,
channels_first: bool=True,
format: Optional[str]=None, ) -> Tuple[Tensor, int]:
raise RuntimeError("Failed to load audio from {}".format(filepath))
def _fail_load_fileobj(fileobj, *args, **kwargs):
raise RuntimeError(f"Failed to load audio from {fileobj}")
# Module-level fallback hooks. `load` below calls `_fallback_load` and
# `_fallback_load_fileobj` when the native call returns None.
_fallback_info = _fail_info
_fallback_info_fileobj = _fail_info_fileobj
_fallback_load = _fail_load
# `load` refers to `_fallback_load_fileobj`; previously only the misspelled
# `_fallback_load_filebj` was bound here, so the file-object fallback path
# raised NameError instead of the intended RuntimeError.
_fallback_load_fileobj = _fail_load_fileobj
# Misspelled name kept as an alias so existing references keep working.
_fallback_load_filebj = _fallback_load_fileobj
@_mod_utils.requires_sox()
def load(
filepath: str,
frame_offset: int = 0,
frame_offset: int=0,
num_frames: int=-1,
normalize: bool = True,
channels_first: bool = True,
normalize: bool=True,
channels_first: bool=True,
format: Optional[str]=None, ) -> Tuple[Tensor, int]:
if hasattr(filepath, "read"):
ret = paddleaudio.load_audio_fileobj(
filepath, frame_offset, num_frames, normalize, channels_first, format
)
ret = paddleaudio.load_audio_fileobj(filepath, frame_offset, num_frames,
normalize, channels_first, format)
if ret is not None:
audio_tensor = paddle.to_tensor(ret[0])
return (audio_tensor, ret[1])
return _fallback_load_fileobj(filepath, frame_offset, num_frames, normalize, channels_first, format)
return _fallback_load_fileobj(filepath, frame_offset, num_frames,
normalize, channels_first, format)
filepath = os.fspath(filepath)
ret = paddleaudio.sox_io_load_audio_file(
filepath, frame_offset, num_frames, normalize, channels_first, format
)
ret = paddleaudio.sox_io_load_audio_file(filepath, frame_offset, num_frames,
normalize, channels_first, format)
if ret is not None:
audio_tensor = paddle.to_tensor(ret[0])
return (audio_tensor, ret[1])
return _fallback_load(filepath, frame_offset, num_frames, normalize, channels_first, format)
return _fallback_load(filepath, frame_offset, num_frames, normalize,
channels_first, format)
@_mod_utils.requires_sox()
def save(filepath: str,
def save(
filepath: str,
src: Tensor,
sample_rate: int,
channels_first: bool = True,
compression: Optional[float] = None,
format: Optional[str] = None,
encoding: Optional[str] = None,
bits_per_sample: Optional[int] = None,
):
channels_first: bool=True,
compression: Optional[float]=None,
format: Optional[str]=None,
encoding: Optional[str]=None,
bits_per_sample: Optional[int]=None, ):
src_arr = src.numpy()
if hasattr(filepath, "write"):
paddleaudio.save_audio_fileobj(
filepath, src_arr, sample_rate, channels_first, compression, format, encoding, bits_per_sample
)
paddleaudio.save_audio_fileobj(filepath, src_arr, sample_rate,
channels_first, compression, format,
encoding, bits_per_sample)
return
filepath = os.fspath(filepath)
paddleaudio.sox_io_save_audio_file(
filepath, src_arr, sample_rate, channels_first, compression, format, encoding, bits_per_sample
)
paddleaudio.sox_io_save_audio_file(filepath, src_arr, sample_rate,
channels_first, compression, format,
encoding, bits_per_sample)
@_mod_utils.requires_sox()
def info(filepath: str, format: Optional[str] = None,) -> AudioMetaData:
def info(
filepath: str,
format: Optional[str]=None, ) -> AudioMetaData:
if hasattr(filepath, "read"):
sinfo = paddleaudio.get_info_fileobj(filepath, format)
if sinfo is not None:

@ -1,15 +1,15 @@
"""Defines utilities for switching audio backends"""
#code is from: https://github.com/pytorch/audio/blob/main/torchaudio/backend/utils.py
import warnings
from typing import List
from typing import Optional
import paddlespeech.audio
from . import no_backend
from . import soundfile_backend
from . import sox_io_backend
from paddlespeech.audio._internal import module_utils as _mod_utils
from . import no_backend, soundfile_backend, sox_io_backend
__all__ = [
"list_audio_backends",
"get_audio_backend",

@ -1,14 +1,10 @@
from .sox_effects import apply_effects_file
from .sox_effects import apply_effects_tensor
from .sox_effects import effect_names
from .sox_effects import init_sox_effects
from .sox_effects import shutdown_sox_effects
from paddlespeech.audio._internal import module_utils as _mod_utils
from .sox_effects import (
apply_effects_file,
apply_effects_tensor,
effect_names,
init_sox_effects,
shutdown_sox_effects,
)
if _mod_utils.is_sox_available():
import atexit
@ -22,4 +18,3 @@ __all__ = [
"apply_effects_tensor",
"apply_effects_file",
]

@ -1,14 +1,17 @@
import os
from typing import List, Optional, Tuple
from typing import List
from typing import Optional
from typing import Tuple
import paddle
import numpy
from paddlespeech.audio import _paddleaudio as paddleaudio
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio.utils.sox_utils import list_effects
from paddlespeech.audio import _paddleaudio as paddleaudio
#code is from: https://github.com/pytorch/audio/blob/main/torchaudio/sox_effects/sox_effects.py
@_mod_utils.requires_sox()
def init_sox_effects():
"""Initialize resources required to use sox effects.
@ -57,8 +60,7 @@ def apply_effects_tensor(
tensor: paddle.Tensor,
sample_rate: int,
effects: List[List[str]],
channels_first: bool = True,
) -> Tuple[paddle.Tensor, int]:
channels_first: bool=True, ) -> Tuple[paddle.Tensor, int]:
"""Apply sox effects to given Tensor
.. devices:: CPU
@ -120,7 +122,8 @@ def apply_effects_tensor(
"""
tensor_np = tensor.numpy()
ret = paddleaudio.sox_effects_apply_effects_tensor(tensor_np, sample_rate, effects, channels_first)
ret = paddleaudio.sox_effects_apply_effects_tensor(tensor_np, sample_rate,
effects, channels_first)
if ret is not None:
return (paddle.to_tensor(ret[0]), ret[1])
raise RuntimeError("Failed to apply sox effect")
@ -130,10 +133,9 @@ def apply_effects_tensor(
def apply_effects_file(
path: str,
effects: List[List[str]],
normalize: bool = True,
channels_first: bool = True,
format: Optional[str] = None,
) -> Tuple[paddle.Tensor, int]:
normalize: bool=True,
channels_first: bool=True,
format: Optional[str]=None, ) -> Tuple[paddle.Tensor, int]:
"""Apply sox effects to the audio file and load the resulting data as Tensor
Note:
@ -227,12 +229,14 @@ def apply_effects_file(
>>> pass
"""
if hasattr(path, "read"):
ret = paddleaudio.apply_effects_fileobj(path, effects, normalize, channels_first, format)
ret = paddleaudio.apply_effects_fileobj(path, effects, normalize,
channels_first, format)
if ret is None:
raise RuntimeError("Failed to load audio from {}".format(path))
return (paddle.to_tensor(ret[0]), ret[1])
path = os.fspath(path)
ret = paddleaudio.sox_effects_apply_effects_file(path, effects, normalize, channels_first, format)
ret = paddleaudio.sox_effects_apply_effects_file(path, effects, normalize,
channels_first, format)
if ret is not None:
return (paddle.to_tensor(ret[0]), ret[1])
raise RuntimeError("Failed to load audio from {}".format(path))

@ -1,7 +1,11 @@
from typing import Dict, List
from typing import Dict
from typing import List
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio import _paddleaudio
from paddlespeech.audio._internal import module_utils as _mod_utils
# Taken from https://github.com/pytorch/audio/blob/main/torchaudio/utils/sox_utils.py with modification.
@_mod_utils.requires_sox()
def set_seed(seed: int):

@ -20,12 +20,12 @@ from typing import Union
import paddle
import yaml
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.kaldi import fbank as kaldi_fbank
from ..executor import BaseExecutor
from ..log import logger
from ..utils import stats_wrapper
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.kaldi import fbank as kaldi_fbank
__all__ = ['KWSExecutor']
@ -139,7 +139,7 @@ class KWSExecutor(BaseExecutor):
Input content can be a text(tts), a file(asr, cls) or a streaming(not supported yet).
"""
assert os.path.isfile(audio_file)
waveform, _ = load(audio_file)
waveform, _ = load_audio(audio_file)
if isinstance(audio_file, (str, os.PathLike)):
logger.debug("Preprocessing audio_file:" + audio_file)

@ -22,13 +22,13 @@ from typing import Union
import paddle
import soundfile
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.librosa import melspectrogram
from yacs.config import CfgNode
from ..executor import BaseExecutor
from ..log import logger
from ..utils import stats_wrapper
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.librosa import melspectrogram
from paddlespeech.vector.io.batch import feature_normalize
from paddlespeech.vector.modules.sid_model import SpeakerIdetification

@ -16,11 +16,10 @@ import os
import numpy as np
from paddle import inference
from scipy.special import softmax
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.datasets import ESC50
from paddleaudio.features import melspectrogram
from scipy.special import softmax
# yapf: disable
parser = argparse.ArgumentParser()

@ -15,8 +15,8 @@ import argparse
import os
import paddle
from paddleaudio.datasets import ESC50
from paddlespeech.cls.models import cnn14
from paddlespeech.cls.models import SoundClassifier

@ -17,10 +17,10 @@ import os
import paddle
import paddle.nn.functional as F
import yaml
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.features import LogMelSpectrogram
from paddleaudio.utils import logger
from paddlespeech.cls.models import SoundClassifier
from paddlespeech.utils.dynamic_import import dynamic_import

@ -14,10 +14,10 @@
import os
import paddle
from yacs.config import CfgNode
from paddleaudio.utils import logger
from paddleaudio.utils import Timer
from yacs.config import CfgNode
from paddlespeech.kws.exps.mdtc.collate import collate_features
from paddlespeech.kws.models.loss import max_pooling_loss
from paddlespeech.kws.models.mdtc import KWSModel

@ -15,8 +15,8 @@ import os
import paddle.nn as nn
import paddle.nn.functional as F
from paddleaudio.utils.download import load_state_dict_from_url
from paddlespeech.utils.env import MODEL_HOME
__all__ = ['CNN14', 'CNN10', 'CNN6', 'cnn14', 'cnn10', 'cnn6']

@ -14,10 +14,10 @@
import os
import paddle
from yacs.config import CfgNode
from paddleaudio.utils import logger
from paddleaudio.utils import Timer
from yacs.config import CfgNode
from paddlespeech.kws.exps.mdtc.collate import collate_features
from paddlespeech.kws.models.loss import max_pooling_loss
from paddlespeech.kws.models.mdtc import KWSModel

@ -14,11 +14,10 @@
"""Contains the audio featurizer class."""
import numpy as np
import paddle
import paddleaudio.compliance.kaldi as kaldi
from python_speech_features import delta
from python_speech_features import mfcc
import paddleaudio.compliance.kaldi as kaldi
class AudioFeaturizer():
"""Audio featurizer, for extracting features from audio contents of

@ -28,10 +28,10 @@ from typing import Tuple
import paddle
from paddle import jit
from paddle import nn
from paddleaudio.utils.tensor_utils import add_sos_eos
from paddleaudio.utils.tensor_utils import pad_sequence
from paddleaudio.utils.tensor_utils import th_accuracy
from paddlespeech.s2t.decoders.scorers.ctc import CTCPrefixScorer
from paddlespeech.s2t.frontend.utility import IGNORE_ID
from paddlespeech.s2t.frontend.utility import load_cmvn

@ -24,9 +24,9 @@ from typing import Tuple
import paddle
from paddle import jit
from paddle import nn
from paddleaudio.utils.tensor_utils import add_sos_eos
from paddleaudio.utils.tensor_utils import th_accuracy
from paddlespeech.s2t.frontend.utility import IGNORE_ID
from paddlespeech.s2t.frontend.utility import load_cmvn
from paddlespeech.s2t.modules.cmvn import GlobalCMVN

@ -16,9 +16,9 @@ from collections import OrderedDict
import numpy as np
import paddle
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.librosa import melspectrogram
from paddlespeech.cli.log import logger
from paddlespeech.cli.vector.infer import VectorExecutor
from paddlespeech.server.engine.base_engine import BaseEngine

@ -24,11 +24,11 @@ from typing import Any
from typing import Dict
import paddle
import paddleaudio
import requests
import yaml
from paddle.framework import load
import paddleaudio
from .entry import client_commands
from .entry import server_commands
from paddlespeech.cli import download

@ -16,10 +16,10 @@ import os
import time
import paddle
from yacs.config import CfgNode
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.librosa import melspectrogram
from yacs.config import CfgNode
from paddlespeech.s2t.utils.log import Log
from paddlespeech.vector.io.batch import feature_normalize
from paddlespeech.vector.models.ecapa_tdnn import EcapaTdnn

@ -18,10 +18,10 @@ import numpy as np
import paddle
from paddle.io import BatchSampler
from paddle.io import DataLoader
from paddleaudio.metric import compute_eer
from tqdm import tqdm
from yacs.config import CfgNode
from paddleaudio.metric import compute_eer
from paddlespeech.s2t.utils.log import Log
from paddlespeech.vector.io.batch import batch_feature_normalize
from paddlespeech.vector.io.dataset import CSVDataset

@ -20,9 +20,9 @@ import paddle
from paddle.io import BatchSampler
from paddle.io import DataLoader
from paddle.io import DistributedBatchSampler
from paddleaudio.compliance.librosa import melspectrogram
from yacs.config import CfgNode
from paddleaudio.compliance.librosa import melspectrogram
from paddlespeech.s2t.utils.log import Log
from paddlespeech.vector.io.augment import build_augment_pipeline
from paddlespeech.vector.io.augment import waveform_augment

@ -15,10 +15,9 @@ from dataclasses import dataclass
from dataclasses import fields
from paddle.io import Dataset
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.librosa import melspectrogram
from paddlespeech.s2t.utils.log import Log
logger = Log(__name__).getlog()

@ -16,7 +16,6 @@ from dataclasses import dataclass
from dataclasses import fields
from paddle.io import Dataset
from paddleaudio.backends import soundfile_load as load_audio
from paddleaudio.compliance.librosa import melspectrogram
from paddleaudio.compliance.librosa import mfcc

@ -1,28 +1,29 @@
import unittest
import itertools
import unittest
from parameterized import parameterized
import numpy as np
from paddlespeech.audio._internal import module_utils as _mod_utils
from paddlespeech.audio.backends import sox_io_backend
from parameterized import parameterized
from tests.unit.common_utils import (
get_wav_data,
load_wav,
save_wav,
)
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import get_wav_data
from tests.unit.common_utils import load_wav
from tests.unit.common_utils import save_wav
#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/load_test.py
class TestLoad(unittest.TestCase):
class TestLoad(unittest.TestCase):
def assert_wav(self, dtype, sample_rate, num_channels, normalize, duration):
"""`sox_io_backend.load` can load wav format correctly.
Wav data loaded with sox_io backend should match those with scipy
"""
path = 'testdata/reference.wav'
data = get_wav_data(dtype, num_channels, normalize=normalize, num_frames=duration * sample_rate)
data = get_wav_data(
dtype,
num_channels,
normalize=normalize,
num_frames=duration * sample_rate)
save_wav(path, data, sample_rate)
expected = load_wav(path, normalize=normalize)[0]
data, sr = sox_io_backend.load(path, normalize=normalize)
@ -32,16 +33,18 @@ class TestLoad(unittest.TestCase):
@parameterized.expand(
list(
itertools.product(
["float64", "float32", "int32",],
[
"float64",
"float32",
"int32",
],
[8000, 16000],
[1, 2],
[False, True],
)
),
)
[False, True], )), )
def test_wav(self, dtype, sample_rate, num_channels, normalize):
"""`sox_io_backend.load` can load wav format correctly."""
self.assert_wav(dtype, sample_rate, num_channels, normalize, duration=1)
if __name__ == '__main__':
unittest.main()

@ -1,23 +1,19 @@
import io
import os
import unittest
import numpy as np
import paddle
from parameterized import parameterized
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import (
get_wav_data,
load_wav,
save_wav,
nested_params,
TempDirMixin,
sox_utils
)
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import get_wav_data
from tests.unit.common_utils import load_wav
from tests.unit.common_utils import nested_params
from tests.unit.common_utils import save_wav
from tests.unit.common_utils import sox_utils
from tests.unit.common_utils import TempDirMixin
#code is from:https://github.com/pytorch/audio/blob/main/torchaudio/test/torchaudio_unittest/backend/sox_io/save_test.py
def _get_sox_encoding(encoding):
encodings = {
"PCM_F": "floating-point",
@ -28,20 +24,20 @@ def _get_sox_encoding(encoding):
}
return encodings.get(encoding)
class TestSaveBase(TempDirMixin):
def assert_save_consistency(
self,
format: str,
*,
compression: float = None,
encoding: str = None,
bits_per_sample: int = None,
sample_rate: float = 8000,
num_channels: int = 2,
num_frames: float = 3 * 8000,
src_dtype: str = "int32",
test_mode: str = "path",
):
compression: float=None,
encoding: str=None,
bits_per_sample: int=None,
sample_rate: float=8000,
num_channels: int=2,
num_frames: float=3 * 8000,
src_dtype: str="int32",
test_mode: str="path", ):
"""`save` function produces file that is comparable with `sox` command
To compare that the file produced by `save` function against the file produced by
@ -89,15 +85,20 @@ class TestSaveBase(TempDirMixin):
ref_path = self.get_temp_path("3.2.ref.wav")
# 1. Generate original wav
data = get_wav_data(src_dtype, num_channels, normalize=False, num_frames=num_frames)
data = get_wav_data(
src_dtype, num_channels, normalize=False, num_frames=num_frames)
save_wav(src_path, data, sample_rate)
# 2.1. Convert the original wav to target format with paddleaudio
data = load_wav(src_path, normalize=False)[0]
if test_mode == "path":
sox_io_backend.save(
tgt_path, data, sample_rate, compression=compression, encoding=encoding, bits_per_sample=bits_per_sample
)
tgt_path,
data,
sample_rate,
compression=compression,
encoding=encoding,
bits_per_sample=bits_per_sample)
elif test_mode == "fileobj":
with open(tgt_path, "bw") as file_:
sox_io_backend.save(
@ -107,8 +108,7 @@ class TestSaveBase(TempDirMixin):
format=format,
compression=compression,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
bits_per_sample=bits_per_sample, )
elif test_mode == "bytesio":
file_ = io.BytesIO()
sox_io_backend.save(
@ -118,33 +118,40 @@ class TestSaveBase(TempDirMixin):
format=format,
compression=compression,
encoding=encoding,
bits_per_sample=bits_per_sample,
)
bits_per_sample=bits_per_sample, )
file_.seek(0)
with open(tgt_path, "bw") as f:
f.write(file_.read())
else:
raise ValueError(f"Unexpected test mode: {test_mode}")
# 2.2. Convert the target format to wav with sox
sox_utils.convert_audio_file(tgt_path, tst_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth)
sox_utils.convert_audio_file(
tgt_path, tst_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth)
# 2.3. Load with SciPy
found = load_wav(tst_path, normalize=False)[0]
# 3.1. Convert the original wav to target format with sox
sox_encoding = _get_sox_encoding(encoding)
sox_utils.convert_audio_file(
src_path, sox_path, compression=compression, encoding=sox_encoding, bit_depth=bits_per_sample
)
src_path,
sox_path,
compression=compression,
encoding=sox_encoding,
bit_depth=bits_per_sample)
# 3.2. Convert the target format to wav with sox
sox_utils.convert_audio_file(sox_path, ref_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth)
sox_utils.convert_audio_file(
sox_path, ref_path, encoding=cmp_encoding, bit_depth=cmp_bit_depth)
# 3.3. Load with SciPy
expected = load_wav(ref_path, normalize=False)[0]
np.testing.assert_array_almost_equal(found, expected)
class TestSave(TestSaveBase, unittest.TestCase):
@nested_params(
["path",],
[
"path",
],
[
("PCM_U", 8),
("PCM_S", 16),
@ -153,22 +160,27 @@ class TestSave(TestSaveBase, unittest.TestCase):
("PCM_F", 64),
("ULAW", 8),
("ALAW", 8),
],
)
], )
def test_save_wav(self, test_mode, enc_params):
encoding, bits_per_sample = enc_params
self.assert_save_consistency("wav", encoding=encoding, bits_per_sample=bits_per_sample, test_mode=test_mode)
self.assert_save_consistency(
"wav",
encoding=encoding,
bits_per_sample=bits_per_sample,
test_mode=test_mode)
@nested_params(
["path", ],
[
("float32",),
("int32",),
"path",
],
)
[
("float32", ),
("int32", ),
], )
def test_save_wav_dtype(self, test_mode, params):
(dtype,) = params
self.assert_save_consistency("wav", src_dtype=dtype, test_mode=test_mode)
(dtype, ) = params
self.assert_save_consistency(
"wav", src_dtype=dtype, test_mode=test_mode)
if __name__ == '__main__':

@ -3,12 +3,12 @@ import itertools
import unittest
from parameterized import parameterized
from paddlespeech.audio.backends import sox_io_backend
from tests.unit.common_utils import (
get_wav_data,
TempDirMixin,
name_func
)
from tests.unit.common_utils import get_wav_data
from tests.unit.common_utils import name_func
from tests.unit.common_utils import TempDirMixin
class SmokeTest(TempDirMixin, unittest.TestCase):
"""Run smoke test on various audio format
@ -20,15 +20,23 @@ class SmokeTest(TempDirMixin, unittest.TestCase):
however without such tools, the correctness of each function cannot be verified.
"""
def run_smoke_test(self, ext, sample_rate, num_channels, *, compression=None, dtype="float32"):
def run_smoke_test(self,
ext,
sample_rate,
num_channels,
*,
compression=None,
dtype="float32"):
duration = 1
num_frames = sample_rate * duration
#path = self.get_temp_path(f"test.{ext}")
path = self.get_temp_path(f"test.{ext}")
original = get_wav_data(dtype, num_channels, normalize=False, num_frames=num_frames)
original = get_wav_data(
dtype, num_channels, normalize=False, num_frames=num_frames)
# 1. run save
sox_io_backend.save(path, original, sample_rate, compression=compression)
sox_io_backend.save(
path, original, sample_rate, compression=compression)
# 2. run info
info = sox_io_backend.info(path)
assert info.sample_rate == sample_rate
@ -41,14 +49,11 @@ class SmokeTest(TempDirMixin, unittest.TestCase):
@parameterized.expand(
list(
itertools.product(
["float32", "int32" ],
["float32", "int32"],
#["float32", "int32", "int16", "uint8"],
[8000, 16000],
[1, 2],
)
),
name_func=name_func,
)
[1, 2], )),
name_func=name_func, )
def test_wav(self, dtype, sample_rate, num_channels):
"""Run smoke test on wav format"""
self.run_smoke_test("wav", sample_rate, num_channels, dtype=dtype)
@ -80,18 +85,15 @@ class SmokeTest(TempDirMixin, unittest.TestCase):
#self.run_smoke_test("vorbis", sample_rate, num_channels, compression=quality_level)
@parameterized.expand(
list(
itertools.product(
list(itertools.product(
[8000, 16000],
[1, 2],
list(range(9)),
)
),
name_func=name_func,
)
list(range(9)), )),
name_func=name_func, )
def test_flac(self, sample_rate, num_channels, compression_level):
"""Run smoke test on flac format"""
self.run_smoke_test("flac", sample_rate, num_channels, compression=compression_level)
self.run_smoke_test(
"flac", sample_rate, num_channels, compression=compression_level)
class SmokeTestFileObj(unittest.TestCase):
@ -104,14 +106,22 @@ class SmokeTestFileObj(unittest.TestCase):
however without such tools, the correctness of each function cannot be verified.
"""
def run_smoke_test(self, ext, sample_rate, num_channels, *, compression=None, dtype="float32"):
def run_smoke_test(self,
ext,
sample_rate,
num_channels,
*,
compression=None,
dtype="float32"):
duration = 1
num_frames = sample_rate * duration
original = get_wav_data(dtype, num_channels, normalize=False, num_frames=num_frames)
original = get_wav_data(
dtype, num_channels, normalize=False, num_frames=num_frames)
fileobj = io.BytesIO()
# 1. run save
sox_io_backend.save(fileobj, original, sample_rate, compression=compression, format=ext)
sox_io_backend.save(
fileobj, original, sample_rate, compression=compression, format=ext)
# 2. run info
fileobj.seek(0)
info = sox_io_backend.info(fileobj, format=ext)
@ -124,15 +134,11 @@ class SmokeTestFileObj(unittest.TestCase):
assert loaded.shape[0] == num_channels
@parameterized.expand(
list(
itertools.product(
list(itertools.product(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
),
name_func=name_func,
)
[1, 2], )),
name_func=name_func, )
def test_wav(self, dtype, sample_rate, num_channels):
"""Run smoke test on wav format"""
self.run_smoke_test("wav", sample_rate, num_channels, dtype=dtype)
@ -165,18 +171,16 @@ class SmokeTestFileObj(unittest.TestCase):
#self.run_smoke_test("vorbis", sample_rate, num_channels, compression=quality_level)
@parameterized.expand(
list(
itertools.product(
list(itertools.product(
[8000, 16000],
[1, 2],
list(range(9)),
)
),
name_func=name_func,
)
list(range(9)), )),
name_func=name_func, )
def test_flac(self, sample_rate, num_channels, compression_level):
#"""Run smoke test on flac format"""
self.run_smoke_test("flac", sample_rate, num_channels, compression=compression_level)
self.run_smoke_test(
"flac", sample_rate, num_channels, compression=compression_level)
if __name__ == '__main__':
#test_func()

@ -4,24 +4,18 @@ import itertools
import tarfile
import unittest
from pathlib import Path
import numpy as np
import numpy as np
from parameterized import parameterized
from paddlespeech.audio import sox_effects
from paddlespeech.audio._internal import module_utils as _mod_utils
from tests.unit.common_utils import (
get_sinusoid,
get_wav_data,
load_wav,
save_wav,
sox_utils,
TempDirMixin,
name_func,
load_effects_params
)
if _mod_utils.is_module_available("requests"):
import requests
from tests.unit.common_utils import get_sinusoid
from tests.unit.common_utils import get_wav_data
from tests.unit.common_utils import load_effects_params
from tests.unit.common_utils import load_wav
from tests.unit.common_utils import save_wav
from tests.unit.common_utils import sox_utils
from tests.unit.common_utils import TempDirMixin
class TestSoxEffects(unittest.TestCase):
@ -35,14 +29,18 @@ class TestSoxEffectsTensor(TempDirMixin, unittest.TestCase):
"""Test suite for `apply_effects_tensor` function"""
@parameterized.expand(
list(itertools.product(["float32", "int32"], [8000, 16000], [1, 2, 4, 8], [True, False])),
)
def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first):
list(
itertools.product(["float32", "int32"], [8000, 16000], [1, 2, 4, 8],
[True, False])), )
def test_apply_no_effect(self, dtype, sample_rate, num_channels,
channels_first):
"""`apply_effects_tensor` without effects should return identical data as input"""
original = get_wav_data(dtype, num_channels, channels_first=channels_first)
original = get_wav_data(
dtype, num_channels, channels_first=channels_first)
expected = original.clone()
found, output_sample_rate = sox_effects.apply_effects_tensor(expected, sample_rate, [], channels_first)
found, output_sample_rate = sox_effects.apply_effects_tensor(
expected, sample_rate, [], channels_first)
assert (output_sample_rate == sample_rate)
# SoxEffect should not alter the input Tensor object
@ -69,12 +67,18 @@ class TestSoxEffectsTensor(TempDirMixin, unittest.TestCase):
input_path = self.get_temp_path("input.wav")
reference_path = self.get_temp_path("reference.wav")
original = get_sinusoid(frequency=800, sample_rate=input_sr, n_channels=num_channels, dtype="float32")
original = get_sinusoid(
frequency=800,
sample_rate=input_sr,
n_channels=num_channels,
dtype="float32")
save_wav(input_path, original, input_sr)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr)
sox_utils.run_sox_effect(
input_path, reference_path, effects, output_sample_rate=output_sr)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_tensor(original, input_sr, effects)
found, sr = sox_effects.apply_effects_tensor(original, input_sr,
effects)
assert sr == expected_sr
#self.assertEqual(expected, found)
@ -90,20 +94,19 @@ class TestSoxEffectsFile(TempDirMixin, unittest.TestCase):
["float32", "int32"],
[8000, 16000],
[1, 2, 4, 8],
[False, True],
)
),
[False, True], )),
#name_func=name_func,
)
def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first):
def test_apply_no_effect(self, dtype, sample_rate, num_channels,
channels_first):
"""`apply_effects_file` without effects should return identical data as input"""
path = self.get_temp_path("input.wav")
expected = get_wav_data(dtype, num_channels, channels_first=channels_first)
expected = get_wav_data(
dtype, num_channels, channels_first=channels_first)
save_wav(path, expected, sample_rate, channels_first=channels_first)
found, output_sample_rate = sox_effects.apply_effects_file(
path, [], normalize=False, channels_first=channels_first
)
path, [], normalize=False, channels_first=channels_first)
assert output_sample_rate == sample_rate
#self.assertEqual(expected, found)
@ -126,16 +129,17 @@ class TestSoxEffectsFile(TempDirMixin, unittest.TestCase):
reference_path = self.get_temp_path("reference.wav")
data = get_wav_data(dtype, num_channels, channels_first=channels_first)
save_wav(input_path, data, input_sr, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr)
sox_utils.run_sox_effect(
input_path, reference_path, effects, output_sample_rate=output_sr)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_file(input_path, effects, normalize=False, channels_first=channels_first)
found, sr = sox_effects.apply_effects_file(
input_path, effects, normalize=False, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(expected.numpy(), found.numpy())
def test_apply_effects_path(self):
"""`apply_effects_file` should return identical data as sox command when file path is given as a Path Object"""
dtype = "int32"
@ -149,12 +153,15 @@ class TestSoxEffectsFile(TempDirMixin, unittest.TestCase):
reference_path = self.get_temp_path("reference.wav")
data = get_wav_data(dtype, num_channels, channels_first=channels_first)
save_wav(input_path, data, input_sr, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_sample_rate=output_sr)
sox_utils.run_sox_effect(
input_path, reference_path, effects, output_sample_rate=output_sr)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_file(
Path(input_path), effects, normalize=False, channels_first=channels_first
)
Path(input_path),
effects,
normalize=False,
channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
@ -165,13 +172,10 @@ class TestFileFormats(TempDirMixin, unittest.TestCase):
"""`apply_effects_file` gives the same result as sox on various file formats"""
@parameterized.expand(
list(
itertools.product(
list(itertools.product(
["float32", "int32"],
[8000, 16000],
[1, 2],
)
),
[1, 2], )),
#name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}',
)
def test_wav(self, dtype, sample_rate, num_channels):
@ -186,7 +190,8 @@ class TestFileFormats(TempDirMixin, unittest.TestCase):
sox_utils.run_sox_effect(input_path, reference_path, effects)
expected, expected_sr = load_wav(reference_path)
found, sr = sox_effects.apply_effects_file(input_path, effects, normalize=False, channels_first=channels_first)
found, sr = sox_effects.apply_effects_file(
input_path, effects, normalize=False, channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
@ -248,14 +253,12 @@ class TestFileFormats(TempDirMixin, unittest.TestCase):
#np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
#@skipIfNoExec("sox")
#@skipIfNoSox
#@skipIfNoExec("sox")
#@skipIfNoSox
class TestFileObject(TempDirMixin, unittest.TestCase):
@parameterized.expand(
[
@parameterized.expand([
("wav", None),
]
)
])
def test_fileobj(self, ext, compression):
"""Applying effects via file object works"""
sample_rate = 16000
@ -268,21 +271,25 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
data = get_wav_data("int32", 2, channels_first=channels_first)
save_wav(input_path, data, sample_rate, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
sox_utils.run_sox_effect(
input_path, reference_path, effects, output_bitdepth=32)
expected, expected_sr = load_wav(reference_path)
with open(input_path, "rb") as fileobj:
found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first)
save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
found, sr = sox_effects.apply_effects_file(
fileobj, effects, channels_first=channels_first)
save_wav(
self.get_temp_path("result.wav"),
found,
sr,
channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
@parameterized.expand(
[
@parameterized.expand([
("wav", None),
]
)
])
def test_bytesio(self, ext, compression):
"""Applying effects via BytesIO object works"""
sample_rate = 16000
@ -294,13 +301,19 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
#sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression)
data = get_wav_data("int32", 2, channels_first=channels_first)
save_wav(input_path, data, sample_rate, channels_first=channels_first)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
sox_utils.run_sox_effect(
input_path, reference_path, effects, output_bitdepth=32)
expected, expected_sr = load_wav(reference_path)
with open(input_path, "rb") as file_:
fileobj = io.BytesIO(file_.read())
found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first)
save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
found, sr = sox_effects.apply_effects_file(
fileobj, effects, channels_first=channels_first)
save_wav(
self.get_temp_path("result.wav"),
found,
sr,
channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
print("found")
@ -309,11 +322,9 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
print(expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())
@parameterized.expand(
[
@parameterized.expand([
("wav", None),
]
)
])
def test_tarfile(self, ext, compression):
"""Applying effects to compressed audio via file-like file works"""
sample_rate = 16000
@ -328,7 +339,8 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
save_wav(input_path, data, sample_rate, channels_first=channels_first)
# sox_utils.gen_audio_file(input_path, sample_rate, num_channels=2, compression=compression)
sox_utils.run_sox_effect(input_path, reference_path, effects, output_bitdepth=32)
sox_utils.run_sox_effect(
input_path, reference_path, effects, output_bitdepth=32)
expected, expected_sr = load_wav(reference_path)
@ -336,8 +348,13 @@ class TestFileObject(TempDirMixin, unittest.TestCase):
tarobj.add(input_path, arcname=audio_file)
with tarfile.TarFile(archive_path, "r") as tarobj:
fileobj = tarobj.extractfile(audio_file)
found, sr = sox_effects.apply_effects_file(fileobj, effects, channels_first=channels_first)
save_wav(self.get_temp_path("result.wav"), found, sr, channels_first=channels_first)
found, sr = sox_effects.apply_effects_file(
fileobj, effects, channels_first=channels_first)
save_wav(
self.get_temp_path("result.wav"),
found,
sr,
channels_first=channels_first)
assert sr == expected_sr
#self.assertEqual(found, expected)
np.testing.assert_array_almost_equal(found.numpy(), expected.numpy())

@ -17,7 +17,6 @@ import urllib.request
import numpy as np
import paddle
from paddleaudio.backends import soundfile_load as load
wav_url = 'https://paddlespeech.bj.bcebos.com/PaddleAudio/zh.wav'

@ -15,9 +15,9 @@ import unittest
import numpy as np
import paddle
from paddleaudio.functional.window import get_window
from .base import FeatTest
from paddleaudio.functional.window import get_window
from paddlespeech.s2t.transform.spectrogram import IStft
from paddlespeech.s2t.transform.spectrogram import Stft

@ -14,18 +14,17 @@
import unittest
import numpy as np
import paddle
from kaldiio import ReadHelper
from paddlespeech.audio.kaldi import fbank as fbank
from paddlespeech.audio.kaldi import pitch as pitch
from kaldiio import ReadHelper
# The ground-truth features were computed with the Kaldi commands below.
#compute-fbank-feats --dither=0 scp:$wav_scp ark,t:fbank_feat.ark
#compute-kaldi-pitch-feats --sample-frequency=16000 scp:$wav_scp ark,t:pitch_feat.ark
class TestKaldiFbank(unittest.TestCase):
class TestKaldiFbank(unittest.TestCase):
def test_fbank(self):
fbank_groundtruth = {}
with ReadHelper('ark:testdata/fbank_feat.ark') as reader:
@ -53,6 +52,5 @@ class TestKaldiFbank(unittest.TestCase):
pitch_feat, pitch_check, decimal=4)
if __name__ == '__main__':
unittest.main()

@ -15,8 +15,8 @@ import unittest
import numpy as np
import paddle
import paddleaudio
from .base import FeatTest
from paddlespeech.s2t.transform.spectrogram import LogMelSpectrogram

@ -15,8 +15,8 @@ import unittest
import numpy as np
import paddle
import paddleaudio
from .base import FeatTest
from paddlespeech.s2t.transform.spectrogram import Spectrogram

@ -15,9 +15,9 @@ import unittest
import numpy as np
import paddle
from paddleaudio.functional.window import get_window
from .base import FeatTest
from paddleaudio.functional.window import get_window
from paddlespeech.s2t.transform.spectrogram import Stft

@ -1,19 +1,15 @@
from .wav_utils import get_wav_data, load_wav, save_wav, normalize_wav
from .case_utils import name_func
from .case_utils import TempDirMixin
from .data_utils import get_sinusoid
from .data_utils import load_effects_params
from .data_utils import load_params
from .parameterized_utils import nested_params
from .data_utils import get_sinusoid, load_params, load_effects_params
from .case_utils import (
TempDirMixin,
name_func
)
from .wav_utils import get_wav_data
from .wav_utils import load_wav
from .wav_utils import normalize_wav
from .wav_utils import save_wav
__all__ = [
"get_wav_data",
"load_wav",
"save_wav",
"normalize_wav",
"load_params",
"nested_params",
"get_sinusoid",
"name_func",
"load_effects_params"
"get_wav_data", "load_wav", "save_wav", "normalize_wav", "load_params",
"nested_params", "get_sinusoid", "name_func", "load_effects_params"
]

@ -1,24 +1,13 @@
import functools
import os.path
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
#code is from:https://github.com/pytorch/audio/blob/main/test/torchaudio_unittest/common_utils/case_utils.py
import paddle
from paddlespeech.audio._internal.module_utils import (
is_kaldi_available,
is_module_available,
is_sox_available,
)
def name_func(func, _, params):
    """Compose a readable test-case name for a parameterized test.

    The result is ``func.__name__`` followed by an underscore and the string
    form of every entry in ``params.args`` joined with underscores. The
    second positional argument is accepted but not used.
    """
    # Stringify each positional parameter, then glue them together.
    suffix = "_".join(map(str, params.args))
    return f'{func.__name__}_{suffix}'
class TempDirMixin:
"""Mixin to provide easy access to temp dir"""

@ -1,8 +1,8 @@
from typing import Optional
import scipy.io.wavfile
import paddle
import numpy as np
import scipy.io.wavfile
def normalize_wav(tensor: paddle.Tensor) -> paddle.Tensor:
if tensor.dtype == paddle.float32:
@ -26,10 +26,9 @@ def get_wav_data(
dtype: str,
num_channels: int,
*,
num_frames: Optional[int] = None,
normalize: bool = True,
channels_first: bool = True,
):
num_frames: Optional[int]=None,
normalize: bool=True,
channels_first: bool=True, ):
"""Generate linear signal of the given dtype and num_channels
Data range is
@ -66,7 +65,8 @@ def get_wav_data(
elif dtype == "float64":
base = paddle.linspace(-1.0, 1.0, num_frames, dtype=dtype_)
elif dtype == "int32":
base = paddle.linspace(-2147483648, 2147483647, num_frames, dtype=dtype_)
base = paddle.linspace(
-2147483648, 2147483647, num_frames, dtype=dtype_)
#elif dtype == "int16":
# base = paddle.linspace(-32768, 32767, num_frames, dtype=dtype_)
#dtype_np = getattr(np, dtype)

Loading…
Cancel
Save