
Insert send op while backward op finished #9382

Closed
34 changes: 28 additions & 6 deletions paddle/fluid/operators/recv_op.cc
@@ -21,6 +21,7 @@ limitations under the License. */

#include <future>
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace operators {
@@ -36,26 +37,43 @@ class RecvOp : public framework::OperatorBase {
const platform::Place& place) const override {
auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
std::vector<std::string> endpoints =
Attr<std::vector<std::string>>("endpoints");
auto client_var_name = Output("RPCClient");
PADDLE_ENFORCE_NOT_NULL(scope.FindVar(client_var_name),
"Can not find variable '%s' in the scope.",
client_var_name);

platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);

platform::RecordEvent record_event(Type(), &ctx);
auto* client_var = scope.FindVar(client_var_name);
detail::RPCClient* rpc_client = client_var->GetMutable<detail::RPCClient>();

for (size_t i = 0; i < outs.size(); i++) {
VLOG(3) << "getting " << outs[i];
client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
PADDLE_ENFORCE(client_.Wait());
}
PADDLE_ENFORCE(rpc_client->Wait());

private:
mutable detail::RPCClient client_;
// tell pservers that the current trainer has finished fetching
for (auto& ep : endpoints) {
VLOG(2) << "send fetch barrier, ep: " << ep;
rpc_client->AsyncSendFetchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait());
}
};

class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
public:
RecvOpMaker(OpProto* proto, OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddOutput("Out", "(Tensor) Variables to get from server.").AsDuplicable();
AddOutput("RPCClient",
"(RPCClient) The RPC client object which will be"
"initialized at most once.");
AddComment(R"DOC(
Recv operator

@@ -65,7 +83,11 @@ This operator can get variables from server side.
"(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input "
"variables for mapping")
.SetDefault({});
.SetDefault({"127.0.0.1:6164"});
AddAttr<std::vector<std::string>>("endpoints",
"(string vector, default 127.0.0.1:6164)"
"Server endpoints list")
.SetDefault({"127.0.0.1:6164"});
}
};

5 changes: 5 additions & 0 deletions paddle/fluid/operators/send_barrier_op.cc
@@ -21,6 +21,7 @@ limitations under the License. */

#include <future>
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace operators {
@@ -41,6 +42,10 @@ class SendBarrierOp : public framework::OperatorBase {
PADDLE_ENFORCE_NOT_NULL(scope.FindVar(client_var_name),
"Can not find variable '%s' in the scope.",
client_var_name);

platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
platform::RecordEvent record_event(Type(), &ctx);
auto* client_var = scope.FindVar(client_var_name);
detail::RPCClient* rpc_client = client_var->GetMutable<detail::RPCClient>();

7 changes: 5 additions & 2 deletions paddle/fluid/operators/send_vars_op.cc
@@ -21,6 +21,7 @@ limitations under the License. */

#include <future>
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace operators {
@@ -53,7 +54,7 @@ class SendVarsOp : public framework::OperatorBase {
auto ins = Inputs("X");

std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
int sync_send = Attr<int>("sync_sent");
int sync_send = Attr<int>("sync_send");

platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
@@ -62,6 +63,8 @@
PADDLE_ENFORCE_NOT_NULL(scope.FindVar(client_var_name),
"Can not find variable '%s' in the scope.",
client_var_name);
platform::RecordEvent record_event(Type(), &ctx);

auto* client_var = scope.FindVar(client_var_name);
detail::RPCClient* rpc_client = client_var->GetMutable<detail::RPCClient>();

@@ -95,7 +98,7 @@ Send operator

This operator will send variables to listen_and_serve op at the parameter server.
)DOC");
AddAttr<int>("ync_send",
AddAttr<int>("sync_send",
"(int, default 0)"
"sync send or async send.")
.SetDefault(0);
149 changes: 84 additions & 65 deletions python/paddle/fluid/distribute_transpiler.py
@@ -17,10 +17,11 @@
from framework import Program, default_main_program, default_startup_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *
import ps_dispatcher
import math
from . import core
import debuger
import layers


class VarBlock:
@@ -138,7 +139,7 @@ def transpile(self,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
dispatch_method="RoundRobin"):
"""
Transpile the program to distributed data-parallelism programs.
The main_program will be transformed to use a remote parameter server
@@ -151,10 +152,11 @@
Steps to transpile trainer:
1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
2. rename splited grad variables to add trainer_id suffix ".trainer_%d".
3. modify trainer program add split_op to each grad variable.
4. append send_op to send splited variables to server and fetch
params(splited blocks or origin param) from server.
5. append concat_op to merge splited blocks to update local weights.
3. dispatch gradient variable blocks to multiple servers and insert
split_op and send_vars_op into the trainer program.
4. append send_barrier_op to send a barrier message to the servers.
5. append recv_op to fetch params (splited blocks or origin params) from the servers.
6. append concat_op to merge splited blocks to update local weights.

Steps to transpile pserver:
1. create new program for parameter server.
@@ -176,11 +178,10 @@
:type pservers: string
:param trainers: total number of workers/trainers in the job
:type trainers: int
:param split_method: A function to determin how to split variables
to different servers equally.
:param dispatch_method: The name of the dispatcher class in ps_dispatcher used
to determine how to distribute variables across the servers evenly.
:type dispatch_method: str
"""
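A minimal trainer-side usage sketch of the steps described in this docstring follows. It is an illustration only: the import path and the leading transpile arguments (optimize_ops, params_grads, trainer_id) are assumed from how the method body uses them and from the Fluid API of this period, not from this hunk.

import paddle.fluid as fluid  # import path assumed for this Fluid version

x = fluid.layers.data(name="x", shape=[13], dtype="float32")
y = fluid.layers.data(name="y", shape=[1], dtype="float32")
cost = fluid.layers.mean(
    fluid.layers.square_error_cost(input=fluid.layers.fc(input=x, size=1), label=y))
optimize_ops, params_grads = fluid.optimizer.SGD(learning_rate=0.01).minimize(cost)

t = fluid.DistributeTranspiler()
t.transpile(
    optimize_ops, params_grads,                # assumed leading arguments
    trainer_id=0,                              # index of this trainer
    pservers="127.0.0.1:6174,127.0.0.1:6175",  # comma-separated pserver endpoints
    trainers=2,
    dispatch_method="RoundRobin")              # class name looked up in ps_dispatcher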
assert (callable(split_method))
if program is None:
program = default_main_program()
self.program = program
@@ -192,28 +193,30 @@
self.trainer_id = trainer_id
pserver_endpoints = pservers.split(",")

# instantiate the dispatcher class from the given name
assert (dispatch_method in dir(ps_dispatcher))
dispatcher = getattr(ps_dispatcher, dispatch_method)(pserver_endpoints)
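The dispatcher classes themselves live in ps_dispatcher and are not part of this diff. Purely as an illustration of the interface assumed here (a constructor taking the endpoint list and a dispatch(var_list) method that returns one endpoint per variable, as used further down), a round-robin dispatcher might look like this sketch.

class RoundRobin(object):
    # Illustrative sketch only; the real class is defined in ps_dispatcher.
    def __init__(self, pserver_endpoints):
        self.pserver_endpoints = pserver_endpoints
        self.cur = 0

    def dispatch(self, varlist):
        # Hand out endpoints in turn, one endpoint per variable block.
        eplist = []
        for _ in varlist:
            eplist.append(self.pserver_endpoints[self.cur])
            self.cur = (self.cur + 1) % len(self.pserver_endpoints)
        return eplist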

# step1
param_list = [pg[0] for pg in params_grads]
grad_list = [pg[1] for pg in params_grads]
grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
param_blocks = split_dense_variable(param_list, len(pserver_endpoints))

# step2
grad_var_mapping = self._append_split_op(program, grad_blocks)
# Dispatch each gradient variable block to a parameter server via the dispatcher,
# and also insert split_op and send_vars_op after each grad op
# in the trainer program.
send_inputs, eplist = self._dispatch_trainer_grads(program, grad_blocks,
dispatcher)
# step3
send_inputs = []
send_outputs = []
for b in grad_blocks: # append by order
varname, block_id, _ = b.split(":")
send_inputs.append(grad_var_mapping[varname][int(block_id)])

param_var_mapping = self._create_vars_from_blocklist(program,
param_blocks)
for b in param_blocks:
varname, block_id, _ = b.split(":")
send_outputs.append(param_var_mapping[varname][int(block_id)])
# let send_op know which endpoint to send which var to, eplist has the same
# order as send_inputs.
eplist = split_method(send_inputs, pserver_endpoints)

# create mapping of endpoint -> split var to create pserver side program
self.param_grad_ep_mapping = dict()
for i, ep in enumerate(eplist):
@@ -224,20 +227,14 @@
self.param_grad_ep_mapping[ep]["params"].append(param)
self.param_grad_ep_mapping[ep]["grads"].append(grad)

rpc_client_var = program.global_block().create_var(
name="RPC_CLIENT_VAR",
persistable=True,
type=core.VarDesc.VarType.RAW)

# create send_op
program.global_block().append_op(
type="send",
inputs={"X": send_inputs},
outputs={"Out": send_outputs,
"RPCClient": rpc_client_var},
attrs={"endpoints": pserver_endpoints,
"epmap": eplist})
# step4
# step 4
# append send_barrier_op
layers.io.send_barrier(pserver_endpoints)
# step 5
# append recv op to receive parameters
layers.io.Recv(send_outputs, eplist, pserver_endpoints)

# step 6
for varname, splited_var in param_var_mapping.iteritems():
if len(splited_var) <= 1:
continue
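Steps 1 and 6 of the trainer transpilation are two ends of the same transformation: a large parameter or gradient is cut into row blocks before sending, and the fetched blocks are concatenated back to the original shape. A toy NumPy sketch of that round trip (illustration only, not Paddle code):

import numpy as np

param = np.arange(12.0).reshape(6, 2)      # a 6x2 parameter; width = product(dim[1:]) = 2
blocks = np.array_split(param, 3, axis=0)  # step 1: split into 3 row blocks, one per pserver
# ... each block is sent to, updated on, and fetched back from its pserver ...
restored = np.concatenate(blocks, axis=0)  # step 6: concat the fetched blocks back together
assert restored.shape == param.shape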
@@ -352,14 +349,6 @@ def __append_optimize_op__(op, block):
for glb_op in global_ops:
__append_optimize_op__(glb_op, per_opt_block)

# NOT USED: single block version:
#
# for _, op in enumerate(self.optimize_ops):
# for _, opt_op in enumerate(opt_op_on_pserver):
# if ufind.is_connected(op, opt_op):
# __append_optimize_op__(glb_op, optimize_block)
# break

# step5 append the listen_and_serv op
pserver_program.global_block().append_op(
type="listen_and_serv",
@@ -500,38 +489,68 @@ def _clone_var(self, block, var):
lod_level=var.lod_level,
persistable=True)

def _append_split_op(self, program, gradblocks):
def _find_op_by_out(self, program, var):
for idx, op in enumerate(program.global_block().ops):
if var.name in op.output_arg_names:
return idx
return -1

def _dispatch_trainer_grads(self, program, gradblocks, dispatcher):
send_inputs = []
ret_eplist = []
# Split variables that need to be split and append respective ops
var_mapping = self._create_vars_from_blocklist(
program, gradblocks, add_trainer_suffix=True)
for varname, splited_vars in var_mapping.iteritems():
# variables that don't need to be split have an empty splited_vars
if len(splited_vars) <= 1:
eplist = []
insert_idx = -1

if len(splited_vars) == 0:
continue
orig_var = program.global_block().vars[varname]
if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS:
height_sections = []
for v in splited_vars:
height_sections.append(v.shape[0])
program.global_block().append_op(
type="split_selected_rows",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"height_sections": height_sections})
elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR:
sections = []
for v in splited_vars:
sections.append(v.shape[0])
program.global_block().append_op(
type="split",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"sections": sections} # assume split evenly
)
elif len(splited_vars) == 1:
insert_idx = self._find_op_by_out(program, splited_vars[0]) + 1
else:
AssertionError("Variable type should be in set "
"[LOD_TENSOR, SELECTED_ROWS]")
return var_mapping
orig_var = program.global_block().vars[varname]
insert_idx = self._find_op_by_out(program, orig_var) + 1
if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS:
height_sections = []
for v in splited_vars:
height_sections.append(v.shape[0])
program.global_block().insert_op(
index=insert_idx + 1,
type="split_selected_rows",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"height_sections": height_sections})
elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR:
sections = []
for v in splited_vars:
sections.append(v.shape[0])
program.global_block().insert_op(
index=insert_idx + 1,
type="split",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"sections": sections} # assume split evenly
)
else:
raise AssertionError("Variable type should be in set "
"[LOD_TENSOR, SELECTED_ROWS]")
eplist = dispatcher.dispatch(splited_vars)
rpc_client_var = layers.io.get_rpc_client_var(program)

program.global_block().insert_op(
Contributor:

This probably cannot overlap the GPU-to-host memory copy with GPU computation; operations within the same CUDA stream execute in issue order. @chengduoZH We may need a separate CUDA stream for the copy.

Contributor Author:

Thanks @gongweibao. The intent here is that the RPC I/O and the computation of the backward ops can run in parallel. Also, @typhoonzero has an earlier PR #9425 that implements pinned memory first, which should make parallel copies possible; I will discuss and confirm this with @typhoonzero this afternoon.

index=insert_idx + 2,
type="send_vars",
inputs={"X": splited_vars},
outputs={"RPCClient": rpc_client_var},
attrs={"sync_send": 0,
"epmap": eplist})
ret_eplist.extend(eplist)
send_inputs.extend(splited_vars)

return send_inputs, ret_eplist
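The effect of _find_op_by_out together with the insert_op calls above is that each gradient's split and send_vars ops land immediately after the backward op that produces that gradient, which is what lets the RPC I/O overlap with the rest of the backward pass (the point discussed in the review comments above). A toy sketch of the index arithmetic, with plain tuples standing in for the real op descriptors (illustration only):

# Each "op" is (op_type, output_names); the real code walks program.global_block().ops.
ops = [("mul_grad", ["fc_0.w@GRAD"]), ("relu_grad", ["fc_1.tmp@GRAD"])]

def find_op_by_out(op_list, var_name):
    for idx, (_, outs) in enumerate(op_list):
        if var_name in outs:
            return idx
    return -1

grad = "fc_0.w@GRAD"
insert_idx = find_op_by_out(ops, grad) + 1
# Place split right after the producer of the gradient, then send_vars right after split,
# mirroring the insert_idx arithmetic in _dispatch_trainer_grads above.
ops.insert(insert_idx, ("split", [grad + ".block0", grad + ".block1"]))
ops.insert(insert_idx + 1, ("send_vars", []))
print([op_type for op_type, _ in ops])
# -> ['mul_grad', 'split', 'send_vars', 'relu_grad']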

def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
param_shape):