Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the buffer queues member attributes of the AsyncDataReader class. #776

Merged
merged 1 commit into from
Mar 27, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions fluid/DeepASR/data_utils/async_data_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

class SampleInfo(object):
"""SampleInfo holds the necessary information to load a sample from disk.

Args:
feature_bin_path (str): File containing the feature data.
feature_start (int): Start position of the sample's feature data.
Expand Down Expand Up @@ -55,7 +54,6 @@ class SampleInfoBucket(object):
data, sample start position, sample byte number etc.) to access samples'
feature data and the same with the label description file. SampleInfoBucket
is the minimum unit to do shuffle.

Args:
feature_bin_paths (list|tuple): Files containing the binary feature
data.
Expand Down Expand Up @@ -165,7 +163,6 @@ def generate_sample_info_list(self):
class AsyncDataReader(object):
"""DataReader provides basic audio sample preprocessing pipeline including
data loading and data augmentation.

Args:
feature_file_list (str): File containing paths of feature data file and
corresponding description file.
Expand Down Expand Up @@ -209,15 +206,17 @@ def __init__(self,
self.generate_bucket_list(True)
self._order_id = 0
self._manager = Manager()
self._sample_buffer_size = sample_buffer_size
self._sample_info_buffer_size = sample_info_buffer_size
self._batch_buffer_size = batch_buffer_size
self._proc_num = proc_num
if self._proc_num <= 2:
raise ValueError("Value of `proc_num` should be greater than 2.")
self._sample_proc_num = self._proc_num - 2
self._verbose = verbose
self._force_exit = ForceExitWrapper(self._manager.Value('b', False))
# buffer queue
self._sample_info_queue = self._manager.Queue(sample_info_buffer_size)
self._sample_queue = self._manager.Queue(sample_buffer_size)
self._batch_queue = self._manager.Queue(batch_buffer_size)

def generate_bucket_list(self, is_shuffle):
if self._block_info_list is None:
Expand Down Expand Up @@ -258,8 +257,6 @@ def recycle(self, *args):
shared_ndarray.recycle(self._pool_manager.pool)

def _start_async_processing(self):
sample_info_queue = self._manager.Queue(self._sample_info_buffer_size)
sample_queue = self._manager.Queue(self._sample_buffer_size)
self._order_id = 0

@suppress_complaints(verbose=self._verbose, notify=self._force_exit)
Expand All @@ -284,7 +281,9 @@ def ordered_feeding_task(sample_info_queue):
sample_info_queue.put(EpochEndSignal())

feeding_proc = DaemonProcessGroup(
proc_num=1, target=ordered_feeding_task, args=(sample_info_queue, ))
proc_num=1,
target=ordered_feeding_task,
args=(self._sample_info_queue, ))
feeding_proc.start_all()

@suppress_complaints(verbose=self._verbose, notify=self._force_exit)
Expand Down Expand Up @@ -361,15 +360,13 @@ def read_bytes(fpath, start, size):
sample_queue.put(EpochEndSignal())

out_order = self._manager.list([0])
args = (sample_info_queue, sample_queue, out_order)
args = (self._sample_info_queue, self._sample_queue, out_order)
sample_proc = DaemonProcessGroup(
proc_num=self._sample_proc_num,
target=ordered_processing_task,
args=args)
sample_proc.start_all()

return sample_queue

def batch_iterator(self, batch_size, minimum_batch_size):
@suppress_complaints(verbose=self._verbose, notify=self._force_exit)
def batch_assembling_task(sample_queue, batch_queue, pool):
Expand Down Expand Up @@ -419,21 +416,21 @@ def conv_to_shared(ndarray):

batch_queue.put(EpochEndSignal())

sample_queue = self._start_async_processing()
batch_queue = self._manager.Queue(self._batch_buffer_size)
self._start_async_processing()

self._pool_manager = SharedMemoryPoolManager(self._batch_buffer_size *
3, self._manager)

assembling_proc = DaemonProcessGroup(
proc_num=1,
target=batch_assembling_task,
args=(sample_queue, batch_queue, self._pool_manager.pool))
args=(self._sample_queue, self._batch_queue,
self._pool_manager.pool))
assembling_proc.start_all()

while self._force_exit == False:
try:
batch_data = batch_queue.get_nowait()
batch_data = self._batch_queue.get_nowait()
except Queue.Empty:
time.sleep(0.001)
else:
Expand Down