Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update design of dist train refactor #5776

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions doc/design/refactor/distributed_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ limitations:
write the inter-model-shard communication code.

3. The user can not directly specify the parameter update rule: need
to modify the parameter server C++ code and compile a new
binary. This adds complication for researchers: A lot of extra
effort is required. Besides, the training job submission program
to modify the parameter server C++ code and compile a new binary.
This adds complication for researchers: A lot of extra effort is
required. Besides, the training job submission program
may not allow running arbitrary binaries.

This design doc discusses PaddlePaddle's new distributed training
Expand All @@ -44,7 +44,7 @@ replicated Python instances are running on different nodes: both the
training logic and the neural network computation is replicated.

The tasks that should only run once all belong to the training logic,
if we only replicate the neural network computation, but do **not**
if we only replicate the neural network computation but do **not**
replicate the training logic, the limitation could be solved.

### Limitation 2
Expand All @@ -53,13 +53,13 @@ Model parallelism means running a single model on multiple nodes by
partitioning the model onto different nodes and managing the
inter-model-shard communications.

PaddlePaddle should be able to modify the nerual network computation
PaddlePaddle should be able to modify the neural network computation
definition to support model parallelism automatically. However, the
computation is only specified in Python code, and PaddlePaddle can not
computation is only specified in Python code, and PaddlePaddle cannot
modify Python code.

Just like compiler uses a intermediate representation (IR) so that
programmer does not need to manually optimize their code in most of
Just like compiler uses an intermediate representation (IR) so that
the programmer does not need to manually optimize their code in most of
the cases - the compiler will optimize the IR:

<img src="src/compiler.png"/>
Expand All @@ -75,20 +75,20 @@ Python:
### Limitation 3

The user can not directly specify the parameter update rule for the
parameter server because the previous implementaion hard coded that
parameter server because the previous implementation hard coded that
parameter server only do vector's optimization algorithm by
configuration. The user can not specify the parameter server's
computation layer by layer.

This could be fixed by making the parameter server run a separated
IR according to the trainer's varialble (tensors, selectedrows)
defination.
IR according to the trainer's variable (tensors, selectedrows)
definition.

the same
computation definition as the trainer. For a detailed explanation,
computation definition of the trainer. For a detailed explanation,
please
see
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md)
[Design Doc: Operation Graph-Based Parameter Server](./parameter_server.md)

## Distributed Training Architecture

Expand Down Expand Up @@ -136,18 +136,43 @@ iteratively.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also give an example ProgramDesc generated by transpiler from above sample code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see the dist-graph.png in parameter_server.md.

As shown in the graph, `RemoteExecutor.run` sends the IR to the
PaddlePaddle cluster for Execution. You can also use parameter
`fetch_list` to interactively fetch varirable back to local for
`fetch_list` to interactively fetch variable back to local for
log printing.

The Python `RemoteExecutor` is derived from `Executor` class.
For more information about `RemoteExecutor`, please
see [Design Doc: RemoteExecutor](./remote_executor.md).

The `RemoteExecutor.run` interface defination is:

