Skip to content

Commit

Permalink
Merge pull request #5776 from typhoonzero/update_refactor_dist_train_doc
Browse files Browse the repository at this point in the history
Update design of dist train refactor
  • Loading branch information
typhoonzero authored Jan 9, 2018
2 parents 6ece41e + e1cea8c commit 32b09b5
Show file tree
Hide file tree
Showing 25 changed files with 117 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,77 +52,121 @@ The IR for PaddlePaddle after refactoring is called a `Block`, it specifies the

The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly.

This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document -
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md)
This could be fixed by making the parameter server also run an IR, which can be different to the trainer side
For a detailed explanation, refer to this document -
[Design Doc: Parameter Server](./parameter_server.md)

## Distributed Training Architecture

The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so:

<img src="src/distributed_architecture.png"/>

The major components in the architecture are: *PaddlePaddle Python*, *PaddlePaddle converter* and *PaddlePaddle runtime*.
The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*.

### PaddlePaddle Python
### Python API

PaddlePaddle Python is the Python library that user's Python code invokes, to read the data. build the neural network topology, start training, etc.
Python API is the Python library that user's Python code invokes, to read the data, build the neural network topology, and start training, etc.

```Python
paddle.init()
input = paddle.op.recordIO("/home/data/mnist.recordio") # file stored on the cluster
img, label = input[0], input[1]
hidden = paddle.layer.fc(input=img, size=200, act=paddle.activation.Tanh())
prediction = paddle.layer.fc(input=img, size=10, act=paddle.activation.Softmax())
cost = paddle.layer.classification_cost(input=prediction, label=label)
optimizer = paddle.optimizer.SGD(cost, learning_rate=0.01)
session = paddle.session.NewRemote(num_trainer=3, num_ps=2, GPU_per_trainer=1)
for i in range(1000):
_, cost_val = session.eval(targets=[cost, optimizer])
print cost_val
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
...
predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax")
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.01)
optimizer.minimize(avg_cost)

train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)

place = fluid.CPUPlace()
exe = fluid.Executor(place)

for pass_id in range(10):
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
```

The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as `paddle.layer.fc`. Training is done by calling `session.eval` iteratively.

#### session.eval

As shown in the graph, `session.eval` sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation.
The targets can be any variable in the computation graph. When the target is say, the `optimizer` variable, the neural network will be optimized once. When the target is the `cost` variable, `session.eval` returns the cost value. Based on what the target is, an appropriate action is taken.

The Python `session` is a wrapper of the C++ `Session` class. For more information about `Session`, refer to this document - [Design Doc: Session](./session.md).

### PaddlePaddle Converter

The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed :

1. Add a `feed` OP that feeds the eval inputs, and a `fetch` OP that fetches the eval targets to the IR.

2. Extract a new computation (sub)graph with the `feed` and `fetch` OPs as the boundary. The runtime does not need to run the OP that is not dependent on the `fetch` OP.

3. Optimize the computation graph.

4. Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user.

5. Partition the graph according to runtime boundaries and add `send` / `recv` OP pair on the runtime boundaries.
The code above is a typical local training program, the "Training Program" is built using helper functions such as
`fluid.layer.fc`. The training is done by calling `Executor.run`
iteratively.

For more details, the implementation of IR is [Program](../program.md), and `ProgramDesc` is the protobuf type.

[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use
`Executor` to run the program locally. For any kind of distributed training, you can use
`RemoteExecutor` to specify desired distributed training method with some optional arguments.

### Distributed Transpiler

The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
Below are the steps that are followed :

1. User only need to change `Executor` to `RemoteExecutor` to change local program to distributed program.
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a
distributed training program:
1. Parse configurations from `RemoteExecutor`.
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming.
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. Take
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP.
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster.
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end.


### RemoteExecutor

As shown in the graph, `RemoteExecutor.run` sends the IR to the cluster for Execution.
You can also use parameter `fetch_list` to interactively fetch variable back to local for
log printing.

The Python `RemoteExecutor` is derived from `Executor` class.

```python
exe = RemoteExecutor(
feed=feeder.feed(data),
fetch_list=[avg_cost],
job_desc=JobDesc(
jobname,
num_trainer,
num_pserver,
cpu_per_trainer,
gpu_per_trainer,
mem_per_trainer,
cpu_per_pserver,
mem_per_pserver
))
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
```

6. Dispatch the partitioned graph to different PaddlePaddle runtimes.
`JobDesc` object describe the distributed job resource specification to run on
Cluster environment.

7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python.
<img src="src/remote_executor.png"/>

The output IRs will be cached to optimize the conversion latency.
`RemoteExecutor.run` sends the `ProgramDesc` and
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource)
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible
to start the final Kubernetes Jobs to run the different role of `ProgramDesc`.


