# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tarfile
import zipfile
from typing import Any
from typing import Dict
from typing import Optional

from paddle.framework import load

from . import download
from .entry import commands

__all__ = [
    'cli_register',
    'get_command',
    'download_and_decompress',
    'load_state_dict_from_url',
]


def cli_register(name: str, description: str='') -> Any:
    """
    Build a decorator that registers a command under a dotted name.

    Args:
        name: Dotted command path, e.g. ``'a.b.c'``. Each component is used
            as a key to descend one level into ``commands``.
        description: Optional text stored under the ``'_description'`` key of
            the command node. Nothing is stored when it is empty.

    Returns:
        A decorator that stores the decorated object under the ``'_entry'``
        key of the command node and returns that object unchanged.
    """

    def _wrapper(command):
        # NOTE(review): `commands` is presumably an auto-vivifying nested
        # mapping (missing keys create sub-nodes); with a plain dict this
        # lookup would raise KeyError for new names -- confirm in `.entry`.
        com = commands
        for item in name.split('.'):
            com = com[item]

        com['_entry'] = command
        if description:
            com['_description'] = description
        return command

    return _wrapper


def get_command(name: str) -> Any:
    """
    Look up the object registered under a dotted command name.

    Args:
        name: Dotted command path, e.g. ``'a.b.c'``.

    Returns:
        The value stored under the ``'_entry'`` key of the command node.
    """
    com = commands
    for item in name.split('.'):
        com = com[item]

    return com['_entry']


def _get_uncompress_path(filepath: os.PathLike) -> os.PathLike:
    """
    Compute the path an archive is expected to decompress to.

    The archive is only inspected (its member names are listed); nothing is
    extracted here.

    Args:
        filepath: Path to a tar or zip archive.

    Returns:
        * the directory containing ``filepath`` when it is neither a tar nor
          a zip file;
        * ``<dir>/<first member>`` when ``download._is_a_single_file``
          accepts the member list;
        * ``<dir>/<root name>`` when ``download._is_a_single_dir`` accepts
          the member list;
        * ``<dir>/<archive name without its last extension>`` otherwise.
    """
    file_dir = os.path.dirname(filepath)

    # Read the member names inside a context manager so the archive handle is
    # released even if listing the members raises.
    if tarfile.is_tarfile(filepath):
        is_zip_file = False
        with tarfile.open(filepath, "r:*") as archive_file:
            file_list = archive_file.getnames()
    elif zipfile.is_zipfile(filepath):
        is_zip_file = True
        with zipfile.ZipFile(filepath, 'r') as archive_file:
            file_list = archive_file.namelist()
    else:
        return file_dir

    if download._is_a_single_file(file_list):
        rootpath = file_list[0]
    elif download._is_a_single_dir(file_list):
        # The first member is reduced to a single path component: the first
        # component for zip archives, the last one for tar archives.
        # NOTE(review): member names are split on os.sep; archive member
        # names normally use '/', so this may misbehave where os.sep differs.
        components = os.path.splitext(file_list[0])[0].split(os.sep)
        rootpath = components[0] if is_zip_file else components[-1]
    else:
        rootpath = os.path.splitext(filepath)[0].split(os.sep)[-1]

    return os.path.join(file_dir, rootpath)


def download_and_decompress(archive: Dict[str, str], path: str) -> os.PathLike:
    """
    Download archives and decompress to specific path.

    Args:
        archive: Mapping that must contain the keys ``'url'`` and ``'md5'``.
        path: Directory the archive is stored in; created when missing.

    Returns:
        The decompressed path: the value of ``_get_uncompress_path`` when a
        local copy with a matching md5 is reused, otherwise the value
        returned by ``download.get_path_from_url``.

    Raises:
        AssertionError: If ``archive`` lacks the ``'url'`` or ``'md5'`` key.
    """
    # Raised explicitly (instead of via `assert`) so the check still runs
    # under `python -O`; the exception type stays AssertionError for callers.
    # Validation happens first so invalid input leaves no directory behind.
    if not ('url' in archive and 'md5' in archive):
        raise AssertionError(
            'Dictionary keys of "url" and "md5" are required in the archive, but got: {}'.
            format(list(archive.keys())))

    # exist_ok avoids the check-then-create race between processes.
    os.makedirs(path, exist_ok=True)

    filepath = os.path.join(path, os.path.basename(archive['url']))
    if os.path.isfile(filepath) and download._md5check(filepath,
                                                       archive['md5']):
        # A verified local copy exists: skip the download and only decompress
        # when the expected output directory is not there yet.
        uncompress_path = _get_uncompress_path(filepath)
        if not os.path.isdir(uncompress_path):
            download._decompress(filepath)
    else:
        uncompress_path = download.get_path_from_url(archive['url'], path,
                                                     archive['md5'])

    return uncompress_path


def load_state_dict_from_url(url: str, path: str,
                             md5: Optional[str]=None) -> Any:
    """
    Download and load a state dict from url

    Args:
        url: Location of the file to fetch.
        path: Directory the file is stored in; created when missing.
        md5: Optional checksum forwarded to ``download.get_path_from_url``.

    Returns:
        Whatever ``paddle.framework.load`` returns for
        ``<path>/<basename of url>``.
    """
    # exist_ok avoids the check-then-create race between processes.
    os.makedirs(path, exist_ok=True)

    download.get_path_from_url(url, path, md5)
    return load(os.path.join(path, os.path.basename(url)))


def _get_user_home() -> str:
    """Return the current user's home directory (``~`` expanded)."""
    return os.path.expanduser('~')


def _get_paddlespcceh_home() -> str:
    """
    Resolve the PaddleSpeech home directory.

    Returns:
        The value of the ``PPSPEECH_HOME`` environment variable when it is
        set (the path does not have to exist yet), otherwise
        ``~/.paddlespeech``.

    Raises:
        RuntimeError: If ``PPSPEECH_HOME`` points to an existing path that is
            not a directory.
    """
    if 'PPSPEECH_HOME' not in os.environ:
        return os.path.join(_get_user_home(), '.paddlespeech')

    home_path = os.environ['PPSPEECH_HOME']
    if os.path.exists(home_path) and not os.path.isdir(home_path):
        raise RuntimeError(
            'The environment variable PPSPEECH_HOME {} is not a directory.'.
            format(home_path))
    return home_path


def _get_sub_home(directory: str) -> str:
    """
    Return ``<PaddleSpeech home>/<directory>``, creating it when missing.

    Args:
        directory: Name of the sub-directory below the PaddleSpeech home.

    Returns:
        The joined path.
    """
    home = os.path.join(_get_paddlespcceh_home(), directory)
    if not os.path.exists(home):
        # exist_ok: another process may create the directory between the
        # check above and this call (this runs at import time).
        os.makedirs(home, exist_ok=True)
    return home


PPSPEECH_HOME = _get_paddlespcceh_home()
MODEL_HOME = _get_sub_home('models')