-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update design of dist train refactor #5776
Update design of dist train refactor #5776
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
fetch_list=None, | ||
feed_var_name='feed', | ||
fetch_var_name='fetch', | ||
job_desc=JobDesc( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we want to separate job creation and run
, since run
can be run multiple times on a same job (e.g., multiple mini-batch steps).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run
call will probably do both submitting the job to k8s cluster and fetch output variables. The problem is submitting job will fail if the job already exists, and run
may be called multiple times to fetch output. Adding a new interface will cause cluster training code become much different to local training.
Is it fine to do job submission when creating RemoteExecutor
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to support many actions for a job (e.g., creation, view state, deletion), maybe we can use CLI to manage resource (can be a Python package), the CLI returns a job ID and the Python script can use the job ID in run
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OR implement like below is more reasonable? The python program is running on every node, and the k8s job is still managed by CLI which is the same as current implementation?
job_desc = JobDesc(num_trainer=4, num_ps=2, GPU_per_trainer=2)
pserver = RemoteExecutor(placement="/pserver/*", job_desc=job_desc)
trainer = RemoteExecutor(placement="/trainer/*", job_desc=job_desc)
placement = os.getenv('PADDLE_PLACEMENT')
if placement.startswith('/pserver'):
pserver.run(startup_program)
pserver.run(optimize_program)
pserver.wait()
elif placement.startswith('/trainer'):
for pass_num in xrange(num_passes):
trainer.run(main_program)
trainer.wait() # wait for sync SGD
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refer to this graph, there is only a single Python instance that controls the cpp instances in the cluster. The Python instance could run locally or in the cluster. Please see Limitation 1 on why a single Python instances is better.
The Python code send the entire graph to a service process in the cluster that will do the graph conversion, placement and sending sub-graph to each runtime instance. So I think the Python code that the user writes should not contain the placement logic below:
placement = os.getenv('PADDLE_PLACEMENT')
if placement.startswith('/pserver'):
pserver.run(startup_program)
pserver.run(optimize_program)
pserver.wait()
elif placement.startswith('/trainer'):
for pass_num in xrange(num_passes):
trainer.run(main_program)
trainer.wait() # wait for sync SGD
And only one executor is necessary. So maybe something like:
job = paddle.createJob(num_trainer=4, num_ps=2, GPU_per_trainer=2) # or create using CLI.
exe = paddle.RemoteExecutor(job)
exe.run(main_program)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- agree that local python should only do IR creation and submission.
- I prefer not to differentiate local training job and remote training job, the only difference to the user should be the URI of computing resource(device).
- And internally, we should not differentiate remote/local devices either.
- I'm proposing a new class Orchestrator, which is more or less "RemoteExecutor" as mentioned by Helin, the naming is just to hide the idea that where it lives.
- To unify local or remote training, we might want to treat all training "Remote", so that "local" training is just a special case that resources(devices) are labeled with uri "localhost"
- To make this happen, Orchestrator, Executors should all expose gRPC or web services to accept ProgramDesc and report back with task status.
- the whole IR should be submitted to an "Orchestrator", which see the whole picture of the resource it has (local/remote executors), do graph partitioning and send sub IRs to executors(locally or remotely) base on the affinity config (placement strategy)
- Orchestrator should be a gRPC service, which manages executors list and executors affinity would be fetched by orchestrator talking to executor as below:
# the orchestrator instance here is just a client of Orchestrator which runs in machine_a.cluster at port 2222
orchestrator = paddle.Orchestrator(uri="machine_a.cluster:2222")
# add an executor. orchestrator will talk to Executor and fetch device type(CPU, GPU) and other device hardware data
orchestrator.addExecutors(uri="machine_a.cluster:3333")
# add another remote Executor
orchestrator.addExecutors(uri="machine_b.cluster:3333")
- then the job will be started with following:
job_desc = paddle.createJob(num_trainer=4, num_ps=2) # there should be other config regarding affinity.
# also, should we wrap job and main_program together? I'm not sure
orchestrator.run(main_program, job_desc) # submit the main_program and job_desc to remote orchestrator
if we just need to run local training job, create an orchestrator in localhost and add localhost Executors
11. the benefit is flexibility and separation of concern.
12. when we need to deploy paddle in k8s, it is not necessary to have a designated "master", since any node can be "master", pods may start with an arbitrary sequence. the first one becomes the orchestrator, when new pods are available, just add addExecutors, executors will check what kind of hardware it is assigned, talk to the orchestrator about its hardware feature. this kind of service discovery logic can be done via a k8s adaptor, which watches the pods for this job and calls "addExecutor" or "removeExecutor".
13. I know this comment is too long to read, so thanks for reading till the end :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just discussed with @helinwang here is our ideas we want to talk to you with
- for now we are going to focus on how we are going to launch training with bare metals since this is the fundamental of cluster training.
- there will be 2 major roles in cluster, Orchestrator and Executor
- To launch a Orchestrator in a cluster node
./orchestrator -port 8080 -ip 192.168.1.2
now orchestrator will start listening for incoming resource registration and protobuf
4. to launch an Executor in a cluster node
./executor -orchestrator_uri 192.168.1.2:8080
now executor is created and register itself with orchestrator with its hardware features
5. to start a cluster training
python train.py 192.168.1.2:8080
now protobuf (IR) will be sent to the orchestrator; orchestrator change/cut the IR and set sub IR to executors. base on the hardware features of executors, they will be set to different roles(pserver or trainer)
- to start a local training
python train.py
IR will be delivered to a lite orchestrator which only do graph change to fit multi thread or multi GPU case, then the updated graph will be delivered to executors and run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are only considering Kubernetes, the orchestrator
is Kubernetes controller and scheduler; and the executor
can be the entry of the job image.
I have a question regarding fault tolerance handling in this case. Since we are going to split main IR into arbitrarily sub IRs, ultimately, this main IR may be split into many pieces with model parallelization. When we are dynamically adjusting(add/remove) the number of trainers, how we are going to guarantee that the overall IR is still complete or what kind of scaling rule does it need to follow so that when some branch computation is complete, the truck must exist so that the whole graph is not blocked? |
@putcn Great question, I think the endpoint that the remote executor calls will have the global view (the IR and the number of trainer), it should be able to convert the graph. |
session = paddle.session.NewRemote(num_trainer=3, num_ps=2, GPU_per_trainer=1) | ||
optimizer = paddle.optimizer.SGD(learning_rate=0.01) | ||
opts = optimizer.minimize(cost) | ||
exe = RemoteExecutor(num_trainer=3, num_ps=2, GPU_per_trainer=2, sync_batches=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I saw the name RemoteExecutor
, I'd thought it might require a parameter of the remote IP...
I guess here you might want DistributedExecutor
?
BTW, do we really need to expose the concept executor in our API to our users? I prefer not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe RemoteExecutor
should init with parameter servers' IP addresses, and DistributedExecutor
use etcd to get server IPs.
BTW, do we really need to expose the concept executor in our API to our users? I prefer not.
Thought Executor
is the same concept of V2 API's trainer
class, It's simple to wrap the name to let it compatible with v2 API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my mind the RemoteExecutor
and DistributedExecutor
is a naming difference, we can use either one. We can put the IP (and authentication) as an argument, I initially thought the program will read the IP and authentication config from ~/.paddle/config
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add some additional words. We still hold the worldview that Python code should be same when the user wants to switch between local and multi-nodes, multi-GPU.
The user should not aware the devices/nodes he may use, he only needs to care about the model part. He only needs to modify the configuration file when switching between different environments.
If we agree with that, so the Remoteexecutor
should not be exposed, even the executor
is also should be invisible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@typhoonzero @helinwang @dzhwinter
I have a concern about if we should add a device field in our framework.proto. For details, please refer to #6035. Sometimes, users do not want to aware the hardware environment. But sometimes, users do want to set device for specific operators. How can we handle this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@QiJune Thanks! Please refer to: #6035 (comment)
is the `optimizer` variable, the neural network will be optimized | ||
once. When the target is the `cost` variable, `session.eval` returns | ||
the cost value. | ||
#### RemoteExecutor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the RemoteExecutor in? The master node of a cluster?
And where is the Converter in? Local machine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a short discussion with @putcn today, we may need more discussion to decide the API. Things we agree is that:
- local/remote training should be of the same form.
- we should have a Python API to submit
ProgramDesc
to the cluster
The implementation may have two major methods:
- start a server process (orchestrator/controller) in the cluster somewhere, it accepts
ProgramDesc
and execute it. - submit a Python program as a cluster job. The Python program executes the
ProgramDesc
.
I think there may be three things we need to take into consideration:
User configuration can be null. And I think that we should add device field in operator proto message, and the output ProgramDesc will have device information. The output ProgramDesc is dependent on hardware environment and cannot be transplanted. If we change to another cluster, we have to generate another output ProgramDesc.
|
please | ||
see | ||
[Design Doc: Operation Graph-Based Parameter Server](./parameter_server.md) | ||
This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document - |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't quite get this part. Doesn't the pserver run a different ProgramDesc from Trainer's? what does "same computation definition" mean here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
see | ||
[Design Doc: Operation Graph-Based Parameter Server](./parameter_server.md) | ||
This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document - | ||
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md) | ||
|
||
## Distributed Training Architecture | ||
|
||
The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so: | ||
|
||
<img src="src/distributed_architecture.png"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we explicitly mark the "Distribute Transpiler" in the graph?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding is: the transpiler works in the cluster instead of in local machine. did we discuss this issue? can you remind me of the reason why we put it in the local machine if so?
place = fluid.CPUPlace() | ||
exe = fluid.Executor(place) | ||
|
||
for pass_id in range(10): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we are using "python for" as loop here, not sure how's the progress on WhileOp, and are we trying to encourage user with WhileOp. if so, do we want to use WhileOp instead in this example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a key point. For now, both recv side and send side is not using while op for iteration loop, which causes we have to run the python code instead of a ProgramDesc
to run a full train job.
I think this should be left to later discussions, we intend to make everything into the IR.
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP. | ||
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster. | ||
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also give an example ProgramDesc generated by transpiler from above sample code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see the dist-graph.png in parameter_server.md
.
1. Parse configurations from `RemoteExecutor`. | ||
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming. | ||
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. For | ||
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
" For DataParallelism type for example" => "Take DataParallelism type as an example"
6. Dispatch the partitioned graph to different PaddlePaddle runtimes. | ||
|
||
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python. | ||
<img src="src/remote_executor.png"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad link
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a | ||
distributed training program: | ||
1. Parse configurations from `RemoteExecutor`. | ||
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is the of distributed program determined, is it set by the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From configurations of RemoteExecutor
, like pass parameter when creating RemoteExecutor
object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks!
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming. | ||
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. For | ||
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the | ||
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we know if an operator is an optimization operators?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, optimizer.minimize()
can return the list of optimizer operators objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks! It currently works, but I think we need to aim for ProgramDesc for input (no Python) and ProgramDesc for output (open for discussion).
fetch_list=None, | ||
feed_var_name='feed', | ||
fetch_var_name='fetch', | ||
job_desc=JobDesc( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Python code example above is:
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
The run
is called iteratively, so the job_desc
parameter is passed in multiple times. Is it necessary to couple job_desc
with run
? Another possible approach is to use another set of API to manage remote resource lifecycle / logs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See discussions here, because we use python code to form the iterate loop, so "remote executor" can not just run a ProgramDesc
to form a full training job. A possible method is to use while_op, but this will make user API a bit complex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the discussion, Python loop feels native but not as efficient on remote training, I think we should provide both options to the user, and let the user to choose. So we probably still need to support Python loop well.
Does a training job can only be one ProgramDesc run (e.g, one run with while op)? I think we don't need to couple training job with ProgramDesc run. The training job can just be some resource, waiting to run however many ProgramDescs, it preserves the state between multiples ProgramDesc runs. In this way we can support Python loop in remote training.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with support both methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, how remote executor should look is something that we need more discussion. But I think we can merge this PR and iterate.
|
||
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python. | ||
<img src="src/remote_executor.png"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in the graph, remoteExecutor.Listen should not send training job to k8s API server. Otherwise remoteExecutor is dependent on k8s API server. I think we should aim for remoteExecutor does not depend on k8s, so it can be used by both k8s cluster or bare metal cluster. We can have a adapter that knows k8s and update remoteExecutor accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. I can update it after later discussion on RemoteExecutor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
thanks @helinwang, I got my answer from last week's meeting, this PR LGTM |
Fix #5666