#### Placement Algorithm
### Placement Algorithm

Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible.

In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.


### PaddlePaddle Runtime

The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter.


### Local Training Architecture

The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:
Expand All @@ -132,9 +176,18 @@ The local training architecture will be the same as the distributed training arc

### Training Data

In PaddlePaddle v0.10.0, training data is typically read with a [data reader](../reader/README.md) from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic.

When doing distributed training, the user can still use Python data reader: the training data are sent with `session.eval`. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs.
In PaddlePaddle v0.10.0, training data is typically read
with [data reader](../reader/README.md) from Python. This approach is
no longer efficient when training distributedly since the Python
process no longer runs on the same node with the trainer processes,
the Python reader will need to read from the distributed filesystem
(assuming it has the access) and send to the trainers, doubling the
network traffic.

When doing distributed training, the user can still use Python data
reader: the training data are sent with `Executor.run`. However, should
be used for debugging purpose only. The users are encouraged to use
the read data OPs.


## References:
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Design Doc: Operation Graph Based Parameter Server
# Design Doc: Parameter Server

## Abstract

Expand All @@ -10,7 +10,7 @@ different purposes.
## Background

The previous implementations of the parameter server does not run a
subgraph. parameter initialization, optimizer computation, network
fluid sub-program. Parameter initialization, optimizer computation, network
communication and checkpointing are implemented twice on both the
trainer and the parameter server.

Expand All @@ -23,18 +23,17 @@ server becomes a natural extension.

## Design

### Graph Converter
### Distributed Transpiler

The *graph converter* converts the user-defined operation (OP) graph
into subgraphs to be scheduled on different nodes with the following
The *Distributed Transpiler* converts the user-defined fluid program
into sub-programs to be scheduled on different nodes with the following
steps:

1. OP placement: the OPs will be placed on different nodes according
to heuristic that minimizes estimated total computation
time. Currently we will use a simple heuristic that puts parameter
varable on parameter server workers and everything else on trainer
workers.

1. Add communication OPs to enable the communication between nodes.

We will need these OPs: *Send*, *Recv*, *Enqueue*, *Dequeue*.
Expand All @@ -48,8 +47,8 @@ After converting:

<img src="src/dist-graph.png" width="700"/>

1. The parameter variable W and it's optimizer subgraph are placed on the parameter server.
1. Operators are added to the subgraphs.
1. The parameter variable W and it's optimizer program are placed on the parameter server.
1. Operators are added to the program.
- *Send* sends data to the connected *Recv* operator. The
scheduler on the receive node will only schedule *Recv* operator
to run when the *Send* operator has ran (the *Send* OP will mark
Expand All @@ -64,39 +63,30 @@ After converting:
### Benefits

- Model parallelism become easier to implement: it's an extension to
the trainer - parameter server approach. we already have the
communication OPs, but need to extend the graph converter's
placement functionality.

the trainer - parameter server approach. We can have several "Transpilers"
to achieve different goals.
- User-defined optimizer is easier to add - user can now express it as
a subgraph.

a sub-program.
- No more duplication logic inside the trainer and the parameter
server mentioned in the background section.

### Challenges

- It might be hard for the graph converter to cut a general graph
(without any hint for which subgraph is the optimizer). We may need
to label which subgraph inside the OP graph is the optimizer.

- It's important to balance the parameter shards of on multiple
parameter server. If a single parameter is very big (some
word-embedding, fully connected, softmax layer), we need to
automatically partition the single parameter onto different
parameter servers when possible (only element-wise optimizer depends
on the parameter variable).
- In the "Aync SGD" figure, the "W" variable on the parameter server
could be read and wrote concurrently. See
[here](https://github.com/PaddlePaddle/Paddle/pull/6394) for more
details about concurrent program in fluid.

### Discussion

- In the "Aync SGD" figure, the "W" variable on the parameter server
could be read and wrote concurrently, what is our locking strategy?
E.g., each variable have a lock cpp method to be invoked by every
OP, or, have a lock OP.

- Can the Enqueue OP be implemented under our current tensor design
(puts the input tensor into the queue tensor)?

- *Dequeue* OP will have variable numbers of output (depends on the
`min_count` attribute), does our current design support it? (similar
question for the *Add* OP)
Expand Down
File renamed without changes.
File renamed without changes
File renamed without changes.
File renamed without changes
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
File renamed without changes.
File renamed without changes
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
File renamed without changes
File renamed without changes
File renamed without changes
Binary file not shown.
Binary file added doc/design/dist_refactor/src/remote_executor.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 32b09b5

Please sign in to comment.