Insert send op while backward op finished #9382
Conversation
paddle/fluid/operators/recv_op.cc
Outdated
mutable detail::RPCClient client_;
for (size_t i = 0; i < outs.size(); i++) {
  VLOG(2) << "getting " << outs[i] << " from " << epmap[i];
  rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
Why are there two rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]); calls in this function (another one at line 56)?
Sorry, that was a mistake; it has been fixed.
4. append send_op to send splited variables to server and fetch
   params(splited blocks or origin param) from server.
5. append concat_op to merge splited blocks to update local weights.
3. modify trainer program add split_op and send_op to each grad variable.
Is our current implementation one send_op for each grad variable, or one send_op for all grad variables?
From my understanding, these lines seem to mean one for each, but the implementation seems to be one for all. Maybe the description should match the implementation?
Done.
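For context, a minimal sketch of the "one send_op for all grad variables" shape, assuming the fluid-style Block.append_op API used elsewhere in this PR; the helper name and argument names below are illustrative, not the transpiler's actual code:

def append_single_send_op(program, splited_grad_vars, fetched_param_vars,
                          eplist, pserver_endpoints):
    # Hypothetical helper: a single send_op carries every splited gradient,
    # instead of one send_op per grad variable.
    program.global_block().append_op(
        type="send",
        inputs={"X": splited_grad_vars},      # all splited grads at once
        outputs={"Out": fetched_param_vars},  # params fetched back from pservers
        attrs={"epmap": eplist,               # endpoint for each splited block
               "endpoints": pserver_endpoints})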
def _find_op_by_out(self, program, var):
    for idx, op in enumerate(program.global_block().ops):
        if var.name in op.output_arg_names:
            return idx + 1
Maybe return idx here and have the caller do insert_idx = self._find_op_by_out(program, splited_vars[0]) + 1, since the name of this function indicates the returned index is the index of the op, not the index at which the new op should be inserted.
Done.
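For illustration, the reviewer's suggestion could look like the following sketch (it mirrors the snippet above; splited_vars comes from the surrounding transpiler code and is taken from the review comment):

def _find_op_by_out(self, program, var):
    # Return the index of the op that produces var, or -1 if none does.
    for idx, op in enumerate(program.global_block().ops):
        if var.name in op.output_arg_names:
            return idx
    return -1

# The caller then decides where to insert: right after the producing op.
insert_idx = self._find_op_by_out(program, splited_vars[0]) + 1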
            return idx + 1
    return -1


def _insert_send_vars_op(self, program, index, send_vars, epmap, eps):
This function seems unused; are we planning to use it?
Deleted the unused function, done.
"epmap": epmap, | ||
"endpoints": eps}) | ||
|
||
def _append_trainer_op(self, program, gradblocks, spliter): |
I think "trainer op" is too general, the trainer could do forward, calculate grad, send grad, recv param. But this function seems to only do split and send grad? Maybe change to another more specific name?
Thanks, I changed the name to dispatch_trainer_grads and added some comments here.
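As a rough sketch of the split-then-send flow this function covers (assuming the fluid-style block API; the exact split op type and variable bookkeeping in the PR may differ, and append_single_send_op refers to the hypothetical helper sketched earlier):

def dispatch_trainer_grads_sketch(program, grad_to_blocks, dispatcher, endpoints):
    # Hypothetical sketch, not the PR's implementation. grad_to_blocks maps
    # each grad variable to its pre-created splited block variables;
    # dispatcher is the PSDispatcher discussed below.
    block = program.global_block()
    all_splited = []
    for grad_var, splited_vars in grad_to_blocks.items():
        # One split op per grad variable, producing its splited blocks.
        block.append_op(
            type="split",
            inputs={"X": grad_var},
            outputs={"Out": splited_vars},
            attrs={"sections": [v.shape[0] for v in splited_vars]})
        all_splited.extend(splited_vars)
    # A single send_op then carries all splited grads (see the earlier sketch).
    eplist = dispatcher.dispatch(all_splited)
    append_single_send_op(program, all_splited, [], eplist, endpoints)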
@@ -13,38 +13,63 @@
# limitations under the License.


def hash_name(varlist, pserver_endpoints):
class DistributedSpliter(object):
"Distributed" seems very general, consider rename to PSStrategy (TF uses this name in https://www.tensorflow.org/api_docs/python/tf/train/replica_device_setter) or PSSpliter, PSBalancer, PSDispatcher? Just some ideas, feel free to come up with your own naming :D
Thanks! A specific name looks better; I changed the name to PSDispatcher.
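For readers following the rename, a minimal sketch of what a PSDispatcher-style abstraction could look like (the hash variant mirrors the existing hash_name helper; the PR's actual class layout may differ):

import hashlib


class PSDispatcher(object):
    # Sketch: map splited variables to pserver endpoints; subclasses
    # decide the placement policy.
    def __init__(self, pserver_endpoints):
        self._eps = pserver_endpoints

    def dispatch(self, varlist):
        raise NotImplementedError("use a concrete dispatcher")


class RoundRobinDispatcher(PSDispatcher):
    # Assign variables to endpoints in round-robin order.
    def dispatch(self, varlist):
        return [self._eps[i % len(self._eps)] for i, _ in enumerate(varlist)]


class HashNameDispatcher(PSDispatcher):
    # Assign each variable by hashing its name, like the old hash_name().
    def dispatch(self, varlist):
        eplist = []
        for var in varlist:
            digest = hashlib.md5(var.name.encode("utf-8")).hexdigest()
            eplist.append(self._eps[int(digest, 16) % len(self._eps)])
        return eplist

Usage would then match the call site shown just below, e.g. eplist = RoundRobinDispatcher(pserver_endpoints).dispatch(splited_vars).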
eplist = dispatcher.dispatch(splited_vars)
rpc_client_var = layers.io.get_rpc_client_var(program)

program.global_block().insert_op(
This probably cannot overlap the GPU-to-host memory copy with GPU computation:
operations in the same CUDA stream execute in issue order. @chengduoZH
We may need a separate CUDA stream to do the copy.
Thanks @gongweibao. The point here is that the RPC I/O and the computation of the backward ops can run in parallel.
Also, @typhoonzero has an earlier PR #9425 that implements pinned memory first, which should allow the copies to run in parallel; I will discuss with @typhoonzero this afternoon to confirm.
This PR has too many conflicts with the latest code, so I will close this PR and reopen a new one to implement #9161
Related #9161
Benchmark with fc size 4096:
4 trainers + 4 pservers:
local:
The speedup ratio seems to be only about 31%.