diff --git a/data_utils/data.py b/data_utils/data.py index 70ee6fbad..1c35f654c 100644 --- a/data_utils/data.py +++ b/data_utils/data.py @@ -11,6 +11,7 @@ import multiprocessing import numpy as np import paddle.v2 as paddle from threading import local +import atexit from data_utils.utility import read_manifest from data_utils.utility import xmap_readers_mp from data_utils.augmentor.augmentation import AugmentationPipeline @@ -274,13 +275,18 @@ class DataGenerator(object): for instance in manifest: yield instance - return xmap_readers_mp( + reader, cleanup_callback = xmap_readers_mp( lambda instance: self.process_utterance(instance["audio_filepath"], instance["text"]), reader, self._num_threads, 4096, order=True) + # register callback to main process + atexit.register(cleanup_callback) + + return reader + def _padding_batch(self, batch, padding_to=-1, flatten=False): """ Padding audio features with zeros to make them have the same shape (or diff --git a/data_utils/utility.py b/data_utils/utility.py index 49eed6d8d..bb5cad45b 100644 --- a/data_utils/utility.py +++ b/data_utils/utility.py @@ -138,6 +138,10 @@ def xmap_readers_mp(mapper, reader, process_num, buffer_size, order=False): out_queue.put(sample) out_queue.put(end_flag) + def cleanup(): + # kill all sub process and threads + os._exit(0) + def xreader(): # prepare shared memory manager = Manager() @@ -174,4 +178,4 @@ def xmap_readers_mp(mapper, reader, process_num, buffer_size, order=False): yield sample sample = flush_queue.get() - return xreader + return xreader, cleanup