-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update design of dist train refactor #5776
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,11 +8,7 @@ PaddlePaddle version 0.10.0 uses the "trainer-parameter server" architecture. We | |
|
||
2. Model parallelism is hard: It would need all the if-else branches conditioned on the trainer ID to partition the model onto the trainers, and eventually manually writing out the inter-model-shard communication code to communicate between different trainers. | ||
|
||
3. The user can not directly specify the parameter update rule: need | ||
to modify the parameter server C++ code and compile a new binary. | ||
This adds complication for researchers: A lot of extra effort is | ||
required. Besides, the training job submission program | ||
may not allow running arbitrary binaries. | ||
3. The user can not directly specify the parameter update rule: This would need to modify the parameter server code and compile a new binary. This makes things more complicated for researchers: A lot of extra effort is required to make this work. Besides, the training job submission program may not allow running arbitrary binaries. | ||
|
||
This design doc discusses PaddlePaddle's new distributed training architecture that addresses the above mentioned limitations. | ||
|
||
|
@@ -31,103 +27,106 @@ There are two basic functionalities in the trainer program: | |
When we train using PaddlePaddle v0.10.0 in a distributed fashion, multiple instances of the same Python code are run on different nodes, hence both: the | ||
training logic as well as the neural network computation logic, is replicated. | ||
|
||
The tasks that should only run once all belong to the training logic, | ||
if we only replicate the neural network computation but do **not** | ||
replicate the training logic, the limitation could be solved. | ||
The tasks that only need to be run once belong to the training logic. Hence if we only replicate the neural network computation part, and do **not** | ||
replicate the training logic, the limitation mentioned above can be avoided. | ||
|
||
### Limitation 2 | ||
|
||
Model parallelism means that a single model is partitioned into different components and each node runs one of the component separately. This comes at the extra cost of managing the | ||
inter-model-shard communication between nodes. | ||
|
||
PaddlePaddle should be able to modify the neural network computation | ||
definition to support model parallelism automatically. However, the | ||
computation is only specified in Python code, and PaddlePaddle cannot | ||
modify Python code. | ||
PaddlePaddle should ideally be able to modify the neural network computation and figure out the support for model parallelism automatically. However, the | ||
computation is only specified in Python code which sits outside of PaddlePaddle, hence PaddlePaddle can not support the feature in this setup. | ||
|
||
Just like compiler uses an intermediate representation (IR) so that | ||
the programmer does not need to manually optimize their code in most of | ||
the cases - the compiler will optimize the IR: | ||
Similar to how a compiler uses an intermediate representation (IR) so that the programmer does not need to manually optimize their code for most of the cases, we can have an intermediate representation in PaddlePaddle as well. The compiler optimizes the IR as follows: | ||
|
||
<img src="src/compiler.png"/> | ||
|
||
We can have our own IR which is called [Program](../program.md). | ||
PaddlePaddle can support model parallel by | ||
converting the IR so the user no longer need to manually do it in | ||
Python: | ||
PaddlePaddle can support model parallelism by converting the IR so that the user no longer needs to manually perform the computation and operations in the Python component: | ||
|
||
<img src="src/paddle-compile.png"/> | ||
|
||
The IR for PaddlePaddle after refactoring is called a `Block`, it specifies the computation dependency graph and the variables used in the computation. | ||
|
||
### Limitation 3 | ||
|
||
The user can not directly specify the parameter update rule for the | ||
parameter server because the previous implementation hard coded that | ||
parameter server only do vector's optimization algorithm by | ||
configuration. The user can not specify the parameter server's | ||
computation layer by layer. | ||
The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly. | ||
|
||
This could be fixed by making the parameter server run a separated | ||
IR according to the trainer's variable (tensors, selectedrows) | ||
definition. | ||
|
||
the same | ||
computation definition of the trainer. For a detailed explanation, | ||
please | ||
see | ||
[Design Doc: Operation Graph-Based Parameter Server](./parameter_server.md) | ||
This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document - | ||
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md) | ||
|
||
## Distributed Training Architecture | ||
|
||
The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so: | ||
|
||
<img src="src/distributed_architecture.png"/> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we explicitly mark the "Distribute Transpiler" in the graph? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my understanding is: the transpiler works in the cluster instead of in local machine. did we discuss this issue? can you remind me of the reason why we put it in the local machine if so? |
||
|
||
The major components in the architecture are: *PaddlePaddle Python*, *PaddlePaddle converter* and *PaddlePaddle runtime*. | ||
The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*. | ||
|
||
### PaddlePaddle Python | ||
### Python API | ||
|
||
PaddlePaddle Python is the Python library that user's Python code invokes, to read the data. build the neural network topology, start training, etc. | ||
Python API is the Python library that user's Python code invokes, to read the data, build the neural network topology, and start training, etc. | ||
|
||
```Python | ||
paddle.init() | ||
input = paddle.op.recordIO("/home/data/mnist.recordio") # file stored on the cluster | ||
img, label = input[0], input[1] | ||
hidden = paddle.layer.fc(input=img, size=200, act=paddle.activation.Tanh()) | ||
prediction = paddle.layer.fc(input=img, size=10, act=paddle.activation.Softmax()) | ||
cost = paddle.layer.classification_cost(input=prediction, label=label) | ||
optimizer = paddle.optimizer.SGD(learning_rate=0.01) | ||
opts = optimizer.minimize(cost) | ||
exe = RemoteExecutor(num_trainer=3, num_ps=2, GPU_per_trainer=2, sync_batches=1) | ||
# this will init variable data on both server and trainer | ||
exe.run(framework.default_startup_program()) | ||
exe.sync() | ||
|
||
for i in range(1000): | ||
# feed data | ||
... | ||
cost, acc = exe.run(framework.default_main_program(), | ||
fetch_list=[avg_cost, acc_out]) | ||
print cost, acc | ||
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32') | ||
label = fluid.layers.data(name='label', shape=[1], dtype='int64') | ||
... | ||
predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax") | ||
cost = fluid.layers.cross_entropy(input=predict, label=label) | ||
avg_cost = fluid.layers.mean(x=cost) | ||
optimizer = fluid.optimizer.Adam(learning_rate=0.01) | ||
optimizer.minimize(avg_cost) | ||
|
||
train_reader = paddle.batch( | ||
paddle.reader.shuffle( | ||
paddle.dataset.mnist.train(), buf_size=500), | ||
batch_size=BATCH_SIZE) | ||
|
||
place = fluid.CPUPlace() | ||
exe = fluid.Executor(place) | ||
|
||
for pass_id in range(10): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see we are using "python for" as loop here, not sure how's the progress on WhileOp, and are we trying to encourage user with WhileOp. if so, do we want to use WhileOp instead in this example? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a key point. For now, both recv side and send side is not using while op for iteration loop, which causes we have to run the python code instead of a I think this should be left to later discussions, we intend to make everything into the IR. |
||
for data in train_reader(): | ||
loss, acc = exe.run(trainer_prog, | ||
feed=feeder.feed(data), | ||
fetch_list=[avg_cost]) | ||
``` | ||
|
||
The code above is a typical Python trainer code, the neural network | ||
topology is built using helper functions such as | ||
`paddle.layer.fc`. The training is done by calling `Executor.run` | ||
The code above is a typical local training program, the "Training Program" is built using helper functions such as | ||
`fluid.layer.fc`. The training is done by calling `Executor.run` | ||
iteratively. | ||
|
||
#### RemoteExecutor | ||
For more details, the implementation of IR is [Program](../program.md), and `ProgramDesc` is the protobuf type. | ||
|
||
[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use | ||
`Executor` to run the program locally. For any kind of distributed training, you can use | ||
`RemoteExecutor` to specify desired distributed training method with some optional arguments. | ||
|
||
### Distributed Transpiler | ||
|
||
The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then | ||
the Remote Executor dispatches the new IRs to Remote Executors across the cluster. | ||
Below are the steps that are followed : | ||
|
||
1. User only need to change `Executor` to `RemoteExecutor` to change local program to distributed program. | ||
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a | ||
distributed training program: | ||
1. Parse configurations from `RemoteExecutor`. | ||
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is the of distributed program determined, is it set by the user? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From configurations of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks! |
||
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. For | ||
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. " For DataParallelism type for example" => "Take DataParallelism type as an example" |
||
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we know if an operator is an optimization operators? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks! It currently works, but I think we need to aim for ProgramDesc for input (no Python) and ProgramDesc for output (open for discussion). |
||
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster. | ||
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also give an example ProgramDesc generated by transpiler from above sample code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please see the dist-graph.png in |
||
|
||
As shown in the graph, `RemoteExecutor.run` sends the IR to the | ||
PaddlePaddle cluster for Execution. You can also use parameter | ||
`fetch_list` to interactively fetch variable back to local for | ||
### RemoteExecutor | ||
|
||
As shown in the graph, `RemoteExecutor.run` sends the IR to the cluster for Execution. | ||
You can also use parameter `fetch_list` to interactively fetch variable back to local for | ||
log printing. | ||
|
||
The Python `RemoteExecutor` is derived from `Executor` class. | ||
For more information about `RemoteExecutor`, please | ||
see [Design Doc: RemoteExecutor](./remote_executor.md). | ||
|
||
The `RemoteExecutor.run` interface defination is: | ||
|
||
```python | ||
run(self, | ||
|
@@ -151,58 +150,15 @@ run(self, | |
`JobDesc` object describe the distributed job resource specification to run on | ||
Cluster environment. | ||
|
||
By default, `Executor.run` starts a PaddlePaddle Cloud | ||
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource), | ||
or you can run each component in the | ||
executor by your own method: | ||
|
||
- Data Parallelism | ||
```python | ||
if os.getenv('PLACE_PSERVER'): | ||
exe.run_pserver() | ||
elif os.getenv('PLACE_TRAINER'): | ||
exe.run_trainer() | ||
``` | ||
- Model Parrallelism | ||
```python | ||
for part in exe.get_parralle_parts(): | ||
exe.run_part(part) | ||
``` | ||
|
||
#### Program and Executor | ||
|
||
As mentioned above, the implementation of IR is [Program](../program.md). | ||
|
||
[Executor](../executor.md) converts and parses the IR to a preferred | ||
graph for final execution. For local training you generally use | ||
`Executor` to run the graph locally. For any kind of distributed | ||
training, you can use `RemoteExecutor` to specify desired distributed | ||
training method with some optional arguments. | ||
|
||
### PaddlePaddle Converter | ||
|
||
The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed : | ||
|
||
1. Add a `feed` OP that feeds the eval inputs, and a `fetch` OP that fetches the eval targets to the IR. | ||
|
||
1. Extract a new computation (sub)graph with `feed` and `fetch` OP as | ||
the boundary. The runtime does not need to run the OP that is not | ||
dependent on the `fetch` OP. | ||
|
||
3. Optimize the computation graph. | ||
|
||
4. Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user. | ||
|
||
5. Partition the graph according to runtime boundaries and add `send` / `recv` OP pair on the runtime boundaries. | ||
|
||
6. Dispatch the partitioned graph to different PaddlePaddle runtimes. | ||
|
||
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python. | ||
<img src="src/remote_executor.png"/> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bad link |
||
|
||
The output IRs will be cached to optimize the conversion latency. | ||
`RemoteExecutor.run` sends the `ProgramDesc` and | ||
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource) | ||
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible | ||
to start the final Kubernetes Jobs to run the different role of `ProgramDesc`. | ||
|
||
|
||
#### Placement Algorithm | ||
### Placement Algorithm | ||
|
||
Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible. | ||
|
||
|
@@ -218,7 +174,13 @@ The local training architecture will be the same as the distributed training arc | |
|
||
### Training Data | ||
|
||
In PaddlePaddle v0.10.0, training data is typically read with a [data reader](../reader/README.md) from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic. | ||
In PaddlePaddle v0.10.0, training data is typically read | ||
with [data reader](../reader/README.md) from Python. This approach is | ||
no longer efficient when training distributedly since the Python | ||
process no longer runs on the same node with the trainer processes, | ||
the Python reader will need to read from the distributed filesystem | ||
(assuming it has the access) and send to the trainers, doubling the | ||
network traffic. | ||
|
||
When doing distributed training, the user can still use Python data | ||
reader: the training data are sent with `Executor.run`. However, should | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't quite get this part. Doesn't the pserver run a different ProgramDesc from Trainer's? what does "same computation definition" mean here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.