```python
run(self,
program=None,
feed=None,
fetch_list=None,
feed_var_name='feed',
fetch_var_name='fetch',
job_desc=JobDesc(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want to separate job creation and run, since run can be run multiple times on a same job (e.g., multiple mini-batch steps).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run call will probably do both submitting the job to k8s cluster and fetch output variables. The problem is submitting job will fail if the job already exists, and run may be called multiple times to fetch output. Adding a new interface will cause cluster training code become much different to local training.

Is it fine to do job submission when creating RemoteExecutor?

Copy link
Contributor

@helinwang helinwang Nov 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to support many actions for a job (e.g., creation, view state, deletion), maybe we can use CLI to manage resource (can be a Python package), the CLI returns a job ID and the Python script can use the job ID in run?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OR implement like below is more reasonable? The python program is running on every node, and the k8s job is still managed by CLI which is the same as current implementation?

job_desc = JobDesc(num_trainer=4, num_ps=2, GPU_per_trainer=2)
pserver = RemoteExecutor(placement="/pserver/*", job_desc=job_desc)
trainer = RemoteExecutor(placement="/trainer/*", job_desc=job_desc)

placement = os.getenv('PADDLE_PLACEMENT')
if placement.startswith('/pserver'):
    pserver.run(startup_program)
    pserver.run(optimize_program)
    pserver.wait()
elif placement.startswith('/trainer'):
    for pass_num in xrange(num_passes):
        trainer.run(main_program)
        trainer.wait()    # wait for sync SGD

Copy link
Contributor

@helinwang helinwang Nov 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer to this graph, there is only a single Python instance that controls the cpp instances in the cluster. The Python instance could run locally or in the cluster. Please see Limitation 1 on why a single Python instances is better.

The Python code send the entire graph to a service process in the cluster that will do the graph conversion, placement and sending sub-graph to each runtime instance. So I think the Python code that the user writes should not contain the placement logic below:

placement = os.getenv('PADDLE_PLACEMENT')
if placement.startswith('/pserver'):
    pserver.run(startup_program)
    pserver.run(optimize_program)
    pserver.wait()
elif placement.startswith('/trainer'):
    for pass_num in xrange(num_passes):
        trainer.run(main_program)
        trainer.wait()    # wait for sync SGD

And only one executor is necessary. So maybe something like:

job = paddle.createJob(num_trainer=4, num_ps=2, GPU_per_trainer=2) # or create using CLI.
exe = paddle.RemoteExecutor(job)
exe.run(main_program)

Copy link
Contributor

@putcn putcn Nov 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. agree that local python should only do IR creation and submission.
  2. I prefer not to differentiate local training job and remote training job, the only difference to the user should be the URI of computing resource(device).
  3. And internally, we should not differentiate remote/local devices either.
  4. I'm proposing a new class Orchestrator, which is more or less "RemoteExecutor" as mentioned by Helin, the naming is just to hide the idea that where it lives.
  5. To unify local or remote training, we might want to treat all training "Remote", so that "local" training is just a special case that resources(devices) are labeled with uri "localhost"
  6. To make this happen, Orchestrator, Executors should all expose gRPC or web services to accept ProgramDesc and report back with task status.
  7. the whole IR should be submitted to an "Orchestrator", which see the whole picture of the resource it has (local/remote executors), do graph partitioning and send sub IRs to executors(locally or remotely) base on the affinity config (placement strategy)
  8. Orchestrator should be a gRPC service, which manages executors list and executors affinity would be fetched by orchestrator talking to executor as below:
# the orchestrator instance here is just a client of Orchestrator which runs in machine_a.cluster at port 2222
orchestrator = paddle.Orchestrator(uri="machine_a.cluster:2222") 
# add an executor. orchestrator will talk to Executor and fetch device type(CPU, GPU) and other device hardware data
orchestrator.addExecutors(uri="machine_a.cluster:3333") 
 # add another remote Executor
orchestrator.addExecutors(uri="machine_b.cluster:3333")  
  1. then the job will be started with following:
job_desc = paddle.createJob(num_trainer=4, num_ps=2) # there should be other config regarding affinity. 
# also, should we wrap job and main_program together? I'm not sure
orchestrator.run(main_program, job_desc) # submit the main_program and job_desc to remote orchestrator

if we just need to run local training job, create an orchestrator in localhost and add localhost Executors
11. the benefit is flexibility and separation of concern.
12. when we need to deploy paddle in k8s, it is not necessary to have a designated "master", since any node can be "master", pods may start with an arbitrary sequence. the first one becomes the orchestrator, when new pods are available, just add addExecutors, executors will check what kind of hardware it is assigned, talk to the orchestrator about its hardware feature. this kind of service discovery logic can be done via a k8s adaptor, which watches the pods for this job and calls "addExecutor" or "removeExecutor".
13. I know this comment is too long to read, so thanks for reading till the end :P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just discussed with @helinwang here is our ideas we want to talk to you with

  1. for now we are going to focus on how we are going to launch training with bare metals since this is the fundamental of cluster training.
  2. there will be 2 major roles in cluster, Orchestrator and Executor
  3. To launch a Orchestrator in a cluster node
./orchestrator -port 8080 -ip 192.168.1.2

now orchestrator will start listening for incoming resource registration and protobuf
4. to launch an Executor in a cluster node

./executor -orchestrator_uri 192.168.1.2:8080

now executor is created and register itself with orchestrator with its hardware features
5. to start a cluster training

python train.py 192.168.1.2:8080

now protobuf (IR) will be sent to the orchestrator; orchestrator change/cut the IR and set sub IR to executors. base on the hardware features of executors, they will be set to different roles(pserver or trainer)

  1. to start a local training
python train.py

IR will be delivered to a lite orchestrator which only do graph change to fit multi thread or multi GPU case, then the updated graph will be delivered to executors and run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are only considering Kubernetes, the orchestrator is Kubernetes controller and scheduler; and the executor can be the entry of the job image.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Python code example above is:

for data in train_reader():
  loss, acc = exe.run(trainer_prog,
                                   feed=feeder.feed(data),
                                   fetch_list=[avg_cost])

The run is called iteratively, so the job_desc parameter is passed in multiple times. Is it necessary to couple job_desc with run? Another possible approach is to use another set of API to manage remote resource lifecycle / logs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See discussions here, because we use python code to form the iterate loop, so "remote executor" can not just run a ProgramDesc to form a full training job. A possible method is to use while_op, but this will make user API a bit complex.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the discussion, Python loop feels native but not as efficient on remote training, I think we should provide both options to the user, and let the user to choose. So we probably still need to support Python loop well.

Does a training job can only be one ProgramDesc run (e.g, one run with while op)? I think we don't need to couple training job with ProgramDesc run. The training job can just be some resource, waiting to run however many ProgramDescs, it preserves the state between multiples ProgramDesc runs. In this way we can support Python loop in remote training.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with support both methods.

jobname,
num_trainer,
num_pserver,
cpu_per_trainer,
gpu_per_trainer,
mem_per_trainer,
cpu_per_pserver,
mem_per_pserver
))
```

