From f75746cd31d69aa14ec57327b5bff73c69a8a9dc Mon Sep 17 00:00:00 2001
From: Xinghai Sun
Date: Tue, 15 Aug 2017 16:53:26 +0800
Subject: [PATCH] Separate data uploading from job submission for DS2 cloud
 training and add support for multi-shard uploading.

---
 cloud/pcloud_submit.sh      |  35 ++------
 cloud/pcloud_train.sh       |  40 ++++-----
 cloud/pcloud_upload_data.sh |  17 ++++
 cloud/split_data.py         |  36 +++------
 cloud/upload_data.py        | 156 ++++++++++++++++--------------------
 5 files changed, 118 insertions(+), 166 deletions(-)
 create mode 100644 cloud/pcloud_upload_data.sh

diff --git a/cloud/pcloud_submit.sh b/cloud/pcloud_submit.sh
index 3a64f32e..35fe54f2 100644
--- a/cloud/pcloud_submit.sh
+++ b/cloud/pcloud_submit.sh
@@ -1,32 +1,11 @@
-# Configure input data set in local filesystem
-TRAIN_MANIFEST="../datasets/manifest.train"
-DEV_MANIFEST="../datasets/manifest.dev"
-VOCAB_FILE="../datasets/vocab/eng_vocab.txt"
-MEAN_STD_FILE="../mean_std.npz"
-# Configure output path in PaddleCloud filesystem
-CLOUD_DATA_DIR="/pfs/dlnel/home/sunxinghai@baidu.com/deepspeech2/data"
+TRAIN_MANIFEST="cloud/cloud.manifest.train"
+DEV_MANIFEST="cloud/cloud.manifest.dev"
 CLOUD_MODEL_DIR="/pfs/dlnel/home/sunxinghai@baidu.com/deepspeech2/model"
-# Configure cloud resources
-NUM_CPU=8
+BATCH_SIZE=256
 NUM_GPU=8
 NUM_NODE=1
-MEMORY="10Gi"
 IS_LOCAL="True"
 
