Commit

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_test_sendrecv_portbind

typhoonzero committed Apr 4, 2018
2 parents b853ac8 + fb8c1cf, commit b03fa88
Showing 19 changed files with 304 additions and 66 deletions.
95 changes: 94 additions & 1 deletion doc/v2/howto/rnn/recurrent_group_en.md
@@ -1,3 +1,96 @@
# Recurrent Group Tutorial

## Overview

Sequential data is common in natural language processing.

A sentence is a sequence of words, and many sentences in turn form a paragraph. A paragraph can therefore be viewed as a nested sequence with two levels, where each element of the outer sequence is itself a sequence. That is to say, sequential data can be recursive. An example of two-level recursive sequential data is an article, which is composed of a sequence of sentences, and each sentence is a sequence of words.

PaddlePaddle and PaddlePaddle v2 support two-level recursive sequential data. A two-level sequence is a very flexible data format that helps us describe more complex language data, such as paragraphs and multi-turn dialogues. Based on two-level sequence input, we can design and build a flexible, hierarchical RNN model that encodes the input at both the word level and the sentence level. For support of an arbitrary number of levels, please refer to PaddlePaddle Fluid.

In PaddlePaddle, `recurrent_group` is an arbitrarily complex RNN unit. The user only needs to define the computation that the RNN performs within one time step; PaddlePaddle is responsible for propagating information and errors along the time dimension.

Furthermore, `recurrent_group` can also be extended to handle two-level sequences. By defining two nested `recurrent_group` operations, at the clause level and the word level respectively, a hierarchical and complex RNN can be built.

Currently, PaddlePaddle provides `recurrent_group` and a number of layers that can process bidirectional sequences. For details, refer to the document <a href="hierarchical_layer_en.html">Layers for supporting double-layer sequences as input</a>.

## Related Concepts

### Basic Principle
`recurrent_group` is an arbitrarily complex RNN unit supported by PaddlePaddle. The user only needs to focus on the calculations that the RNN is designed to perform within a single time step; PaddlePaddle is responsible for propagating information and gradients over time.

In PaddlePaddle, a simple call to `recurrent_group` is as follows:

``` python
recurrent_group(step, input, reverse)
```
- step: a callable function that defines the computation the RNN unit performs within one time step
- input: the input, which must be a single-level or a two-level sequence
- reverse: whether to process the input sequence in reverse order

The key to using `recurrent_group` is designing the logic of the step function. The step function can freely combine any layers supported by PaddlePaddle to implement arbitrary computational logic. The input of `recurrent_group` becomes the input of the step function. Since the step function only needs to focus on the computation within one time step of the RNN, `recurrent_group` takes care of splitting the original input data for us.
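
As a minimal sketch of such a step function (assuming the classic `paddle.trainer_config_helpers` configuration API; `hidden_dim`, the layer names, and the data sizes below are illustrative, not taken from this commit):

``` python
from paddle.trainer_config_helpers import *  # assumed v1-style config API

hidden_dim = 128                                    # hypothetical hidden size
word = data_layer(name="word", size=10000)          # word ids, a single-level sequence
emb = embedding_layer(input=word, size=256)         # per-word embeddings

def rnn_step(current_word):
    # memory() refers to the output of the layer named "rnn_state"
    # at the previous time step (all zeros at the first step).
    prev_state = memory(name="rnn_state", size=hidden_dim)
    # One fully connected layer combines the current word with the
    # previous state to produce the new state.
    return fc_layer(input=[current_word, prev_state],
                    size=hidden_dim,
                    act=TanhActivation(),
                    name="rnn_state")

# recurrent_group splits `emb` into time steps, feeds each step to rnn_step,
# and concatenates the per-step outputs back into a sequence.
rnn_out = recurrent_group(step=rnn_step, input=emb, reverse=False)
```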

### Input
The input sequences processed by `recurrent_group` fall into the following three types:

- **Data input**: a two-level sequence passed into `recurrent_group` is split into single-level sequences, and a single-level sequence is split into non-sequence elements, before being passed to the step function. This process is completely transparent to the user. There are two possible sources: 1) user input via `data_layer`; 2) the output of other layers.

- **Read-only memory input**: `StaticInput` defines read-only memory. An input specified via `StaticInput` is not split by `recurrent_group`; every time step of the loop can always reference the entire input. It may be a non-sequence or a single-level sequence.

- **Sequence-generation input**: `GeneratedInput` is only used to specify input data in a sequence generation task.

### Input Example

Sequence generation tasks mostly follow the encoder-decoder architecture. The encoder and decoder can be any neural network units capable of processing sequences, and RNNs are the most popular choice.

Given the encoder output and the current word, the decoder predicts the most likely next word at each step. In this structure, the decoder accepts two inputs:

- The target sequence to be generated: an input of the decoder and the basis of the decoder loop. `recurrent_group` splits this type of input.

- The encoder output, a non-sequence or single-level sequence: an unbounded memory. Every time step of the decoder loop references the entire result, so it should not be split. This type of input must be specified via `StaticInput`. For more discussion of unbounded memory, please refer to the paper [Neural Turing Machines](https://arxiv.org/abs/1410.5401).

In a sequence generation task, the decoder RNN always uses the word vector of the word predicted at the previous step as its current input. `GeneratedInput` automates this process.
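
To make the two decoder inputs concrete, here is a hedged training-time sketch under the same assumed API (the encoder shown, and names such as `decoder_dim` or `simple_gru`, are illustrative): the target sequence is an ordinary input that `recurrent_group` splits per step, while the encoder output is wrapped in `StaticInput` so every step can read it in full.

``` python
decoder_dim = 128                                       # hypothetical sizes
src = data_layer(name="src_word", size=30000)
src_emb = embedding_layer(input=src, size=256)
encoder_out = last_seq(input=simple_gru(input=src_emb,  # hypothetical encoder
                                        size=decoder_dim))

trg = data_layer(name="trg_word", size=30000)           # target words, a single-level sequence
trg_emb = embedding_layer(input=trg, size=256)

def decoder_step(enc_vec, current_word):
    # enc_vec comes from StaticInput: it is NOT split over time,
    # so every step sees the whole encoder result.
    prev_state = memory(name="decoder_state", size=decoder_dim)
    return fc_layer(input=[enc_vec, current_word, prev_state],
                    size=decoder_dim,
                    act=TanhActivation(),
                    name="decoder_state")

# The step function's arguments follow the order of the input list.
decoder_out = recurrent_group(
    step=decoder_step,
    input=[StaticInput(input=encoder_out, size=decoder_dim), trg_emb])
```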

### Output
The `step` function must return the output of one or more layers. The outputs of these layers across time steps become the final output of the entire `recurrent_group`: during the output process, `recurrent_group` concatenates the output of each time step, which is also transparent to the user.

### Memory
Memory can only be defined and used inside `recurrent_group`. Memory cannot exist on its own; it must point to a layer defined by PaddlePaddle. Memory is referenced to obtain the output of that layer at a previous moment, so memory can be understood as a delay operation.

The user can explicitly specify the output of a layer to initialize the memory. When not specified, memory is initialized to 0 by default.
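
For instance (a sketch under the same assumptions, reusing `encoder_out` and `decoder_dim` from the previous sketch), a memory can be initialized explicitly through `boot_layer`:

``` python
init_state = fc_layer(input=encoder_out, size=decoder_dim, act=TanhActivation())

def boot_step(current_word):
    # The memory starts from init_state at the first time step instead of zeros.
    prev_state = memory(name="boot_state", size=decoder_dim, boot_layer=init_state)
    return fc_layer(input=[current_word, prev_state],
                    size=decoder_dim,
                    act=TanhActivation(),
                    name="boot_state")
```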

## Sequence-level RNN Introduction

`recurrent_group` splits the input sequence for us, merges the outputs, and loops the step logic over the sequence.

Using this feature, two nested `recurrent_group` calls can handle nested two-level sequences, implementing a sequence-level RNN structure at both the word and sentence levels.

- Word-level RNN: each state corresponds to a word.
- Sequence-level RNN: a sequence-level RNN consists of multiple word-level RNNs. Each word-level RNN (i.e., each state of the sequence-level RNN) corresponds to a subsequence.

For convenience of description, the following takes an NLP task as an example: a paragraph containing subsequences is defined as a two-level sequence, a sentence containing words is defined as a single-level sequence, and a word is then a zero-level sequence.

## Usage of Sequence-level RNN

### Usage of Training Process
Using `recurrent_group` requires the following conventions:

- **Single-input, single-output**: both input and output are single-level sequences.
  - If there are multiple inputs, the number of words in the different input sequences must be exactly equal.
  - A single-level sequence is output, and the number of words in the output sequence is the same as in the input sequence.
  - memory: in the step function, define a memory that points to a layer; referencing the memory retrieves that layer's output at a previous moment, forming a recurrent connection. The `is_seq` parameter of the memory must be `false`. If no memory is defined, the computations of different time steps are independent.
  - boot_layer: the initial state of the memory, zero by default. The `is_seq` parameter of the memory must be `false`.

- **Double-input, double-output**: both input and output are two-level sequences (see the nested sketch after this list).
  - If there are multiple input sequences, the numbers of subsequences contained in the different inputs must be strictly equal, but the numbers of words in those subsequences may differ.
  - A two-level sequence is output. The numbers of subsequences and of words are the same as in the specified input sequence; by default, the first input is used.
  - memory: in the step function, define a memory that points to a layer; referencing the memory retrieves that layer's output at a previous moment, forming a recurrent connection. A memory defined in the outer `recurrent_group` step function records the state of the previous subsequence; it can be either a single-level sequence (only as read-only memory) or a word. If no memory is defined, the computations between subsequences are independent.
  - boot_layer: the initial state of the memory, either a single-level sequence (only as read-only memory) or a vector. Not set by default, i.e., the initial state is zero.

- **Double-input, single-output**: not supported yet; it raises the error "In hierachical RNN, all out links should be from sequences now".
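
A minimal nested sketch under the same assumptions (the paragraph input and size names are illustrative): the outer step receives one subsequence (a single-level sequence of word embeddings), runs an inner `recurrent_group` over its words, and returns that inner sequence so the overall output is again a two-level sequence, in line with the double-input double-output convention.

``` python
word_dim, hidden_dim = 256, 128

paragraph = data_layer(name="para_word", size=30000)    # two-level sequence of word ids
para_emb = embedding_layer(input=paragraph, size=word_dim)

def inner_step(word):
    prev = memory(name="word_state", size=hidden_dim)
    return fc_layer(input=[word, prev], size=hidden_dim,
                    act=TanhActivation(), name="word_state")

def outer_step(sentence):
    # `sentence` is one subsequence; the inner group loops over its words.
    # Returning the inner sequence keeps the output a two-level sequence.
    return recurrent_group(step=inner_step, input=sentence)

# Output: for each sentence, the per-word hidden states (a two-level sequence).
encoded = recurrent_group(step=outer_step, input=para_emb)
```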

### Usage of Generation Process
Using `beam_search` requires following these conventions (a generation sketch follows the list):

- Word-level RNN: generate the next word from the current word.
- Sequence-level RNN: the subsequences generated by the underlying word-level RNN are concatenated into a new two-level sequence. Semantically, there is no case in which a subsequence directly generates the next subsequence.
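
As a word-level generation sketch under the same assumptions (the begin/end token ids, beam size, and embedding table name are illustrative, and `encoder_out`/`decoder_dim` are reused from the earlier decoder sketch), `GeneratedInput` feeds the previously generated word back in, and `beam_search` drives the loop:

``` python
vocab_size = 30000

def gen_step(enc_vec, current_word):
    prev_state = memory(name="gen_state", size=decoder_dim)
    state = fc_layer(input=[enc_vec, current_word, prev_state],
                     size=decoder_dim, act=TanhActivation(), name="gen_state")
    # Per-step distribution over the vocabulary; beam_search expands from it.
    return fc_layer(input=state, size=vocab_size, act=SoftmaxActivation())

beam_out = beam_search(
    name="decoder",
    step=gen_step,
    input=[StaticInput(input=encoder_out, size=decoder_dim),
           GeneratedInput(size=vocab_size,
                          embedding_name="trg_embedding",   # hypothetical table name
                          embedding_size=256)],
    bos_id=0, eos_id=1,                                     # assumed <s>/<e> ids
    beam_size=5,
    max_length=50)
```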
paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
@@ -76,7 +76,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
}
}

std::string NCCLAllReduceOpHandle::Name() const { return "NCCL AllReduce"; }
std::string NCCLAllReduceOpHandle::Name() const { return "nccl_all_reduce"; }
} // namespace details
} // namespace framework
} // namespace paddle
7 changes: 7 additions & 0 deletions paddle/fluid/framework/details/nccl_all_reduce_op_handle.h
@@ -14,6 +14,9 @@

#pragma once

#include <string>
#include <vector>

#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
@@ -34,6 +37,10 @@ struct NCCLAllReduceOpHandle : public OpHandleBase {

std::string Name() const override;

// Delay and buffer nccl_all_reduce together can significantly increase
// performance. Disable this feature by returning false.
bool IsMultiDeviceTransfer() override { return true; };

protected:
void RunImpl() override;
};
6 changes: 6 additions & 0 deletions paddle/fluid/framework/details/op_handle_base.h
@@ -13,6 +13,8 @@
// limitations under the License.

#pragma once
#include <string>
#include <vector>

#include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/platform/device_context.h"
@@ -53,6 +55,10 @@ class OpHandleBase {

void AddOutput(VarHandleBase *out);

// If the Op involves data transfer of multiple devices that
// will likely block other computations.
virtual bool IsMultiDeviceTransfer() { return false; }

protected:
virtual void RunImpl() = 0;
};
62 changes: 50 additions & 12 deletions paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -23,22 +23,36 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
size_t num_threads, bool use_event,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<SSAGraph> &&graph)
std::unique_ptr<SSAGraph> &&graph, bool allow_op_delay)
: SSAGraphExecutor(std::move(graph)),
pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr),
local_scopes_(local_scopes),
places_(places),
fetch_ctxs_(places),
use_event_(use_event) {}
use_event_(use_event),
running_ops_(0),
allow_op_delay_(allow_op_delay) {}

void ThreadedSSAGraphExecutor::RunDelayedOps(
const std::unordered_set<OpHandleBase *> &delayed_ops) {
for (auto op : delayed_ops) {
op->Run(use_event_);
}
}

FeedFetchList ThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars;

BlockingQueue<VarHandleBase *> ready_vars;

std::unordered_set<OpHandleBase *> ready_ops;
// For ops (e.g. nccl_all_reduce) that need to coordinate multiple
// streams from multiple GPUs, it's faster to buffer them and schedule
// together since we currently cannot overlap computation and memcpy streams.
// Should revisit it if overlapping is available.
std::unordered_set<OpHandleBase *> delayed_ops;
std::unordered_set<OpHandleBase *> blocked_by_delayed_ops;
std::unordered_set<VarHandleBase *> delayed_vars;

auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var);
@@ -106,7 +120,14 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(

auto run_all_ready_ops = [&] {
for (auto *op : ready_ops) {
RunOp(ready_vars, op);
if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
delayed_ops.insert(op);
delayed_vars.insert(op->outputs_.begin(), op->outputs_.end());
ready_vars.Extend(op->outputs_);
continue;
}
running_ops_++;
RunOp(&ready_vars, op);
}
ready_ops.clear();
};
@@ -118,13 +139,13 @@
}

// Step 3. Execution
while (!pending_vars.empty()) {
while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
// 1. Run All Ready ops
run_all_ready_ops();

// 2. Find ready variable
bool timeout;
auto cur_ready_vars = ready_vars.PopAll(1000, &timeout);
auto cur_ready_vars = ready_vars.PopAll(1, &timeout);

if (timeout) {
if (exception_) {
@@ -141,13 +162,29 @@
auto &deps = pending_ops[op];
--deps;
if (deps == 0) {
ready_ops.insert(op);
if (delayed_vars.find(ready_var) != delayed_vars.end()) {
blocked_by_delayed_ops.insert(op);
} else {
ready_ops.insert(op);
}
}
}
}
// When there are no other ops to schedule, schedule buffered delayed
// ops and unblock other ops.
if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) {
RunDelayedOps(delayed_ops);
delayed_ops.clear();
for (auto *op : blocked_by_delayed_ops) {
ready_ops.insert(op);
}
blocked_by_delayed_ops.clear();
}
// Keep loop until all vars are ready.
}

PADDLE_ENFORCE(ready_ops.empty());
PADDLE_ENFORCE(delayed_ops.empty());
PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
++computation_count_;

auto sync_computation = [&] {
@@ -182,12 +219,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
}

void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> &ready_var_q, details::OpHandleBase *op) {
auto op_run = [&ready_var_q, op, this] {
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] {
try {
VLOG(10) << op->Name() << " : " << op->DebugString();
op->Run(use_event_);
ready_var_q.Extend(op->outputs_);
running_ops_--;
ready_var_q->Extend(op->outputs_);
} catch (platform::EnforceNotMet ex) {
exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) {
16 changes: 13 additions & 3 deletions paddle/fluid/framework/details/threaded_ssa_graph_executor.h
@@ -14,7 +14,12 @@

#pragma once

#include <chrono>
#include <deque>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
@@ -70,7 +75,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
ThreadedSSAGraphExecutor(size_t num_threads, bool use_event,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<SSAGraph> &&graph);
std::unique_ptr<SSAGraph> &&graph,
bool allow_op_delay);

// Run a SSAGraph by a thread pool
// Use topological sort algorithm
@@ -79,16 +85,20 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
~ThreadedSSAGraphExecutor() {}

private:
void RunOp(BlockingQueue<VarHandleBase *> &ready_var_q,
void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q,
details::OpHandleBase *op);

void RunDelayedOps(const std::unordered_set<OpHandleBase *> &delayed_ops);

private:
std::unique_ptr<::ThreadPool> pool_;
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
platform::DeviceContextPool fetch_ctxs_;
const bool use_event_;
std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;
bool allow_op_delay_;

size_t computation_count_{0};
size_t max_async_computation{100};
15 changes: 15 additions & 0 deletions paddle/fluid/framework/executor.cc
@@ -279,6 +279,21 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
return std::unique_ptr<ExecutorPrepareContext>(ctx);
}

std::vector<std::shared_ptr<ExecutorPrepareContext>> Executor::Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids) {
std::vector<std::shared_ptr<ExecutorPrepareContext>> result;
for (auto& bid : block_ids) {
auto* ctx = new ExecutorPrepareContext(program, bid);
PADDLE_ENFORCE_LT(static_cast<size_t>(bid), program.Size());
auto& block = program.Block(bid);
for (auto& op_desc : block.AllOps()) {
ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
}
result.push_back(std::shared_ptr<ExecutorPrepareContext>(ctx));
}
return result;
}

void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) {
auto& block = ctx->prog_.Block(ctx->block_id_);
3 changes: 3 additions & 0 deletions paddle/fluid/framework/executor.h
@@ -61,6 +61,9 @@ class Executor {
static std::unique_ptr<ExecutorPrepareContext> Prepare(
const ProgramDesc& program, int block_id);

static std::vector<std::shared_ptr<ExecutorPrepareContext>> Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids);

void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true,
bool create_vars = true);
8 changes: 5 additions & 3 deletions paddle/fluid/framework/parallel_executor.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
#include "paddle/fluid/platform/profiler.h"

#include <string>
#include <vector>
@@ -47,7 +48,7 @@ ParallelExecutor::ParallelExecutor(
const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &params,
const ProgramDesc &startup_program, const ProgramDesc &main_program,
const std::string &loss_var_name, Scope *scope)
const std::string &loss_var_name, Scope *scope, bool allow_op_delay)
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;

@@ -82,8 +83,8 @@
auto graph = builder.Build(main_program);

member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
num_threads, use_event, member_->local_scopes_, places,
std::move(graph)));
num_threads, use_event, member_->local_scopes_, places, std::move(graph),
allow_op_delay));

// Step 3. Create vars in each scope;
for (auto *scope : member_->local_scopes_) {
@@ -151,6 +152,7 @@ void ParallelExecutor::BCastParamsToGPUs(

void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
platform::RecordBlock b(0);
auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data;