`JobDesc` object describe the distributed job resource specification to run on
Cluster environment.

By default, `Executor.run` starts a PaddlePaddle Cloud
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource), or you can run each component in the
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource),
or you can run each component in the
executor by your own method:

- Data Parrallelism
- Data Parallelism
```python
if os.getenv('PLACE_PSERVER'):
exe.run_pserver()
Expand All @@ -164,10 +189,10 @@ executor by your own method:

As mentioned above, the implementation of IR is [Program](../program.md).

[Executor](../executor.md) converts and parses the IR to a prefered
[Executor](../executor.md) converts and parses the IR to a preferred
graph for final execution. For local training you generally use
`Executor` to run the graph locally. For any kind of distributed
training, you can use `RemoteExecutor` to specify desired distributed
training, you can use `RemoteExecutor` to specify desired distributed
training method with some optional arguments.

### PaddlePaddle Converter
Expand All @@ -182,7 +207,7 @@ to different PaddlePaddle runtimes. Below are the steps:

1. Extract a new computation (sub)graph with `feed` and `fetch` OP as
the boundary. The runtime does not need to run the OP that is not
dependent by the `fetch` OP.
dependent on the `fetch` OP.

1. Optimizes the computation graph.

Expand Down Expand Up @@ -238,7 +263,7 @@ the Python reader will need to read from the distributed filesystem
network traffic.

When doing distributed training, the user can still use Python data
reader: the training data are sent with `Executor.run`. However should
reader: the training data are sent with `Executor.run`. However, should
be used for debugging purpose only. The users are encouraged to use
the read data OPs.

Expand Down