Seperate data uploading from job summission for DS2 cloud training and add support for multiple shards uploading.

pull/2/head
Xinghai Sun 7 years ago
parent af71fccee5
commit f75746cd31

@ -1,32 +1,11 @@
# Configure input data set in local filesystem
TRAIN_MANIFEST="../datasets/manifest.train"
DEV_MANIFEST="../datasets/manifest.dev"
VOCAB_FILE="../datasets/vocab/eng_vocab.txt"
MEAN_STD_FILE="../mean_std.npz"
# Configure output path in PaddleCloud filesystem
CLOUD_DATA_DIR="/pfs/dlnel/home/sunxinghai@baidu.com/deepspeech2/data"
TRAIN_MANIFEST="cloud/cloud.manifest.test"
DEV_MANIFEST="cloud/cloud.manifest.dev"
CLOUD_MODEL_DIR="/pfs/dlnel/home/sunxinghai@baidu.com/deepspeech2/model"
# Configure cloud resources
NUM_CPU=8
BATCH_SIZE=256
NUM_GPU=8
NUM_NODE=1
MEMORY="10Gi"
IS_LOCAL="True"
# Pack and upload local data to PaddleCloud filesystem
python upload_data.py \
--train_manifest_path=${TRAIN_MANIFEST} \
--dev_manifest_path=${DEV_MANIFEST} \
--vocab_file=${VOCAB_FILE} \
--mean_std_file=${MEAN_STD_FILE} \
--cloud_data_path=${CLOUD_DATA_DIR}
if [ $? -ne 0 ]
then
echo "upload data failed!"
exit 1
fi
# Submit job to PaddleCloud
JOB_NAME=deepspeech-`date +%Y%m%d%H%M%S`
DS2_PATH=${PWD%/*}
cp -f pcloud_train.sh ${DS2_PATH}
@ -34,15 +13,15 @@ cp -f pcloud_train.sh ${DS2_PATH}
paddlecloud submit \
-image bootstrapper:5000/wanghaoshuang/pcloud_ds2:latest \
-jobname ${JOB_NAME} \
-cpu ${NUM_CPU} \
-cpu ${NUM_GPU} \
-gpu ${NUM_GPU} \
-memory ${MEMORY} \
-memory 10Gi \
-parallelism ${NUM_NODE} \
-pscpu 1 \
-pservers 1 \
-psmemory ${MEMORY} \
-psmemory 10Gi \
-passes 1 \
-entry "sh pcloud_train.sh ${CLOUD_DATA_DIR} ${CLOUD_MODEL_DIR} ${NUM_CPU} ${NUM_GPU} ${IS_LOCAL}" \
-entry "sh pcloud_train.sh ${TRAIN_MANIFEST} ${DEV_MANIFEST} ${CLOUD_MODEL_DIR} ${NUM_GPU} ${BATCH_SIZE} ${IS_LOCAL}" \
${DS2_PATH}
rm ${DS2_PATH}/pcloud_train.sh

@ -1,36 +1,24 @@
DATA_PATH=$1
MODEL_PATH=$2
NUM_CPU=$3
TRAIN_MANIFEST=$1
DEV_MANIFEST=$2
MODEL_PATH=$3
NUM_GPU=$4
IS_LOCAL=$5
BATCH_SIZE=$5
IS_LOCAL=$6
TRAIN_MANI=${DATA_PATH}/cloud.train.manifest
DEV_MANI=${DATA_PATH}/cloud.dev.manifest
TRAIN_TAR=${DATA_PATH}/cloud.train.tar
DEV_TAR=${DATA_PATH}/cloud.dev.tar
VOCAB_PATH=${DATA_PATH}/vocab.txt
MEAN_STD_FILE=${DATA_PATH}/mean_std.npz
# split train data for each pcloud node
python ./cloud/split_data.py \
--in_manifest_path=${TRAIN_MANI} \
--data_tar_path=${TRAIN_TAR} \
--out_manifest_path='/local.train.manifest'
--in_manifest_path=${TRAIN_MANIFEST} \
--out_manifest_path='/local.manifest.train'
# split dev data for each pcloud node
python ./cloud/split_data.py \
--in_manifest_path=${DEV_MANI} \
--data_tar_path=${DEV_TAR} \
--out_manifest_path='/local.dev.manifest'
--in_manifest_path=${DEV_MANIFEST} \
--out_manifest_path='/local.manifest.dev'
# run train
python train.py \
--batch_size=$BATCH_SIZE \
--use_gpu=1 \
--trainer_count=${NUM_GPU} \
--num_threads_data=${NUM_CPU} \
--num_threads_data=${NUM_GPU} \
--is_local=${IS_LOCAL} \
--mean_std_filepath=${MEAN_STD_FILE} \
--train_manifest_path='/local.train.manifest' \
--dev_manifest_path='/local.dev.manifest' \
--vocab_filepath=${VOCAB_PATH} \
--output_model_dir=${MODEL_PATH}
--train_manifest_path='/local.manifest.train' \
--dev_manifest_path='/local.manifest.dev' \
--output_model_dir=${MODEL_PATH} \

@ -0,0 +1,17 @@
IN_MANIFESTS="../datasets/manifest.tmp ../datasets/manifest.dev ../datasets/manifest.test"
OUT_MANIFESTS="./cloud.manifest.tmp ./cloud.manifest.dev ./cloud.manifest.test"
CLOUD_DATA_DIR="/pfs/dlnel/home/sunxinghai@baidu.com/deepspeech2/data"
NUM_SHARDS=10
python upload_data.py \
--in_manifest_paths ${IN_MANIFESTS} \
--out_manifest_paths ${OUT_MANIFESTS} \
--cloud_data_dir ${CLOUD_DATA_DIR} \
--num_shards ${NUM_SHARDS}
if [ $? -ne 0 ]
then
echo "Upload Data Failed!"
exit 1
fi
echo "All Done."

@ -1,7 +1,5 @@
"""This tool is used for splitting data into each node of
paddle cloud by total trainer count and current trainer id.
The meaning of trainer is a instance of k8s cluster.
This script should be called in paddle cloud.
paddlecloud. This script should be called in paddlecloud.
"""
from __future__ import absolute_import
from __future__ import division
@ -14,40 +12,30 @@ import argparse
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--in_manifest_path",
default='./cloud.train.manifest',
type=str,
help="Input manifest path. (default: %(default)s)")
parser.add_argument(
"--data_tar_path",
default='./cloud.train.tar',
type=str,
help="Data tar file path. (default: %(default)s)")
required=True,
help="Input manifest path for all nodes.")
parser.add_argument(
"--out_manifest_path",
default='./local.train.manifest',
type=str,
help="Out manifest file path. (default: %(default)s)")
required=True,
help="Output manifest file path for current node.")
args = parser.parse_args()
def split_data(in_manifest, tar_path, out_manifest):
def split_data(in_manifest_path, out_manifest_path):
with open("/trainer_id", "r") as f:
trainer_id = int(f.readline()[:-1])
with open("/trainer_count", "r") as f:
trainer_count = int(f.readline()[:-1])
tar_path = os.path.abspath(tar_path)
result = []
for index, json_line in enumerate(open(in_manifest)):
out_manifest = []
for index, json_line in enumerate(open(in_manifest_path, 'r')):
if (index % trainer_count) == trainer_id:
json_data = json.loads(json_line)
json_data['audio_filepath'] = "tar:%s#%s" % (
tar_path, json_data['audio_filepath'])
result.append("%s\n" % json.dumps(json_data))
with open(out_manifest, 'w') as manifest:
manifest.writelines(result)
out_manifest.append("%s\n" % json_line.strip())
with open(out_manifest_path, 'w') as f:
f.writelines(out_manifest)
if __name__ == '__main__':
split_data(args.in_manifest_path, args.data_tar_path,
args.out_manifest_path)
split_data(args.in_manifest_path, args.out_manifest_path)

@ -1,12 +1,9 @@
"""This script is used for preparing data for DeepSpeech2 trainning on paddle
cloud.
"""This script is for uploading data for DeepSpeech2 training on paddlecloud.
Steps:
1. Read original manifest and get the local path of sound files.
2. Tar all local sound files into one tar file.
3. Modify original manifest to remove the local path information.
Finally, we will get a tar file and a new manifest.
1. Read original manifests and extract local sound files.
2. Tar all local sound files into multiple tar files and upload them.
3. Modify original manifests with updated paths in cloud filesystem.
"""
from __future__ import absolute_import
from __future__ import division
@ -22,66 +19,81 @@ from subprocess import call
import _init_paths
from data_utils.utils import read_manifest
TRAIN_TAR = "cloud.train.tar"
TRAIN_MANIFEST = "cloud.train.manifest"
DEV_TAR = "cloud.dev.tar"
DEV_MANIFEST = "cloud.dev.manifest"
VOCAB_FILE = "vocab.txt"
MEAN_STD_FILE = "mean_std.npz"
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--train_manifest_path",
default="../datasets/manifest.train",
type=str,
help="Manifest file path for train data. (default: %(default)s)")
parser.add_argument(
"--dev_manifest_path",
default="../datasets/manifest.dev",
"--in_manifest_paths",
default=["../datasets/manifest.test", "../datasets/manifest.dev"],
type=str,
help="Manifest file path for validation data. (default: %(default)s)")
parser.add_argument(
"--vocab_file",
default="../datasets/vocab/eng_vocab.txt",
type=str,
help="Vocabulary file to be uploaded to paddlecloud. "
nargs='+',
help="Local filepaths of input manifests to load, pack and upload."
"(default: %(default)s)")
parser.add_argument(
"--mean_std_file",
default="../mean_std.npz",
"--out_manifest_paths",
default=["./cloud.manifest.test", "./cloud.manifest.dev"],
type=str,
help="Normalizer's statistics (mean and stddev) file to be uploaded to "
"paddlecloud. (default: %(default)s)")
nargs='+',
help="Local filepaths of modified manifests to write to. "
"(default: %(default)s)")
parser.add_argument(
"--cloud_data_path",
"--cloud_data_dir",
required=True,
type=str,
help="Destination path on paddlecloud. (default: %(default)s)")
help="Destination directory on paddlecloud to upload data to.")
parser.add_argument(
"--num_shards",
default=10,
type=int,
help="Number of parts to split data to. (default: %(default)s)")
parser.add_argument(
"--local_tmp_path",
"--local_tmp_dir",
default="./tmp/",
type=str,
help="Local directory for storing temporary data. (default: %(default)s)")
args = parser.parse_args()
def pack_data(manifest_path, out_tar_path, out_manifest_path):
"""1. According to the manifest, tar sound files into out_tar_path.
2. Generate a new manifest for output tar file.
def upload_data(in_manifest_path_list, out_manifest_path_list, local_tmp_dir,
upload_tar_dir, num_shards):
"""Extract and pack sound files listed in the manifest files into multple
tar files and upload them to padldecloud. Besides, generate new manifest
files with updated paths in paddlecloud.
"""
out_tar = tarfile.open(out_tar_path, 'w')
manifest = read_manifest(manifest_path)
results = []
for json_data in manifest:
sound_file = json_data['audio_filepath']
filename = os.path.basename(sound_file)
out_tar.add(sound_file, arcname=filename)
json_data['audio_filepath'] = filename
results.append("%s\n" % json.dumps(json_data))
with open(out_manifest_path, 'w') as out_manifest:
out_manifest.writelines(results)
out_manifest.close()
out_tar.close()
# compute total audio number
total_line = 0
for manifest_path in in_manifest_path_list:
with open(manifest_path, 'r') as f:
total_line += len(f.readlines())
line_per_tar = (total_line // num_shards) + 1
# pack and upload shard by shard
line_count, tar_file = 0, None
for manifest_path, out_manifest_path in zip(in_manifest_path_list,
out_manifest_path_list):
manifest = read_manifest(manifest_path)
out_manifest = []
for json_data in manifest:
sound_filepath = json_data['audio_filepath']
sound_filename = os.path.basename(sound_filepath)
if line_count % line_per_tar == 0:
if tar_file != None:
tar_file.close()
pcloud_cp(tar_path, upload_tar_dir)
os.remove(tar_path)
tar_name = 'part-%s-of-%s.tar' % (
str(line_count // line_per_tar).zfill(5),
str(num_shards).zfill(5))
tar_path = os.path.join(local_tmp_dir, tar_name)
tar_file = tarfile.open(tar_path, 'w')
tar_file.add(sound_filepath, arcname=sound_filename)
line_count += 1
json_data['audio_filepath'] = "tar:%s#%s" % (
os.path.join(upload_tar_dir, tar_name), sound_filename)
out_manifest.append("%s\n" % json.dumps(json_data))
with open(out_manifest_path, 'w') as f:
f.writelines(out_manifest)
tar_file.close()
pcloud_cp(tar_path, upload_tar_dir)
os.remove(tar_path)
def pcloud_mkdir(dir):
@ -99,44 +111,12 @@ def pcloud_cp(src, dst):
raise IOError("PaddleCloud cp failed: from [%s] to [%s]." % (src, dst))
def pcloud_exist(path):
"""Check if file or directory exists in PaddleCloud filesystem.
"""
ret = call(['paddlecloud', 'ls', path])
return ret
if __name__ == '__main__':
cloud_train_manifest = os.path.join(args.cloud_data_path, TRAIN_MANIFEST)
cloud_train_tar = os.path.join(args.cloud_data_path, TRAIN_TAR)
cloud_dev_manifest = os.path.join(args.cloud_data_path, DEV_MANIFEST)
cloud_dev_tar = os.path.join(args.cloud_data_path, DEV_TAR)
cloud_vocab_file = os.path.join(args.cloud_data_path, VOCAB_FILE)
cloud_mean_file = os.path.join(args.cloud_data_path, MEAN_STD_FILE)
local_train_manifest = os.path.join(args.local_tmp_path, TRAIN_MANIFEST)
local_train_tar = os.path.join(args.local_tmp_path, TRAIN_TAR)
local_dev_manifest = os.path.join(args.local_tmp_path, DEV_MANIFEST)
local_dev_tar = os.path.join(args.local_tmp_path, DEV_TAR)
# prepare local and cloud dir
if os.path.exists(args.local_tmp_path):
shutil.rmtree(args.local_tmp_path)
os.makedirs(args.local_tmp_path)
pcloud_mkdir(args.cloud_data_path)
# pack and upload train data
pack_data(args.train_manifest_path, local_train_tar, local_train_manifest)
pcloud_cp(local_train_manifest, cloud_train_manifest)
pcloud_cp(local_train_tar, cloud_train_tar)
# pack and upload validation data
pack_data(args.dev_manifest_path, local_dev_tar, local_dev_manifest)
pcloud_cp(local_dev_manifest, cloud_dev_manifest)
pcloud_cp(local_dev_tar, cloud_dev_tar)
if not os.path.exists(args.local_tmp_dir):
os.makedirs(args.local_tmp_dir)
pcloud_mkdir(args.cloud_data_dir)
# upload vocab file and mean_std file
pcloud_cp(args.vocab_file, cloud_vocab_file)
pcloud_cp(args.mean_std_file, cloud_mean_file)
upload_data(args.in_manifest_paths, args.out_manifest_paths,
args.local_tmp_dir, args.cloud_data_dir, 10)
shutil.rmtree(args.local_tmp_path)
shutil.rmtree(args.local_tmp_dir)

Loading…
Cancel
Save