From 46ae4075eec45241ddf69b830b7f724f30e63fc7 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 12 Mar 2018 14:55:31 +0800 Subject: [PATCH 1/5] Polish ShuffleReader and test --- .../reader/create_shuffle_reader_op.cc | 75 +++++++++++-------- python/paddle/fluid/layers/io.py | 23 +++++- python/paddle/fluid/recordio_writer.py | 3 + .../tests/unittests/test_recordio_reader.py | 13 +++- 4 files changed, 79 insertions(+), 35 deletions(-) diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc index 4dac3831109be..70e2f587dc414 100644 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include "glog/logging.h" +#include "paddle/fluid/operators/detail/safe_ref.h" #include "paddle/fluid/operators/reader/reader_op_registry.h" namespace paddle { @@ -20,43 +23,53 @@ namespace reader { class ShuffleReader : public framework::DecoratedReader { public: - ShuffleReader(ReaderBase* reader, int buffer_size) - : DecoratedReader(reader), buffer_size_(buffer_size), iteration_pos_(0) { - buffer_.reserve(buffer_size); + ShuffleReader(ReaderBase* reader, size_t buffer_size, size_t seed = 0) + : DecoratedReader(reader), buffer_size_(buffer_size), seed_(seed) { + VLOG(10) << "Create shuffle reader of " << reader_; + if (seed_ == 0) { + std::random_device device; + seed_ = device(); + } + ReadIntoBuffers(); } - void ReadNext(std::vector* out) override; + void ReadNext(std::vector* out) override { + if (iteration_pos_ >= buffer_.size()) { + VLOG(10) << "Resetting shuffle buffer"; + ReadIntoBuffers(); + } + *out = buffer_[iteration_pos_++]; + } - private: - int buffer_size_; - std::vector> buffer_; - size_t iteration_pos_; -}; + bool HasNext() const override { + return iteration_pos_ < buffer_.size() || reader_->HasNext(); + } -void ShuffleReader::ReadNext(std::vector* out) { - if (iteration_pos_ >= buffer_.size()) { - // Reload buffer with new data + private: + void ReadIntoBuffers() { buffer_.clear(); buffer_.reserve(buffer_size_); - for (int i = 0; i < buffer_size_; ++i) { - buffer_.push_back(std::vector()); - reader_->ReadNext(&buffer_.back()); - if (buffer_.back().empty()) { - buffer_.pop_back(); + iteration_pos_ = 0; + PADDLE_ENFORCE(reader_->HasNext()); + for (size_t i = 0; i < buffer_size_; ++i) { + if (!reader_->HasNext()) { break; } + buffer_.emplace_back(); + reader_->ReadNext(&buffer_.back()); } - // TODO(fengjiayi): 'std::random_shuffle' can be very slow. It needs to be - // optimize. - std::random_shuffle(buffer_.begin(), buffer_.end()); - iteration_pos_ = 0; + std::mt19937 g(seed_); + std::shuffle(buffer_.begin(), buffer_.end(), g); + seed_ = g(); // update seed_; + VLOG(10) << "random buffer size = " << buffer_.size(); } - out->clear(); - if (!buffer_.empty()) { - std::swap(*out, buffer_[iteration_pos_++]); - } - // if buffer_ is empty, the 'out' will return as an empty vector. -} + + size_t buffer_size_; + std::vector> buffer_; + + size_t iteration_pos_; + size_t seed_; +}; class CreateShuffleReaderOp : public framework::OperatorBase { public: @@ -67,10 +80,10 @@ class CreateShuffleReaderOp : public framework::OperatorBase { const platform::Place& dev_place) const override { const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - out->Reset( - new ShuffleReader(underlying_reader.Get(), Attr("buffer_size"))); + auto& var = detail::Ref(scope.FindVar(Output("Out"))); + var.GetMutable()->Reset( + new ShuffleReader(underlying_reader.Get(), + static_cast(Attr("buffer_size")))); } }; diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index f1b2af70205ab..81dd9789495a6 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -21,7 +21,7 @@ __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', - 'read_file' + 'read_file', 'create_shuffle_reader' ] @@ -245,6 +245,8 @@ def reset(): reader.eof = eof reader.reset = reset + reader.stop_gradient = True + reader.persistable = True return reader @@ -285,6 +287,25 @@ def open_recordio_file(filename, shapes, lod_levels, dtypes): startup_var) +def __create_decorated_reader__(op_type, reader, attrs): + var_name = unique_name(op_type) + startup_blk = default_startup_program().current_block() + startup_var = startup_blk.create_var(name=var_name) + startup_blk.append_op( + type=op_type, + inputs={'UnderlyingReader': reader}, + outputs={'Out': [startup_var]}, + attrs=attrs) + startup_var.persistable = True + return _copy_reader_var_(default_main_program().current_block(), + startup_var) + + +def create_shuffle_reader(reader, buffer_size): + return __create_decorated_reader__('create_shuffle_reader', reader, + {'buffer_size': int(buffer_size)}) + + def read_file(file_obj): helper = LayerHelper('read_file') out = [ diff --git a/python/paddle/fluid/recordio_writer.py b/python/paddle/fluid/recordio_writer.py index 9735df8c06113..5accaacd53611 100644 --- a/python/paddle/fluid/recordio_writer.py +++ b/python/paddle/fluid/recordio_writer.py @@ -36,6 +36,7 @@ def convert_reader_to_recordio_file( feed_order=None): if feed_order is None: feed_order = feeder.feed_names + counter = 0 with create_recordio_writer(filename, compressor, max_num_records) as writer: for batch in reader_creator(): @@ -43,3 +44,5 @@ def convert_reader_to_recordio_file( for each in feed_order: writer.append_tensor(res[each]) writer.complete_append_tensor() + counter += 1 + return counter diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index d249742bd30ec..cdebda5b7df1c 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -31,10 +31,10 @@ def setUp(self): name='label', shape=[1], dtype='int64'), ], place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( + self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( './mnist.recordio', reader, feeder) - def test_main(self): + def test_main(self, decorator_callback=None): # use new program with fluid.program_guard(fluid.Program(), fluid.Program()): data_file = fluid.layers.open_recordio_file( @@ -42,6 +42,8 @@ def test_main(self): shapes=[[-1, 784], [-1, 1]], lod_levels=[0, 0], dtypes=['float32', 'int64']) + if decorator_callback is not None: + data_file = decorator_callback(data_file) img, label = fluid.layers.read_file(data_file) hidden = fluid.layers.fc(input=img, size=100, act='tanh') @@ -56,9 +58,14 @@ def test_main(self): avg_loss_np = [] # train a pass + batch_id = 0 while not data_file.eof(): tmp, = exe.run(fetch_list=[avg_loss]) avg_loss_np.append(tmp) + batch_id += 1 data_file.reset() - + self.assertEqual(batch_id, self.num_batches) self.assertLess(avg_loss_np[-1], avg_loss_np[0]) + + def test_shuffle_reader(self): + self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200)) From 2ea4a5d96c0d134c84651e691510f90c8b19f0fa Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 12 Mar 2018 15:39:31 +0800 Subject: [PATCH 2/5] Polish double buffer reader --- .../reader/create_double_buffer_reader_op.cc | 79 ++++++++++++++----- python/paddle/fluid/layers/io.py | 10 ++- .../tests/unittests/test_recordio_reader.py | 14 +++- 3 files changed, 81 insertions(+), 22 deletions(-) diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index ba08ea12e2486..ca947fff4358d 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -24,11 +24,16 @@ static constexpr size_t kDoubleBufferSize = 2; class DoubleBufferReader : public framework::DecoratedReader { public: - explicit DoubleBufferReader(ReaderBase* reader) - : DecoratedReader(reader), - buffer_(framework::MakeChannel>( - kDoubleBufferSize)) { - std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this); + explicit DoubleBufferReader( + ReaderBase* reader, platform::Place target_place = platform::CPUPlace()) + : DecoratedReader(reader), place_(target_place) { + start_thread(); + } + + void start_thread() { + buffer_ = framework::MakeChannel>( + kDoubleBufferSize); + std::thread prefetch([this] { PrefetchThreadFunc(); }); prefetch.detach(); } @@ -43,6 +48,8 @@ class DoubleBufferReader : public framework::DecoratedReader { void PrefetchThreadFunc(); framework::Channel>* buffer_; + platform::Place place_; + mutable std::vector local_buffer_; }; class CreateDoubleBufferReaderOp : public framework::OperatorBase { @@ -56,7 +63,20 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase { ->Get(); auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new DoubleBufferReader(underlying_reader.Get())); + + auto place_str = Attr("place"); + platform::Place place; + if (place_str == "CPU") { + place = platform::CPUPlace(); + } else { + std::istringstream sin(place_str); + sin.seekg(std::string("CUDA:").size(), std::ios::beg); + size_t num; + sin >> num; + place = platform::CUDAPlace(static_cast(num)); + } + + out->Reset(new DoubleBufferReader(underlying_reader.Get(), place)); } }; @@ -71,44 +91,65 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { It launches another thread to execute the 'underlying reader' asynchronously, which prevents reading process from blocking subsequent training. )DOC"); + std::unordered_set enum_range; + constexpr size_t kMaxCUDADevs = 128; + for (size_t i = 0; i < kMaxCUDADevs; ++i) { + enum_range.insert(string::Sprintf("CUDA:%d", i)); + } + enum_range.insert("CPU"); + AddAttr("place", "The double buffer place, default is CPU") + .SetDefault("CPU") + .InEnum({enum_range}); } }; void DoubleBufferReader::ReadNext(std::vector* out) { out->clear(); - buffer_->Receive(out); + if (local_buffer_.empty()) { + buffer_->Receive(out); + } else { + *out = local_buffer_; + local_buffer_.clear(); + } } void DoubleBufferReader::ReInit() { reader_->ReInit(); buffer_->Close(); - // The existing prefetch thread will terminate for the buffer_ is closed. - buffer_ = framework::MakeChannel>( - kDoubleBufferSize); - std::thread prefetch(&DoubleBufferReader::PrefetchThreadFunc, this); - prefetch.detach(); + start_thread(); } void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; - while (true) { + while (reader_->HasNext()) { std::vector batch; reader_->ReadNext(&batch); - if (batch.empty()) { - // EOF - buffer_->Close(); - VLOG(5) << "Reached the end of the file. The prefetch thread terminates."; - break; + if (platform::is_gpu_place(place_)) { + std::vector gpu_batch; + gpu_batch.resize(batch.size()); + for (size_t i = 0; i < batch.size(); ++i) { + framework::TensorCopy(batch[i], place_, &gpu_batch[i]); + gpu_batch[i].set_lod(batch[i].lod()); + } } + if (!buffer_->Send(&batch)) { VLOG(5) << "WARNING: The double buffer channel has been closed. The " "prefetch thread terminates."; break; } } + buffer_->Close(); } -bool DoubleBufferReader::HasNext() const { PADDLE_THROW("Not Implemented"); } +bool DoubleBufferReader::HasNext() const { + if (local_buffer_.empty()) { + bool ok = buffer_->Receive(&local_buffer_); + return ok; + } else { + return true; + } +} } // namespace reader } // namespace operators diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 81dd9789495a6..9c91f395e7c9d 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -21,7 +21,7 @@ __all__ = [ 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', - 'read_file', 'create_shuffle_reader' + 'read_file', 'create_shuffle_reader', 'create_double_buffer_reader' ] @@ -306,6 +306,14 @@ def create_shuffle_reader(reader, buffer_size): {'buffer_size': int(buffer_size)}) +def create_double_buffer_reader(reader, place=None): + attrs = dict() + if place is not None: + attrs['place'] = str(place).upper() + return __create_decorated_reader__('create_double_buffer_reader', reader, + attrs) + + def read_file(file_obj): helper = LayerHelper('read_file') out = [ diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py index cdebda5b7df1c..24a0074d9b962 100644 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py @@ -13,9 +13,10 @@ # limitations under the License. import unittest + import paddle.fluid as fluid -import paddle.v2.dataset.mnist as mnist import paddle.v2 as paddle +import paddle.v2.dataset.mnist as mnist class TestRecordIO(unittest.TestCase): @@ -53,7 +54,12 @@ def test_main(self, decorator_callback=None): fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss) - exe = fluid.Executor(fluid.CPUPlace()) + if fluid.core.is_compiled_with_cuda(): + place = fluid.CUDAPlace(0) + else: + place = fluid.CPUPlace() + + exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) avg_loss_np = [] @@ -69,3 +75,7 @@ def test_main(self, decorator_callback=None): def test_shuffle_reader(self): self.test_main(decorator_callback=lambda reader: fluid.layers.create_shuffle_reader(reader, buffer_size=200)) + + def test_double_buffer_reader(self): + self.test_main(decorator_callback=lambda reader: fluid.layers.create_double_buffer_reader(reader, + place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu')) From 225efa671fd1b234e67752ad9a1cd4aecdffe58b Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 12 Mar 2018 16:10:19 +0800 Subject: [PATCH 3/5] Remove dims in base class --- paddle/fluid/framework/operator.cc | 20 ++------------ paddle/fluid/framework/reader.cc | 10 +------ paddle/fluid/framework/reader.h | 26 ++----------------- .../reader/create_random_data_generator_op.cc | 5 ++-- .../reader/create_recordio_file_reader_op.cc | 10 +++---- 5 files changed, 12 insertions(+), 59 deletions(-) diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc index ac6289c5abe8f..49f8cd5f90a01 100644 --- a/paddle/fluid/framework/operator.cc +++ b/paddle/fluid/framework/operator.cc @@ -442,15 +442,7 @@ class RuntimeInferShapeContext : public InferShapeContext { } std::vector GetRepeatedDims(const std::string& name) const override { - Variable* var = scope_.FindVar(name); - if (var->IsType()) { - return var->Get().shapes(); - } else { - PADDLE_THROW( - "Only ReaderHolder support 'GetRepeatedDims', but Variable %s's " - "type_id is %s.", - name, var->Type().name()); - } + PADDLE_THROW("Only compile time support this method"); } void SetDim(const std::string& name, const DDim& dim) override { @@ -467,15 +459,7 @@ class RuntimeInferShapeContext : public InferShapeContext { void SetRepeatedDims(const std::string& name, const std::vector& dims) override { - Variable* var = scope_.FindVar(name); - if (var->IsType()) { - var->GetMutable()->set_shapes(dims); - } else { - PADDLE_THROW( - "Only ReaderHolder support 'SetRepeatedDims', but Variable %s's " - "type_id is %s.", - name, var->Type().name()); - } + PADDLE_THROW("Only compile time support this method"); } proto::VarType::Type GetVarType(const std::string& name) const override { diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index 91879d6d45868..31f686151e363 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -16,14 +16,6 @@ namespace paddle { namespace framework { - -DDim ReaderBase::shape(size_t idx) const { - PADDLE_ENFORCE_LT( - idx, shapes_.size(), - "Cannot get the %d'th shape, 'shapes_' only has %d elements.", idx, - shapes_.size()); - return shapes_[idx]; -} - +ReaderBase::~ReaderBase() {} } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index e281c9b13fb50..2d8d30fc66274 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -22,34 +22,18 @@ namespace framework { class ReaderBase { public: - explicit ReaderBase(const std::vector& shapes) : shapes_(shapes) { - PADDLE_ENFORCE(!shapes_.empty()); - } virtual void ReadNext(std::vector* out) = 0; virtual void ReInit() = 0; - DDim shape(size_t idx) const; - std::vector shapes() const { return shapes_; } - void set_shapes(const std::vector& shapes) { shapes_ = shapes; } - virtual bool HasNext() const = 0; - virtual ~ReaderBase() {} - - protected: - std::vector shapes_; -}; - -class FileReader : public ReaderBase { - public: - explicit FileReader(const std::vector& shapes) : ReaderBase(shapes) {} + virtual ~ReaderBase(); }; class DecoratedReader : public ReaderBase { public: - explicit DecoratedReader(ReaderBase* reader) - : ReaderBase(reader->shapes()), reader_(reader) { + explicit DecoratedReader(ReaderBase* reader) : ReaderBase(), reader_(reader) { PADDLE_ENFORCE_NOT_NULL(reader_); } @@ -72,12 +56,6 @@ class ReaderHolder { void ReadNext(std::vector* out) { reader_->ReadNext(out); } void ReInit() { reader_->ReInit(); } - DDim shape(size_t idx) const { return reader_->shape(idx); } - std::vector shapes() const { return reader_->shapes(); } - void set_shapes(const std::vector& shapes) { - reader_->set_shapes(shapes); - } - bool HasNext() const { return reader_->HasNext(); } private: diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc index e62f952d0e895..95d8674c08b63 100644 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ b/paddle/fluid/operators/reader/create_random_data_generator_op.cc @@ -19,11 +19,11 @@ namespace operators { namespace reader { template -class RandomDataGenerator : public framework::FileReader { +class RandomDataGenerator : public framework::ReaderBase { public: RandomDataGenerator(const std::vector& shapes, float min, float max) - : FileReader(shapes), min_(min), max_(max) { + : framework::ReaderBase(), min_(min), max_(max), shapes_(shapes) { PADDLE_ENFORCE_LE( min, max, "'min' shouldn't be greater than 'max'.(%f vs %f)", min, max); unsigned int seed = std::random_device()(); @@ -59,6 +59,7 @@ class RandomDataGenerator : public framework::FileReader { float max_; std::minstd_rand engine_; std::uniform_real_distribution dist_; + std::vector shapes_; }; template diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index c3eb247bbe204..4992eb8617216 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -18,11 +18,10 @@ namespace paddle { namespace operators { namespace reader { -class RecordIOFileReader : public framework::FileReader { +class RecordIOFileReader : public framework::ReaderBase { public: - RecordIOFileReader(const std::string& filename, - const std::vector& shapes) - : FileReader(shapes), + explicit RecordIOFileReader(const std::string& filename) + : ReaderBase(), scanner_(filename), dev_ctx_(*platform::DeviceContextPool::Instance().Get( platform::CPUPlace())) {} @@ -54,12 +53,11 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { int(shape_concat.size()), "The accumulate of all ranks should be equal to the " "shape concat's length."); - std::vector shapes = RestoreShapes(shape_concat, ranks); std::string filename = Attr("filename"); auto* out = scope.FindVar(Output("Out")) ->template GetMutable(); - out->Reset(new RecordIOFileReader(filename, shapes)); + out->Reset(new RecordIOFileReader(filename)); } }; From f9974a4a12de337559cb1d6494c4d1f7656d52e9 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Tue, 13 Mar 2018 14:44:19 +0800 Subject: [PATCH 4/5] Make double_buffer reader async --- .../reader/create_double_buffer_reader_op.cc | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index ca947fff4358d..706f6fd592f88 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -24,15 +24,31 @@ static constexpr size_t kDoubleBufferSize = 2; class DoubleBufferReader : public framework::DecoratedReader { public: + struct Item { + Item() : ctx_(nullptr) {} + + std::vector payloads_; + platform::DeviceContext* ctx_; + }; + explicit DoubleBufferReader( ReaderBase* reader, platform::Place target_place = platform::CPUPlace()) : DecoratedReader(reader), place_(target_place) { + for (size_t i = 0; i < kDoubleBufferSize; ++i) { + if (platform::is_gpu_place(place_)) { +#ifdef PADDLE_WITH_CUDA + ctxs_.emplace_back(new platform::CUDADeviceContext( + boost::get(place_))); +#else +#endif + } + } + start_thread(); } void start_thread() { - buffer_ = framework::MakeChannel>( - kDoubleBufferSize); + buffer_ = framework::MakeChannel(kDoubleBufferSize); std::thread prefetch([this] { PrefetchThreadFunc(); }); prefetch.detach(); } @@ -47,9 +63,10 @@ class DoubleBufferReader : public framework::DecoratedReader { private: void PrefetchThreadFunc(); - framework::Channel>* buffer_; + framework::Channel* buffer_; platform::Place place_; - mutable std::vector local_buffer_; + std::vector> ctxs_; + mutable Item local_buffer_; }; class CreateDoubleBufferReaderOp : public framework::OperatorBase { @@ -104,12 +121,14 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase { }; void DoubleBufferReader::ReadNext(std::vector* out) { - out->clear(); - if (local_buffer_.empty()) { - buffer_->Receive(out); - } else { - *out = local_buffer_; - local_buffer_.clear(); + if (local_buffer_.payloads_.empty()) { + buffer_->Receive(&local_buffer_); + } + + *out = local_buffer_.payloads_; + local_buffer_.payloads_.clear(); + if (local_buffer_.ctx_) { + local_buffer_.ctx_->Wait(); } } @@ -121,16 +140,22 @@ void DoubleBufferReader::ReInit() { void DoubleBufferReader::PrefetchThreadFunc() { VLOG(5) << "A new prefetch thread starts."; + size_t gpu_ctx_offset = 0; while (reader_->HasNext()) { - std::vector batch; - reader_->ReadNext(&batch); + Item batch; + reader_->ReadNext(&batch.payloads_); if (platform::is_gpu_place(place_)) { std::vector gpu_batch; - gpu_batch.resize(batch.size()); - for (size_t i = 0; i < batch.size(); ++i) { - framework::TensorCopy(batch[i], place_, &gpu_batch[i]); - gpu_batch[i].set_lod(batch[i].lod()); + auto& gpu_ctx = this->ctxs_[gpu_ctx_offset++]; + gpu_ctx_offset %= this->ctxs_.size(); + gpu_batch.resize(batch.payloads_.size()); + for (size_t i = 0; i < batch.payloads_.size(); ++i) { + framework::TensorCopy(batch.payloads_[i], place_, *gpu_ctx, + &gpu_batch[i]); + gpu_batch[i].set_lod(batch.payloads_[i].lod()); } + batch.ctx_ = gpu_ctx.get(); + std::swap(gpu_batch, batch.payloads_); } if (!buffer_->Send(&batch)) { @@ -143,7 +168,7 @@ void DoubleBufferReader::PrefetchThreadFunc() { } bool DoubleBufferReader::HasNext() const { - if (local_buffer_.empty()) { + if (local_buffer_.payloads_.empty()) { bool ok = buffer_->Receive(&local_buffer_); return ok; } else { From 164f2382afe6ded95c95f4fb731a1d932d578026 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Tue, 13 Mar 2018 17:56:53 +0800 Subject: [PATCH 5/5] Polish code --- paddle/fluid/framework/reader.cc | 40 +------------------ paddle/fluid/framework/reader.h | 25 +----------- .../reader/create_double_buffer_reader_op.cc | 1 - .../reader/create_recordio_file_reader_op.cc | 4 +- 4 files changed, 6 insertions(+), 64 deletions(-) diff --git a/paddle/fluid/framework/reader.cc b/paddle/fluid/framework/reader.cc index c3fb657a3a2cb..fa00c08e0d579 100644 --- a/paddle/fluid/framework/reader.cc +++ b/paddle/fluid/framework/reader.cc @@ -18,45 +18,9 @@ namespace paddle { namespace framework { ReaderBase::~ReaderBase() {} -std::vector> ReaderBase::SplitReader( - const platform::PlaceList &places) { - std::vector> readers; +FileReader::FileReader(const std::vector &dims) : dims_(dims) {} - auto mutex = std::make_shared(); - for (size_t i = 0; i < places.size(); ++i) { - readers.emplace_back(new ThreadSafeReader(this, mutex)); - } - - return readers; -} - -void ThreadSafeReader::ReadNext(std::vector *out) { - std::lock_guard guard(*mutex_); - reader_->ReadNext(out); -} - -void ThreadSafeReader::ReInit() { - std::lock_guard guard(*mutex_); - reader_->ReInit(); -} - -bool ThreadSafeReader::HasNext() const { - std::lock_guard guard(*mutex_); - return reader_->HasNext(); -} - -std::vector> ThreadSafeReader::SplitReader( - const platform::PlaceList &places) { - std::vector> readers; - for (size_t i = 0; i < places.size(); ++i) { - readers.emplace_back(new ThreadSafeReader(reader_, mutex_)); - } - return readers; -} - -FileReaderBase::FileReaderBase(const std::vector &dims) : dims_(dims) {} - -void FileReaderBase::ReadNext(std::vector *out) { +void FileReader::ReadNext(std::vector *out) { ReadNextImpl(out); PADDLE_ENFORCE_EQ(out->size(), dims_.size()); for (size_t i = 0; i < dims_.size(); ++i) { diff --git a/paddle/fluid/framework/reader.h b/paddle/fluid/framework/reader.h index 8989bddd10d9c..3573b99becf6d 100644 --- a/paddle/fluid/framework/reader.h +++ b/paddle/fluid/framework/reader.h @@ -33,9 +33,6 @@ class ReaderBase { virtual bool HasNext() const = 0; - virtual std::vector> SplitReader( - const platform::PlaceList& places); - virtual ~ReaderBase(); }; @@ -53,27 +50,9 @@ class DecoratedReader : public ReaderBase { ReaderBase* reader_; }; -class ThreadSafeReader : public DecoratedReader { - public: - ThreadSafeReader(ReaderBase* reader, const std::shared_ptr& mutex) - : DecoratedReader(reader), mutex_(mutex) {} - - void ReadNext(std::vector* out) override; - - void ReInit() override; - - bool HasNext() const override; - - std::vector> SplitReader( - const platform::PlaceList& places) override; - - private: - std::shared_ptr mutex_; -}; - -class FileReaderBase : public ReaderBase { +class FileReader : public ReaderBase { public: - explicit FileReaderBase(const std::vector& dims); + explicit FileReader(const std::vector& dims); void ReadNext(std::vector* out) override; diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc index 706f6fd592f88..d0de092947eb0 100644 --- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc +++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc @@ -39,7 +39,6 @@ class DoubleBufferReader : public framework::DecoratedReader { #ifdef PADDLE_WITH_CUDA ctxs_.emplace_back(new platform::CUDADeviceContext( boost::get(place_))); -#else #endif } } diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc index 819e09a36966a..c4aa29c7206db 100644 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc @@ -18,11 +18,11 @@ namespace paddle { namespace operators { namespace reader { -class RecordIOFileReader : public framework::FileReaderBase { +class RecordIOFileReader : public framework::FileReader { public: explicit RecordIOFileReader(const std::string& filename, const std::vector& dims) - : FileReaderBase(dims), + : FileReader(dims), scanner_(filename), dev_ctx_(*platform::DeviceContextPool::Instance().Get( platform::CPUPlace())) {}