|
|
|
"""This script is for uploading data for DeepSpeech2 training on paddlecloud.
|
|
|
|
|
|
|
|
Steps:
|
|
|
|
1. Read original manifests and extract local sound files.
|
|
|
|
2. Tar all local sound files into multiple tar files and upload them.
|
|
|
|
3. Modify original manifests with updated paths in cloud filesystem.
|
|
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
|
|
from __future__ import division
|
|
|
|
from __future__ import print_function
|
|
|
|
|
|
|
|
import json
|
|
|
|
import os
|
|
|
|
import tarfile
|
|
|
|
import sys
|
|
|
|
import argparse
|
|
|
|
import shutil
|
|
|
|
from subprocess import call
|
|
|
|
import _init_paths
|
|
|
|
from data_utils.utils import read_manifest
|
|
|
|
|
|
|
|
# Command-line interface. The module docstring doubles as the --help
# description. NOTE: arguments are parsed at import time, so `args` is a
# module-level global that the __main__ section reads.
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
    "--in_manifest_paths",
    default=[
        "../datasets/manifest.train", "../datasets/manifest.dev",
        "../datasets/manifest.test"
    ],
    type=str,
    nargs='+',
    # A trailing space is needed here: adjacent string literals are joined
    # verbatim, and without it the help reads "upload.(default: ...)".
    help="Local filepaths of input manifests to load, pack and upload. "
    "(default: %(default)s)")
parser.add_argument(
    "--out_manifest_paths",
    default=[
        "./cloud.manifest.train", "./cloud.manifest.dev",
        "./cloud.manifest.test"
    ],
    type=str,
    nargs='+',
    help="Local filepaths of modified manifests to write to. "
    "(default: %(default)s)")
parser.add_argument(
    "--cloud_data_dir",
    required=True,
    type=str,
    help="Destination directory on paddlecloud to upload data to.")
parser.add_argument(
    "--num_shards",
    default=10,
    type=int,
    help="Number of parts to split data to. (default: %(default)s)")
parser.add_argument(
    "--local_tmp_dir",
    default="./tmp/",
    type=str,
    help="Local directory for storing temporary data. (default: %(default)s)")
args = parser.parse_args()
|
|
|
|
|
|
|
|
|
|
|
|
def _flush_shard(tar_file, tar_path, upload_tar_dir):
    """Close a finished tar shard, upload it, and delete the local copy."""
    tar_file.close()
    pcloud_cp(tar_path, upload_tar_dir)
    os.remove(tar_path)


def upload_data(in_manifest_path_list, out_manifest_path_list, local_tmp_dir,
                upload_tar_dir, num_shards):
    """Extract and pack sound files listed in the manifest files into multiple
    tar files and upload them to paddlecloud. Besides, generate new manifest
    files with updated paths in paddlecloud.

    :param in_manifest_path_list: Local paths of the input manifests. Each
        entry yielded by `read_manifest` must carry an 'audio_filepath' key.
    :param out_manifest_path_list: Local paths the rewritten manifests are
        written to; paired positionally (via zip) with the input manifests.
    :param local_tmp_dir: Existing local directory in which each tar shard is
        built before being uploaded and removed.
    :param upload_tar_dir: Destination directory on paddlecloud for both the
        tar shards and the rewritten manifests.
    :param num_shards: Requested number of shards; must be positive.
    :raises ValueError: If `num_shards` is not a positive integer.
    :raises IOError: If a paddlecloud copy fails (raised by `pcloud_cp`).
    """
    if num_shards <= 0:
        raise ValueError(
            "num_shards must be a positive integer, got %s." % num_shards)

    # compute total audio number
    total_line = 0
    for manifest_path in in_manifest_path_list:
        with open(manifest_path, 'r') as f:
            total_line += sum(1 for _ in f)
    # The "+ 1" keeps the shard size at least 1. As a consequence fewer than
    # `num_shards` tar files may be produced, while every tar name still
    # carries `num_shards` as its "-of-" suffix.
    line_per_tar = (total_line // num_shards) + 1

    # pack and upload shard by shard
    line_count, tar_file, tar_path = 0, None, None
    try:
        for manifest_path, out_manifest_path in zip(in_manifest_path_list,
                                                    out_manifest_path_list):
            manifest = read_manifest(manifest_path)
            out_manifest = []
            for json_data in manifest:
                sound_filepath = json_data['audio_filepath']
                sound_filename = os.path.basename(sound_filepath)
                if line_count % line_per_tar == 0:
                    # Shard boundary: ship the previous tar (if any) before
                    # starting a new one. Shards span manifest boundaries
                    # because line_count is global across all manifests.
                    if tar_file is not None:
                        finished, tar_file = tar_file, None
                        _flush_shard(finished, tar_path, upload_tar_dir)
                    tar_name = 'part-%s-of-%s.tar' % (
                        str(line_count // line_per_tar).zfill(5),
                        str(num_shards).zfill(5))
                    tar_path = os.path.join(local_tmp_dir, tar_name)
                    tar_file = tarfile.open(tar_path, 'w')
                # Files are stored flat under their basename inside the tar.
                tar_file.add(sound_filepath, arcname=sound_filename)
                line_count += 1
                json_data['audio_filepath'] = "tar:%s#%s" % (
                    os.path.join(upload_tar_dir, tar_name), sound_filename)
                out_manifest.append("%s\n" % json.dumps(json_data))
            with open(out_manifest_path, 'w') as f:
                f.writelines(out_manifest)
            pcloud_cp(out_manifest_path, upload_tar_dir)
        # Ship the last, possibly partial, shard. When the manifests hold no
        # entries no tar was ever opened and there is nothing to upload.
        if tar_file is not None:
            finished, tar_file = tar_file, None
            _flush_shard(finished, tar_path, upload_tar_dir)
    finally:
        # Do not leak the open tar handle if packing or uploading failed.
        if tar_file is not None:
            tar_file.close()
|
|
|
|
|
|
|
|
|
|
|
|
def pcloud_mkdir(dir):
    """Create a directory on the PaddleCloud filesystem.

    Runs `paddlecloud mkdir <dir>` and raises IOError when the command
    exits with a non-zero status.
    """
    command = ['paddlecloud', 'mkdir', dir]
    exit_code = call(command)
    if exit_code == 0:
        return
    raise IOError("PaddleCloud mkdir failed: %s." % dir)
|
|
|
|
|
|
|
|
|
|
|
|
def pcloud_cp(src, dst):
    """Copy src from the local filesystem to dst in the PaddleCloud
    filesystem, or download src from the PaddleCloud filesystem to dst in
    the local filesystem.

    Runs `paddlecloud cp <src> <dst>` and raises IOError when the command
    exits with a non-zero status.
    """
    command = ['paddlecloud', 'cp', src, dst]
    exit_code = call(command)
    if exit_code == 0:
        return
    raise IOError("PaddleCloud cp failed: from [%s] to [%s]." % (src, dst))
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: prepare the local scratch directory, create the
    # destination directory on paddlecloud, then pack and upload the data.
    if not os.path.exists(args.local_tmp_dir):
        os.makedirs(args.local_tmp_dir)

    try:
        pcloud_mkdir(args.cloud_data_dir)

        upload_data(args.in_manifest_paths, args.out_manifest_paths,
                    args.local_tmp_dir, args.cloud_data_dir, args.num_shards)
    finally:
        # Remove the scratch directory even when mkdir or the upload fails,
        # so partially built tar shards are not left behind.
        shutil.rmtree(args.local_tmp_dir)
|