Make DS2 run on PaddleCloud

1. Refine data_utils/data.py to read sound bytes from tar files
2. Add scripts to submit a PaddleCloud job for DS2 training
pull/2/head
wanghaoshuang 8 years ago
parent db37c34919
commit 3c77d369ca

@@ -8,13 +8,20 @@ from __future__ import print_function
 import random
 import numpy as np
 import multiprocessing
+from threading import local
 import paddle.v2 as paddle
+import tarfile
 from data_utils import utils
 from data_utils.augmentor.augmentation import AugmentationPipeline
 from data_utils.featurizer.speech_featurizer import SpeechFeaturizer
 from data_utils.speech import SpeechSegment
 from data_utils.normalizer import FeatureNormalizer
+
+# for caching tar files info
+local_data = local()
+local_data.tar2info = {}
+local_data.tar2object = {}


 class DataGenerator(object):
     """
@@ -45,9 +52,6 @@ class DataGenerator(object):
     :type max_freq: None|float
     :param specgram_type: Specgram feature type. Options: 'linear'.
     :type specgram_type: str
-    :param use_dB_normalization: Whether to normalize the audio to -20 dB
-                                 before extracting the features.
-    :type use_dB_normalization: bool
     :param num_threads: Number of CPU threads for processing data.
     :type num_threads: int
     :param random_seed: Random seed.
@@ -64,7 +68,6 @@ class DataGenerator(object):
                  window_ms=20.0,
                  max_freq=None,
                  specgram_type='linear',
-                 use_dB_normalization=True,
                  num_threads=multiprocessing.cpu_count(),
                  random_seed=0):
         self._max_duration = max_duration
@@ -77,12 +80,15 @@ class DataGenerator(object):
             specgram_type=specgram_type,
             stride_ms=stride_ms,
             window_ms=window_ms,
-            max_freq=max_freq,
-            use_dB_normalization=use_dB_normalization)
+            max_freq=max_freq)
         self._num_threads = num_threads
         self._rng = random.Random(random_seed)
         self._epoch = 0
+        # for caching tar files info
+        self.tar2info = {}
+        self.tar2object = {}

     def batch_reader_creator(self,
                              manifest_path,
                              batch_size,
@@ -198,9 +204,41 @@ class DataGenerator(object):
         """
         return self._speech_featurizer.vocab_list

+    def _parse_tar(self, file):
+        """Parse a tar file into a tarfile object and a dict mapping each
+        member name to its TarInfo object.
+        """
+        result = {}
+        f = tarfile.open(file)
+        for tarinfo in f.getmembers():
+            result[tarinfo.name] = tarinfo
+        return f, result
+
+    def _read_soundbytes(self, filepath):
+        """Read bytes from a file.
+
+        If filepath starts with 'tar:', read the bytes from inside the
+        tar file (such paths take the form 'tar:<tarpath>#<filename>')
+        and cache the tar file's info for subsequent read requests.
+        """
+        if filepath.startswith('tar:'):
+            tarpath, filename = filepath.split(':', 1)[1].split('#', 1)
+            # A tarfile object is not safe to share across threads, so
+            # each worker thread caches its own handle and member index.
+            if 'tar2info' not in local_data.__dict__:
+                local_data.tar2info = {}
+            if 'tar2object' not in local_data.__dict__:
+                local_data.tar2object = {}
+            if tarpath not in local_data.tar2info:
+                tar_object, infos = self._parse_tar(tarpath)
+                local_data.tar2info[tarpath] = infos
+                local_data.tar2object[tarpath] = tar_object
+            return local_data.tar2object[tarpath].extractfile(
+                local_data.tar2info[tarpath][filename]).read()
+        else:
+            # Read in binary mode so the raw sound bytes come back unchanged.
+            return open(filepath, 'rb').read()
+
     def _process_utterance(self, filename, transcript):
         """Load, augment, featurize and normalize for speech data."""
-        speech_segment = SpeechSegment.from_file(filename, transcript)
+        speech_segment = SpeechSegment.from_bytes(
+            self._read_soundbytes(filename), transcript)
         self._augmentation_pipeline.transform_audio(speech_segment)
         specgram, text_ids = self._speech_featurizer.featurize(speech_segment)
         specgram = self._normalizer.apply(specgram)
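
Note: the 'tar:' prefix handled by _read_soundbytes is the convention that pcloud_split_data.py (added below) writes into each node's manifest. A minimal standalone sketch of how such a path is resolved; the tar path and member name here are made up:

    import tarfile

    # Hypothetical path; real ones are written by pcloud_split_data.py as
    # "tar:<absolute tar path>#<member name>".
    filepath = 'tar:/pfs/data/train.tar#utt_0001.wav'

    tarpath, member = filepath.split(':', 1)[1].split('#', 1)
    with tarfile.open(tarpath) as tar:
        sound_bytes = tar.extractfile(tar.getmember(member)).read()

DataGenerator keeps its opened handles in threading.local storage because a single tarfile object cannot safely be shared across the reader's worker threads.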

@@ -0,0 +1,51 @@
+"""Tar the sound files listed in a manifest and generate a new manifest
+that references the tar's members, for training on PaddleCloud."""
+import json
+import os
+import tarfile
+import argparse
+
+parser = argparse.ArgumentParser(description=__doc__)
+parser.add_argument(
+    "--manifest_path",
+    default="/manifest.train",
+    type=str,
+    help="Manifest of target data. (default: %(default)s)")
+parser.add_argument(
+    "--out_tar_path",
+    default="/dev.tar",
+    type=str,
+    help="Output tar file path. (default: %(default)s)")
+parser.add_argument(
+    "--out_manifest_path",
+    default="/dev.mani",
+    type=str,
+    help="Manifest of output data. (default: %(default)s)")
+args = parser.parse_args()
+
+
+def gen_pcloud_data(manifest_path, out_tar_path, out_manifest_path):
+    """
+    1. According to the manifest, tar the sound files into out_tar_path.
+    2. Generate a new manifest for the output tar file.
+    """
+    out_tar = tarfile.open(out_tar_path, 'w')
+    manifest = []
+    for json_line in open(manifest_path):
+        try:
+            json_data = json.loads(json_line)
+        except Exception as e:
+            raise IOError("Error reading manifest: %s" % str(e))
+        sound_file = json_data['audio_filepath']
+        filename = os.path.basename(sound_file)
+        # Store each sound file under its basename inside the tar.
+        out_tar.add(sound_file, arcname=filename)
+        json_data['audio_filepath'] = filename
+        manifest.append("%s\n" % json.dumps(json_data))
+    with open(out_manifest_path, 'w') as out_manifest:
+        out_manifest.writelines(manifest)
+    out_tar.close()
+
+
+if __name__ == '__main__':
+    gen_pcloud_data(args.manifest_path, args.out_tar_path,
+                    args.out_manifest_path)
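
For reference, each manifest line is a JSON object; a sketch of the rewrite this script applies to a single entry (the path and keys here are illustrative):

    import json
    import os

    entry = json.loads(
        '{"audio_filepath": "/data/libri/1089-0001.wav", "text": "hello"}')
    # The sound file is packed into the tar under its basename, and the
    # manifest entry is rewritten to that member name.
    entry['audio_filepath'] = os.path.basename(entry['audio_filepath'])
    print(json.dumps(entry))  # {"audio_filepath": "1089-0001.wav", ...}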

@@ -0,0 +1,47 @@
+"""Split a tarred dataset's manifest so that each PaddleCloud trainer
+node reads its own shard."""
+import os
+import json
+import argparse
+
+
+def split_data(in_manifest_path, tar_path, out_manifest_path):
+    # TODO: read the trainer id and count from the files provisioned by
+    # PaddleCloud; they are hardcoded here for local testing.
+    trainer_id = 1
+    trainer_count = 2
+    #with open("/trainer_id", "r") as f:
+    #    trainer_id = int(f.readline()[:-1])
+    #with open("/trainer_count", "r") as f:
+    #    trainer_count = int(f.readline()[:-1])
+
+    tar_path = os.path.abspath(tar_path)
+    result = []
+    for index, json_line in enumerate(open(in_manifest_path)):
+        # Round-robin sharding: each trainer keeps every
+        # trainer_count-th manifest line.
+        if (index % trainer_count) == trainer_id:
+            json_data = json.loads(json_line)
+            json_data['audio_filepath'] = "tar:%s#%s" % (
+                tar_path, json_data['audio_filepath'])
+            result.append("%s\n" % json.dumps(json_data))
+    with open(out_manifest_path, 'w') as manifest:
+        manifest.writelines(result)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--in_manifest_path",
+        default='datasets/dev.mani',
+        type=str,
+        help="Input manifest path. (default: %(default)s)")
+    parser.add_argument(
+        "--data_tar_path",
+        default='datasets/dev.tar',
+        type=str,
+        help="Data tar file path. (default: %(default)s)")
+    parser.add_argument(
+        "--out_manifest_path",
+        default='datasets/dev.mani.split',
+        type=str,
+        help="Output manifest file path. (default: %(default)s)")
+    args = parser.parse_args()
+    split_data(args.in_manifest_path, args.data_tar_path,
+               args.out_manifest_path)
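
The split is a simple round-robin over manifest lines; a quick illustration using the hardcoded test values above (trainer_id=1, trainer_count=2):

    lines = ['line0', 'line1', 'line2', 'line3', 'line4']
    trainer_id, trainer_count = 1, 2
    kept = [l for i, l in enumerate(lines)
            if i % trainer_count == trainer_id]
    # kept == ['line1', 'line3'], i.e. 0-based indices 1 and 3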

@@ -0,0 +1,13 @@
+paddlecloud submit \
+    -image wanghaoshuang/pcloud_ds2 \
+    -jobname ds23 \
+    -cpu 1 \
+    -gpu 0 \
+    -memory 10Gi \
+    -parallelism 1 \
+    -pscpu 1 \
+    -pservers 1 \
+    -psmemory 10Gi \
+    -passes 1 \
+    -entry "sh pcloud_train.sh" \
+    ./deep_speech_2

@@ -0,0 +1,32 @@
+# set by user
+TRAIN_MANI='/pfs/dlnel/home/yanxu05@baidu.com/wanghaoshuang/data/ds2_data/demo.mani'
+# set by user
+DEV_MANI='/pfs/dlnel/home/yanxu05@baidu.com/wanghaoshuang/data/ds2_data/demo.mani'
+# set by user
+TRAIN_TAR='/pfs/dlnel/home/yanxu05@baidu.com/wanghaoshuang/data/ds2_data/demo.tar'
+# set by user
+DEV_TAR='/pfs/dlnel/home/yanxu05@baidu.com/wanghaoshuang/data/ds2_data/demo.tar'
+# set by user
+VOCAB_PATH='/pfs/dlnel/home/yanxu05@baidu.com/wanghaoshuang/data/ds2_data/eng_vocab.txt'
+# set by user
+MEAN_STD_FILE='/pfs/dlnel/home/yanxu05@baidu.com/wanghaoshuang/data/ds2_data/mean_std.npz'
+
+# split train data for each pcloud node
+python pcloud_split_data.py \
+--in_manifest_path=$TRAIN_MANI \
+--data_tar_path=$TRAIN_TAR \
+--out_manifest_path='./train.mani'
+
+# split dev data for each pcloud node
+python pcloud_split_data.py \
+--in_manifest_path=$DEV_MANI \
+--data_tar_path=$DEV_TAR \
+--out_manifest_path='./dev.mani'
+
+python train.py \
+--use_gpu=0 \
+--trainer_count=4 \
+--batch_size=2 \
+--mean_std_filepath=$MEAN_STD_FILE \
+--train_manifest_path='./train.mani' \
+--dev_manifest_path='./dev.mani' \
+--vocab_filepath=$VOCAB_PATH