-# Pack and upload local data to PaddleCloud filesystem
-python upload_data.py \
---train_manifest_path=${TRAIN_MANIFEST} \
---dev_manifest_path=${DEV_MANIFEST} \
---vocab_file=${VOCAB_FILE} \
---mean_std_file=${MEAN_STD_FILE} \
---cloud_data_path=${CLOUD_DATA_DIR}
-if [ $? -ne 0 ]
-then
-    echo "upload data failed!"
-    exit 1
-fi
-
-# Submit job to PaddleCloud
 JOB_NAME=deepspeech-`date +%Y%m%d%H%M%S`
 DS2_PATH=${PWD%/*}
 cp -f pcloud_train.sh ${DS2_PATH}
@@ -34,15 +13,15 @@ cp -f pcloud_train.sh ${DS2_PATH}
 paddlecloud submit \
 -image bootstrapper:5000/wanghaoshuang/pcloud_ds2:latest \
 -jobname ${JOB_NAME} \
--cpu ${NUM_CPU} \
+-cpu ${NUM_GPU} \
 -gpu ${NUM_GPU} \
--memory ${MEMORY} \
+-memory 10Gi \
 -parallelism ${NUM_NODE} \
 -pscpu 1 \
 -pservers 1 \
--psmemory ${MEMORY} \
+-psmemory 10Gi \
 -passes 1 \
--entry "sh pcloud_train.sh ${CLOUD_DATA_DIR} ${CLOUD_MODEL_DIR} ${NUM_CPU} ${NUM_GPU} ${IS_LOCAL}" \
+-entry "sh pcloud_train.sh ${TRAIN_MANIFEST} ${DEV_MANIFEST} ${CLOUD_MODEL_DIR} ${NUM_GPU} ${BATCH_SIZE} ${IS_LOCAL}" \
 ${DS2_PATH}
 
 rm ${DS2_PATH}/pcloud_train.sh
diff --git a/cloud/pcloud_train.sh b/cloud/pcloud_train.sh
index 21bd43f9..e42da1d6 100644
--- a/cloud/pcloud_train.sh
+++ b/cloud/pcloud_train.sh
@@ -1,36 +1,24 @@
-DATA_PATH=$1
-MODEL_PATH=$2
-NUM_CPU=$3
+TRAIN_MANIFEST=$1
+DEV_MANIFEST=$2
+MODEL_PATH=$3
 NUM_GPU=$4
-IS_LOCAL=$5
+BATCH_SIZE=$5
+IS_LOCAL=$6
 
-TRAIN_MANI=${DATA_PATH}/cloud.train.manifest
-DEV_MANI=${DATA_PATH}/cloud.dev.manifest
-TRAIN_TAR=${DATA_PATH}/cloud.train.tar
-DEV_TAR=${DATA_PATH}/cloud.dev.tar
-VOCAB_PATH=${DATA_PATH}/vocab.txt
-MEAN_STD_FILE=${DATA_PATH}/mean_std.npz
-
-# split train data for each pcloud node
 python ./cloud/split_data.py \
---in_manifest_path=${TRAIN_MANI} \
---data_tar_path=${TRAIN_TAR} \
---out_manifest_path='/local.train.manifest'
+--in_manifest_path=${TRAIN_MANIFEST} \
+--out_manifest_path='/local.manifest.train'
 
-# split dev data for each pcloud node
 python ./cloud/split_data.py \
---in_manifest_path=${DEV_MANI} \
---data_tar_path=${DEV_TAR} \
---out_manifest_path='/local.dev.manifest'
+--in_manifest_path=${DEV_MANIFEST} \
+--out_manifest_path='/local.manifest.dev'
 
-# run train
 python train.py \
+--batch_size=${BATCH_SIZE} \
 --use_gpu=1 \
 --trainer_count=${NUM_GPU} \
---num_threads_data=${NUM_CPU} \
+--num_threads_data=${NUM_GPU} \
 --is_local=${IS_LOCAL} \
---mean_std_filepath=${MEAN_STD_FILE} \
---train_manifest_path='/local.train.manifest' \
---dev_manifest_path='/local.dev.manifest' \
---vocab_filepath=${VOCAB_PATH} \
---output_model_dir=${MODEL_PATH}
+--train_manifest_path='/local.manifest.train' \
+--dev_manifest_path='/local.manifest.dev' \
+--output_model_dir=${MODEL_PATH}
diff --git a/cloud/pcloud_upload_data.sh b/cloud/pcloud_upload_data.sh
new file mode 100644
index 00000000..1422b8a1
--- /dev/null
+++ b/cloud/pcloud_upload_data.sh
@@ -0,0 +1,17 @@
+IN_MANIFESTS="../datasets/manifest.train ../datasets/manifest.dev ../datasets/manifest.test"
+OUT_MANIFESTS="./cloud.manifest.train ./cloud.manifest.dev ./cloud.manifest.test"
+CLOUD_DATA_DIR="/pfs/dlnel/home/sunxinghai@baidu.com/deepspeech2/data"
+NUM_SHARDS=10
+
+python upload_data.py \
+--in_manifest_paths ${IN_MANIFESTS} \
+--out_manifest_paths ${OUT_MANIFESTS} \
+--cloud_data_dir ${CLOUD_DATA_DIR} \
+--num_shards ${NUM_SHARDS}
+
+if [ $? -ne 0 ]
+then
+    echo "Upload data failed!"
+    exit 1
+fi
+echo "All done."
diff --git a/cloud/split_data.py b/cloud/split_data.py
index 8df194a6..3496d52b 100644
--- a/cloud/split_data.py
+++ b/cloud/split_data.py
@@ -1,7 +1,5 @@
 """This tool is used for splitting data into each node of
-paddle cloud by total trainer count and current trainer id.
-The meaning of trainer is a instance of k8s cluster.
-This script should be called in paddle cloud.
+paddlecloud. This script should be called in paddlecloud.
 """
 from __future__ import absolute_import
 from __future__ import division
@@ -14,40 +12,30 @@ import argparse
 parser = argparse.ArgumentParser(description=__doc__)
 parser.add_argument(
     "--in_manifest_path",
-    default='./cloud.train.manifest',
     type=str,
-    help="Input manifest path. (default: %(default)s)")
-parser.add_argument(
-    "--data_tar_path",
-    default='./cloud.train.tar',
-    type=str,
-    help="Data tar file path. (default: %(default)s)")
+    required=True,
+    help="Input manifest path for all nodes.")
 parser.add_argument(
     "--out_manifest_path",
-    default='./local.train.manifest',
     type=str,
-    help="Out manifest file path. (default: %(default)s)")
+    required=True,
+    help="Output manifest file path for current node.")
 args = parser.parse_args()
 
 
-def split_data(in_manifest, tar_path, out_manifest):
+def split_data(in_manifest_path, out_manifest_path):
     with open("/trainer_id", "r") as f:
         trainer_id = int(f.readline()[:-1])
     with open("/trainer_count", "r") as f:
         trainer_count = int(f.readline()[:-1])
 
-    tar_path = os.path.abspath(tar_path)
-    result = []
-    for index, json_line in enumerate(open(in_manifest)):
+    out_manifest = []
+    for index, json_line in enumerate(open(in_manifest_path, 'r')):
         if (index % trainer_count) == trainer_id:
-            json_data = json.loads(json_line)
-            json_data['audio_filepath'] = "tar:%s#%s" % (
-                tar_path, json_data['audio_filepath'])
-            result.append("%s\n" % json.dumps(json_data))
-    with open(out_manifest, 'w') as manifest:
-        manifest.writelines(result)
+            out_manifest.append("%s\n" % json_line.strip())
+    with open(out_manifest_path, 'w') as f:
+        f.writelines(out_manifest)
 
 
 if __name__ == '__main__':
-    split_data(args.in_manifest_path, args.data_tar_path,
-               args.out_manifest_path)
+    split_data(args.in_manifest_path, args.out_manifest_path)
diff --git a/cloud/upload_data.py b/cloud/upload_data.py
index efa9e77c..66857574 100644
--- a/cloud/upload_data.py
+++ b/cloud/upload_data.py
@@ -1,12 +1,9 @@
-"""This script is used for preparing data for DeepSpeech2 trainning on paddle
-cloud.
+"""This script is for uploading data for DeepSpeech2 training on paddlecloud.
 
 Steps:
-1. Read original manifest and get the local path of sound files.
-2. Tar all local sound files into one tar file.
-3. Modify original manifest to remove the local path information.
-
-Finally, we will get a tar file and a new manifest.
+1. Read original manifests and extract local sound files.
+2. Tar all local sound files into multiple tar files and upload them.
+3. Modify original manifests with the updated cloud filesystem paths.
 """
 from __future__ import absolute_import
 from __future__ import division
@@ -22,66 +19,81 @@ from subprocess import call
 
 import _init_paths
 from data_utils.utils import read_manifest
 
-TRAIN_TAR = "cloud.train.tar"
-TRAIN_MANIFEST = "cloud.train.manifest"
-DEV_TAR = "cloud.dev.tar"
-DEV_MANIFEST = "cloud.dev.manifest"
-VOCAB_FILE = "vocab.txt"
-MEAN_STD_FILE = "mean_std.npz"
-
 parser = argparse.ArgumentParser(description=__doc__)
 parser.add_argument(
-    "--train_manifest_path",
-    default="../datasets/manifest.train",
-    type=str,
-    help="Manifest file path for train data. (default: %(default)s)")
-parser.add_argument(
-    "--dev_manifest_path",
-    default="../datasets/manifest.dev",
+    "--in_manifest_paths",
+    default=["../datasets/manifest.test", "../datasets/manifest.dev"],
     type=str,
-    help="Manifest file path for validation data. (default: %(default)s)")
-parser.add_argument(
-    "--vocab_file",
-    default="../datasets/vocab/eng_vocab.txt",
-    type=str,
-    help="Vocabulary file to be uploaded to paddlecloud. "
+    nargs='+',
+    help="Local filepaths of input manifests to load, pack and upload. "
     "(default: %(default)s)")
 parser.add_argument(
-    "--mean_std_file",
-    default="../mean_std.npz",
+    "--out_manifest_paths",
+    default=["./cloud.manifest.test", "./cloud.manifest.dev"],
     type=str,
-    help="Normalizer's statistics (mean and stddev) file to be uploaded to "
-    "paddlecloud. (default: %(default)s)")
+    nargs='+',
+    help="Local filepaths of modified manifests to write to. "
" + "(default: %(default)s)") parser.add_argument( - "--cloud_data_path", + "--cloud_data_dir", required=True, type=str, - help="Destination path on paddlecloud. (default: %(default)s)") + help="Destination directory on paddlecloud to upload data to.") +parser.add_argument( + "--num_shards", + default=10, + type=int, + help="Number of parts to split data to. (default: %(default)s)") parser.add_argument( - "--local_tmp_path", + "--local_tmp_dir", default="./tmp/", type=str, help="Local directory for storing temporary data. (default: %(default)s)") args = parser.parse_args() -def pack_data(manifest_path, out_tar_path, out_manifest_path): - """1. According to the manifest, tar sound files into out_tar_path. - 2. Generate a new manifest for output tar file. +def upload_data(in_manifest_path_list, out_manifest_path_list, local_tmp_dir, + upload_tar_dir, num_shards): + """Extract and pack sound files listed in the manifest files into multple + tar files and upload them to padldecloud. Besides, generate new manifest + files with updated paths in paddlecloud. """ - out_tar = tarfile.open(out_tar_path, 'w') - manifest = read_manifest(manifest_path) - results = [] - for json_data in manifest: - sound_file = json_data['audio_filepath'] - filename = os.path.basename(sound_file) - out_tar.add(sound_file, arcname=filename) - json_data['audio_filepath'] = filename - results.append("%s\n" % json.dumps(json_data)) - with open(out_manifest_path, 'w') as out_manifest: - out_manifest.writelines(results) - out_manifest.close() - out_tar.close() + # compute total audio number + total_line = 0 + for manifest_path in in_manifest_path_list: + with open(manifest_path, 'r') as f: + total_line += len(f.readlines()) + line_per_tar = (total_line // num_shards) + 1 + + # pack and upload shard by shard + line_count, tar_file = 0, None + for manifest_path, out_manifest_path in zip(in_manifest_path_list, + out_manifest_path_list): + manifest = read_manifest(manifest_path) + out_manifest = [] + for json_data in manifest: + sound_filepath = json_data['audio_filepath'] + sound_filename = os.path.basename(sound_filepath) + if line_count % line_per_tar == 0: + if tar_file != None: + tar_file.close() + pcloud_cp(tar_path, upload_tar_dir) + os.remove(tar_path) + tar_name = 'part-%s-of-%s.tar' % ( + str(line_count // line_per_tar).zfill(5), + str(num_shards).zfill(5)) + tar_path = os.path.join(local_tmp_dir, tar_name) + tar_file = tarfile.open(tar_path, 'w') + tar_file.add(sound_filepath, arcname=sound_filename) + line_count += 1 + json_data['audio_filepath'] = "tar:%s#%s" % ( + os.path.join(upload_tar_dir, tar_name), sound_filename) + out_manifest.append("%s\n" % json.dumps(json_data)) + with open(out_manifest_path, 'w') as f: + f.writelines(out_manifest) + tar_file.close() + pcloud_cp(tar_path, upload_tar_dir) + os.remove(tar_path) def pcloud_mkdir(dir): @@ -99,44 +111,12 @@ def pcloud_cp(src, dst): raise IOError("PaddleCloud cp failed: from [%s] to [%s]." % (src, dst)) -def pcloud_exist(path): - """Check if file or directory exists in PaddleCloud filesystem. 
- """ - ret = call(['paddlecloud', 'ls', path]) - return ret - - if __name__ == '__main__': - cloud_train_manifest = os.path.join(args.cloud_data_path, TRAIN_MANIFEST) - cloud_train_tar = os.path.join(args.cloud_data_path, TRAIN_TAR) - cloud_dev_manifest = os.path.join(args.cloud_data_path, DEV_MANIFEST) - cloud_dev_tar = os.path.join(args.cloud_data_path, DEV_TAR) - cloud_vocab_file = os.path.join(args.cloud_data_path, VOCAB_FILE) - cloud_mean_file = os.path.join(args.cloud_data_path, MEAN_STD_FILE) - - local_train_manifest = os.path.join(args.local_tmp_path, TRAIN_MANIFEST) - local_train_tar = os.path.join(args.local_tmp_path, TRAIN_TAR) - local_dev_manifest = os.path.join(args.local_tmp_path, DEV_MANIFEST) - local_dev_tar = os.path.join(args.local_tmp_path, DEV_TAR) - - # prepare local and cloud dir - if os.path.exists(args.local_tmp_path): - shutil.rmtree(args.local_tmp_path) - os.makedirs(args.local_tmp_path) - pcloud_mkdir(args.cloud_data_path) - - # pack and upload train data - pack_data(args.train_manifest_path, local_train_tar, local_train_manifest) - pcloud_cp(local_train_manifest, cloud_train_manifest) - pcloud_cp(local_train_tar, cloud_train_tar) - - # pack and upload validation data - pack_data(args.dev_manifest_path, local_dev_tar, local_dev_manifest) - pcloud_cp(local_dev_manifest, cloud_dev_manifest) - pcloud_cp(local_dev_tar, cloud_dev_tar) + if not os.path.exists(args.local_tmp_dir): + os.makedirs(args.local_tmp_dir) + pcloud_mkdir(args.cloud_data_dir) - # upload vocab file and mean_std file - pcloud_cp(args.vocab_file, cloud_vocab_file) - pcloud_cp(args.mean_std_file, cloud_mean_file) + upload_data(args.in_manifest_paths, args.out_manifest_paths, + args.local_tmp_dir, args.cloud_data_dir, 10) - shutil.rmtree(args.local_tmp_path) + shutil.rmtree(args.local_tmp_dir)