-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update design of dist train refactor #5776
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,68 +61,110 @@ The revamped distributed training architecture can address the above discussed l | |
|
||
<img src="src/distributed_architecture.png"/> | ||
|
||
The major components in the architecture are: *PaddlePaddle Python*, *PaddlePaddle converter* and *PaddlePaddle runtime*. | ||
The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*. | ||
|
||
### PaddlePaddle Python | ||
### Python API | ||
|
||
PaddlePaddle Python is the Python library that user's Python code invokes, to read the data. build the neural network topology, start training, etc. | ||
Python API is the Python library that user's Python code invokes, to read the data, build the neural network topology, and start training, etc. | ||
|
||
```Python | ||
paddle.init() | ||
input = paddle.op.recordIO("/home/data/mnist.recordio") # file stored on the cluster | ||
img, label = input[0], input[1] | ||
hidden = paddle.layer.fc(input=img, size=200, act=paddle.activation.Tanh()) | ||
prediction = paddle.layer.fc(input=img, size=10, act=paddle.activation.Softmax()) | ||
cost = paddle.layer.classification_cost(input=prediction, label=label) | ||
optimizer = paddle.optimizer.SGD(cost, learning_rate=0.01) | ||
session = paddle.session.NewRemote(num_trainer=3, num_ps=2, GPU_per_trainer=1) | ||
for i in range(1000): | ||
_, cost_val = session.eval(targets=[cost, optimizer]) | ||
print cost_val | ||
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32') | ||
label = fluid.layers.data(name='label', shape=[1], dtype='int64') | ||
... | ||
predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax") | ||
cost = fluid.layers.cross_entropy(input=predict, label=label) | ||
avg_cost = fluid.layers.mean(x=cost) | ||
optimizer = fluid.optimizer.Adam(learning_rate=0.01) | ||
optimizer.minimize(avg_cost) | ||
|
||
train_reader = paddle.batch( | ||
paddle.reader.shuffle( | ||
paddle.dataset.mnist.train(), buf_size=500), | ||
batch_size=BATCH_SIZE) | ||
|
||
place = fluid.CPUPlace() | ||
exe = fluid.Executor(place) | ||
|
||
for pass_id in range(10): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see we are using "python for" as loop here, not sure how's the progress on WhileOp, and are we trying to encourage user with WhileOp. if so, do we want to use WhileOp instead in this example? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a key point. For now, both recv side and send side is not using while op for iteration loop, which causes we have to run the python code instead of a I think this should be left to later discussions, we intend to make everything into the IR. |
||
for data in train_reader(): | ||
loss, acc = exe.run(trainer_prog, | ||
feed=feeder.feed(data), | ||
fetch_list=[avg_cost]) | ||
``` | ||
|
||
The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as `paddle.layer.fc`. Training is done by calling `session.eval` iteratively. | ||
|
||
#### session.eval | ||
|
||
As shown in the graph, `session.eval` sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation. | ||
The targets can be any variable in the computation graph. When the target is say, the `optimizer` variable, the neural network will be optimized once. When the target is the `cost` variable, `session.eval` returns the cost value. Based on what the target is, an appropriate action is taken. | ||
|
||
The Python `session` is a wrapper of the C++ `Session` class. For more information about `Session`, refer to this document - [Design Doc: Session](./session.md). | ||
|
||
### PaddlePaddle Converter | ||
|
||
The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed : | ||
|
||
1. Add a `feed` OP that feeds the eval inputs, and a `fetch` OP that fetches the eval targets to the IR. | ||
|
||
2. Extract a new computation (sub)graph with the `feed` and `fetch` OPs as the boundary. The runtime does not need to run the OP that is not dependent on the `fetch` OP. | ||
|
||
3. Optimize the computation graph. | ||
|
||
4. Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user. | ||
|
||
5. Partition the graph according to runtime boundaries and add `send` / `recv` OP pair on the runtime boundaries. | ||
The code above is a typical local training program, the "Training Program" is built using helper functions such as | ||
`fluid.layer.fc`. The training is done by calling `Executor.run` | ||
iteratively. | ||
|
||
For more details, the implementation of IR is [Program](../program.md), and `ProgramDesc` is the protobuf type. | ||
|
||
[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use | ||
`Executor` to run the program locally. For any kind of distributed training, you can use | ||
`RemoteExecutor` to specify desired distributed training method with some optional arguments. | ||
|
||
### Distributed Transpiler | ||
|
||
The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then | ||
the Remote Executor dispatches the new IRs to Remote Executors across the cluster. | ||
Below are the steps that are followed : | ||
|
||
1. User only need to change `Executor` to `RemoteExecutor` to change local program to distributed program. | ||
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a | ||
distributed training program: | ||
1. Parse configurations from `RemoteExecutor`. | ||
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is the of distributed program determined, is it set by the user? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From configurations of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks! |
||
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. For | ||
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. " For DataParallelism type for example" => "Take DataParallelism type as an example" |
||
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we know if an operator is an optimization operators? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, thanks! It currently works, but I think we need to aim for ProgramDesc for input (no Python) and ProgramDesc for output (open for discussion). |
||
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster. | ||
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also give an example ProgramDesc generated by transpiler from above sample code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please see the dist-graph.png in |
||
|
||
### RemoteExecutor | ||
|
||
As shown in the graph, `RemoteExecutor.run` sends the IR to the cluster for Execution. | ||
You can also use parameter `fetch_list` to interactively fetch variable back to local for | ||
log printing. | ||
|
||
The Python `RemoteExecutor` is derived from `Executor` class. | ||
|
||
```python | ||
run(self, | ||
program=None, | ||
feed=None, | ||
fetch_list=None, | ||
feed_var_name='feed', | ||
fetch_var_name='fetch', | ||
job_desc=JobDesc( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we want to separate job creation and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Is it fine to do job submission when creating There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably need to support many actions for a job (e.g., creation, view state, deletion), maybe we can use CLI to manage resource (can be a Python package), the CLI returns a job ID and the Python script can use the job ID in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OR implement like below is more reasonable? The python program is running on every node, and the k8s job is still managed by CLI which is the same as current implementation? job_desc = JobDesc(num_trainer=4, num_ps=2, GPU_per_trainer=2)
pserver = RemoteExecutor(placement="/pserver/*", job_desc=job_desc)
trainer = RemoteExecutor(placement="/trainer/*", job_desc=job_desc)
placement = os.getenv('PADDLE_PLACEMENT')
if placement.startswith('/pserver'):
pserver.run(startup_program)
pserver.run(optimize_program)
pserver.wait()
elif placement.startswith('/trainer'):
for pass_num in xrange(num_passes):
trainer.run(main_program)
trainer.wait() # wait for sync SGD There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please refer to this graph, there is only a single Python instance that controls the cpp instances in the cluster. The Python instance could run locally or in the cluster. Please see Limitation 1 on why a single Python instances is better. The Python code send the entire graph to a service process in the cluster that will do the graph conversion, placement and sending sub-graph to each runtime instance. So I think the Python code that the user writes should not contain the placement logic below: placement = os.getenv('PADDLE_PLACEMENT')
if placement.startswith('/pserver'):
pserver.run(startup_program)
pserver.run(optimize_program)
pserver.wait()
elif placement.startswith('/trainer'):
for pass_num in xrange(num_passes):
trainer.run(main_program)
trainer.wait() # wait for sync SGD And only one executor is necessary. So maybe something like: job = paddle.createJob(num_trainer=4, num_ps=2, GPU_per_trainer=2) # or create using CLI.
exe = paddle.RemoteExecutor(job)
exe.run(main_program) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
# the orchestrator instance here is just a client of Orchestrator which runs in machine_a.cluster at port 2222
orchestrator = paddle.Orchestrator(uri="machine_a.cluster:2222")
# add an executor. orchestrator will talk to Executor and fetch device type(CPU, GPU) and other device hardware data
orchestrator.addExecutors(uri="machine_a.cluster:3333")
# add another remote Executor
orchestrator.addExecutors(uri="machine_b.cluster:3333")
job_desc = paddle.createJob(num_trainer=4, num_ps=2) # there should be other config regarding affinity.
# also, should we wrap job and main_program together? I'm not sure
orchestrator.run(main_program, job_desc) # submit the main_program and job_desc to remote orchestrator if we just need to run local training job, create an orchestrator in localhost and add localhost Executors There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just discussed with @helinwang here is our ideas we want to talk to you with
./orchestrator -port 8080 -ip 192.168.1.2 now orchestrator will start listening for incoming resource registration and protobuf ./executor -orchestrator_uri 192.168.1.2:8080 now executor is created and register itself with orchestrator with its hardware features python train.py 192.168.1.2:8080 now protobuf (IR) will be sent to the orchestrator; orchestrator change/cut the IR and set sub IR to executors. base on the hardware features of executors, they will be set to different roles(pserver or trainer)
python train.py IR will be delivered to a lite orchestrator which only do graph change to fit multi thread or multi GPU case, then the updated graph will be delivered to executors and run. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are only considering Kubernetes, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Python code example above is: for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost]) The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See discussions here, because we use python code to form the iterate loop, so "remote executor" can not just run a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the discussion, Python loop feels native but not as efficient on remote training, I think we should provide both options to the user, and let the user to choose. So we probably still need to support Python loop well. Does a training job can only be one ProgramDesc run (e.g, one run with while op)? I think we don't need to couple training job with ProgramDesc run. The training job can just be some resource, waiting to run however many ProgramDescs, it preserves the state between multiples ProgramDesc runs. In this way we can support Python loop in remote training. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree with support both methods. |
||
jobname, | ||
num_trainer, | ||
num_pserver, | ||
cpu_per_trainer, | ||
gpu_per_trainer, | ||
mem_per_trainer, | ||
cpu_per_pserver, | ||
mem_per_pserver | ||
)) | ||
``` | ||
|
||
6. Dispatch the partitioned graph to different PaddlePaddle runtimes. | ||
`JobDesc` object describe the distributed job resource specification to run on | ||
Cluster environment. | ||
|
||
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python. | ||
<img src="src/remote_executor.png"/> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bad link |
||
|
||
The output IRs will be cached to optimize the conversion latency. | ||
`RemoteExecutor.run` sends the `ProgramDesc` and | ||
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource) | ||
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible | ||
to start the final Kubernetes Jobs to run the different role of `ProgramDesc`. | ||
|
||
|
||
#### Placement Algorithm | ||
### Placement Algorithm | ||
|
||
Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible. | ||
|
||
In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm. | ||
|
||
|
||
### PaddlePaddle Runtime | ||
|
||
The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter. | ||
|
||
|
||
### Local Training Architecture | ||
|
||
The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime: | ||
|
@@ -132,9 +174,18 @@ The local training architecture will be the same as the distributed training arc | |
|
||
### Training Data | ||
|
||
In PaddlePaddle v0.10.0, training data is typically read with a [data reader](../reader/README.md) from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic. | ||
|
||
When doing distributed training, the user can still use Python data reader: the training data are sent with `session.eval`. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs. | ||
In PaddlePaddle v0.10.0, training data is typically read | ||
with [data reader](../reader/README.md) from Python. This approach is | ||
no longer efficient when training distributedly since the Python | ||
process no longer runs on the same node with the trainer processes, | ||
the Python reader will need to read from the distributed filesystem | ||
(assuming it has the access) and send to the trainers, doubling the | ||
network traffic. | ||
|
||
When doing distributed training, the user can still use Python data | ||
reader: the training data are sent with `Executor.run`. However, should | ||
be used for debugging purpose only. The users are encouraged to use | ||
the read data OPs. | ||
|
||
|
||
## References: | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we explicitly mark the "Distribute Transpiler" in the graph?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding is: the transpiler works in the cluster instead of in local machine. did we discuss this issue? can you remind me of the reason why we put it in the local machine if so?