You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
PaddleSpeech/paddlespeech/cli/utils.py

227 lines
6.7 KiB

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import hashlib
import logging
import os
import tarfile
import zipfile
from typing import Any
from typing import Dict
from typing import List
from paddle.framework import load
from . import download
from .entry import commands
__all__ = [
'cli_register',
'get_command',
'download_and_decompress',
'load_state_dict_from_url',
'logger',
]
def cli_register(name: str, description: str='') -> Any:
def _warpper(command):
items = name.split('.')
com = commands
for item in items:
com = com[item]
com['_entry'] = command
if description:
com['_description'] = description
return command
return _warpper
def get_command(name: str) -> Any:
items = name.split('.')
com = commands
for item in items:
com = com[item]
return com['_entry']
def _md5check(filepath: os.PathLike, md5sum: str) -> bool:
logger.info("File {} md5 checking...".format(filepath))
md5 = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
md5.update(chunk)
calc_md5sum = md5.hexdigest()
if calc_md5sum != md5sum:
logger.info("File {} md5 check failed, {}(calc) != "
"{}(base)".format(filepath, calc_md5sum, md5sum))
return False
else:
logger.info("File {} md5 check passed.".format(filepath))
return True
def _get_uncompress_path(filepath: os.PathLike) -> os.PathLike:
file_dir = os.path.dirname(filepath)
if tarfile.is_tarfile(filepath):
files = tarfile.open(filepath, "r:*")
file_list = files.getnames()
elif zipfile.is_zipfile(filepath):
files = zipfile.ZipFile(filepath, 'r')
file_list = files.namelist()
else:
return file_dir
if _is_a_single_file(file_list):
rootpath = file_list[0]
uncompressed_path = os.path.join(file_dir, rootpath)
elif _is_a_single_dir(file_list):
rootpath = os.path.splitext(file_list[0])[0].split(os.sep)[0]
uncompressed_path = os.path.join(file_dir, rootpath)
else:
rootpath = os.path.splitext(filepath)[0].split(os.sep)[-1]
uncompressed_path = os.path.join(file_dir, rootpath)
files.close()
return uncompressed_path
def _is_a_single_file(file_list: List[os.PathLike]) -> bool:
if len(file_list) == 1 and file_list[0].find(os.sep) < -1:
return True
return False
def _is_a_single_dir(file_list: List[os.PathLike]) -> bool:
new_file_list = []
for file_path in file_list:
if '/' in file_path:
file_path = file_path.replace('/', os.sep)
elif '\\' in file_path:
file_path = file_path.replace('\\', os.sep)
new_file_list.append(file_path)
file_name = new_file_list[0].split(os.sep)[0]
for i in range(1, len(new_file_list)):
if file_name != new_file_list[i].split(os.sep)[0]:
return False
return True
def download_and_decompress(archive: Dict[str, str], path: str) -> os.PathLike:
"""
Download archieves and decompress to specific path.
"""
if not os.path.isdir(path):
os.makedirs(path)
assert 'url' in archive and 'md5' in archive, \
'Dictionary keys of "url" and "md5" are required in the archive, but got: {}'.format(list(archive.keys()))
filepath = os.path.join(path, os.path.basename(archive['url']))
if os.path.isfile(filepath) and _md5check(filepath, archive['md5']):
uncompress_path = _get_uncompress_path(filepath)
if not os.path.isdir(uncompress_path):
download._decompress(filepath)
else:
uncompress_path = download.get_path_from_url(archive['url'], path,
archive['md5'])
return uncompress_path
def load_state_dict_from_url(url: str, path: str, md5: str=None) -> os.PathLike:
"""
Download and load a state dict from url
"""
if not os.path.isdir(path):
os.makedirs(path)
download.get_path_from_url(url, path, md5)
return load(os.path.join(path, os.path.basename(url)))
def _get_user_home():
return os.path.expanduser('~')
def _get_paddlespcceh_home():
if 'PPSPEECH_HOME' in os.environ:
home_path = os.environ['PPSPEECH_HOME']
if os.path.exists(home_path):
if os.path.isdir(home_path):
return home_path
else:
raise RuntimeError(
'The environment variable PPSPEECH_HOME {} is not a directory.'.
format(home_path))
else:
return home_path
return os.path.join(_get_user_home(), '.paddlespeech')
def _get_sub_home(directory):
home = os.path.join(_get_paddlespcceh_home(), directory)
if not os.path.exists(home):
os.makedirs(home)
return home
PPSPEECH_HOME = _get_paddlespcceh_home()
MODEL_HOME = _get_sub_home('models')
class Logger(object):
def __init__(self, name: str=None):
name = 'PaddleSpeech' if not name else name
self.logger = logging.getLogger(name)
log_config = {
'DEBUG': 10,
'INFO': 20,
'TRAIN': 21,
'EVAL': 22,
'WARNING': 30,
'ERROR': 40,
'CRITICAL': 50,
'EXCEPTION': 100,
}
for key, level in log_config.items():
logging.addLevelName(level, key)
if key == 'EXCEPTION':
self.__dict__[key.lower()] = self.logger.exception
else:
self.__dict__[key.lower()] = functools.partial(self.__call__,
level)
self.format = logging.Formatter(
fmt='[%(asctime)-15s] [%(levelname)8s] [%(filename)s] [L%(lineno)d] - %(message)s'
)
self.handler = logging.StreamHandler()
self.handler.setFormatter(self.format)
self.logger.addHandler(self.handler)
self.logger.setLevel(logging.DEBUG)
self.logger.propagate = False
def __call__(self, log_level: str, msg: str):
self.logger.log(log_level, msg)
logger = Logger()