[collective] dynamic shape for send_v2 and recv_v2 (#42765)
FeixLiu authored May 18, 2022
1 parent 133d63f commit 1f64c42
Showing 8 changed files with 306 additions and 24 deletions.
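The core of this change is a small shape handshake that runs before the payload transfer whenever `dynamic_shape` is enabled: the sender first transmits the number of dimensions as a one-element int32 tensor, then the shape itself, and only then the data; the receiver uses the received shape to resize its output buffer before posting the final recv. This is what the `send_shape_info` and `recv_shape_info` helpers added below implement over NCCL (or a `ProcessGroup` for the heterogeneous "switch" path). The following sketch only illustrates the protocol; the `channel` object and its `send`/`recv` methods are placeholders standing in for ncclSend/ncclRecv or ProcessGroup::Send/Recv, not Paddle APIs.

```python
# Minimal sketch of the dynamic-shape handshake implemented by this commit.
# `channel.send` / `channel.recv` are illustrative placeholders.
import numpy as np

def send_with_dynamic_shape(channel, peer, data: np.ndarray):
    shape = np.array(data.shape, dtype=np.int32)
    # step 1: send the number of dimensions as a single int32
    channel.send(np.array([shape.size], dtype=np.int32), peer)
    # step 2: send the shape itself
    channel.send(shape, peer)
    # step 3: send the payload
    channel.send(data, peer)

def recv_with_dynamic_shape(channel, peer, dtype=np.float32) -> np.ndarray:
    # step 1: receive the number of dimensions
    ndim = int(channel.recv(shape=(1,), dtype=np.int32, peer=peer)[0])
    # step 2: receive the shape and size the output buffer accordingly
    shape = tuple(channel.recv(shape=(ndim,), dtype=np.int32, peer=peer))
    # step 3: receive the payload into the freshly sized buffer
    return channel.recv(shape=shape, dtype=dtype, peer=peer)
```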
14 changes: 1 addition & 13 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.cc
@@ -255,12 +255,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

PADDLE_ENFORCE_EQ(
in_tensors.size(), 1,
platform::errors::PreconditionNotMet(
@@ -299,12 +293,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

PADDLE_ENFORCE_EQ(
out_tensors.size(), 1,
platform::errors::PreconditionNotMet(
@@ -343,7 +331,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
end = std::chrono::high_resolution_clock::now();
diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for recv " << std::setw(9)
<< ") from cpu to gpu for recv " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
return CreateTask(rank_, CommType::RECV, out_tensors);
}
26 changes: 18 additions & 8 deletions paddle/fluid/operators/collective/recv_v2_op.cc
@@ -44,15 +44,21 @@ class RecvOpV2 : public framework::OperatorWithKernel {
"The size of the output shape must be greater than 0 "
"but the value given is %d.",
out_shape.size()));
for (size_t i = 0; i < out_shape.size(); ++i) {
PADDLE_ENFORCE_GE(out_shape[i], 1,
platform::errors::InvalidArgument(
"The shape attribute for recv_v2 must be set "
"explicitly, but the %dth element is %d which "
"is less than 1.",
i, out_shape[i]));
bool dynamic_shape = ctx->Attrs().Get<bool>("dynamic_shape");
if (!dynamic_shape) {
// No need to check out shape if with dynamic_shape,
// since the shape will be recv from send_v2
for (size_t i = 0; i < out_shape.size(); ++i) {
PADDLE_ENFORCE_GE(out_shape[i], 1,
platform::errors::InvalidArgument(
"The shape attribute for recv_v2 must be set "
"explicitly, but the %dth element is %d which "
"is less than 1. Or dynamic_shape should be "
"set to True for both send_v2 and recv_v2.",
i, out_shape[i]));
}
ctx->SetOutputDim("Out", phi::make_ddim(out_shape));
}
ctx->SetOutputDim("Out", phi::make_ddim(out_shape));
}
}

@@ -87,6 +93,10 @@ class RecvOpV2Maker : public framework::OpProtoAndCheckerMaker {
"use_calc_stream",
"(bool default false) eject CUDA operations to calculation stream.")
.SetDefault(false);
AddAttr<bool>(
"dynamic_shape",
"(bool default false) the send/recv will be done with dynamic shape.")
.SetDefault(false);
AddComment(R"DOC(
Recv Operator
108 changes: 106 additions & 2 deletions paddle/fluid/operators/collective/recv_v2_op.cu.cc
@@ -25,13 +25,93 @@ limitations under the License. */
namespace paddle {
namespace operators {

#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
framework::DDim recv_shape_info(const platform::Place &place,
const gpuStream_t &stream,
platform::NCCLComm *comm, const int &peer,
distributed::ProcessGroup *group) {
if (!group) {
PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
platform::errors::InvalidArgument(
"NCCLComm and Stream should be provided if use NCCL "
"to send the shape info."));
}

paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT32;
ncclDataType_t nccl_dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));

// step1: recv the shape size
framework::Tensor gpu_shape_size_tensor(shape_dytpe);
if (!group) {
gpu_shape_size_tensor.Resize({1});
gpu_shape_size_tensor.mutable_data(place, shape_dytpe);
auto *gpu_data = gpu_shape_size_tensor.data<int>();
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
gpu_data, 1, nccl_dtype, peer, comm->comm(), stream));
}

// copy the shape size tensor to cpu
framework::Tensor *cpu_shape_size_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_size_tensor->Resize({1});
cpu_shape_size_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
if (group) {
std::vector<framework::Tensor> shape_size_tensor;
shape_size_tensor.emplace_back(*cpu_shape_size_tensor);
auto shape_size_task = group->Recv(shape_size_tensor, peer);
} else {
framework::TensorCopySync(gpu_shape_size_tensor, platform::CPUPlace(),
cpu_shape_size_tensor);
}
auto *cpu_data = cpu_shape_size_tensor->data<int>();
int shape_size = cpu_data[0];
VLOG(3) << "recv the shape size: " << shape_size << " from peer";

// step2: recv the shape
framework::Tensor gpu_shape_tensor(shape_dytpe);
if (!group) {
gpu_shape_tensor.Resize({shape_size});
gpu_shape_tensor.mutable_data(place, shape_dytpe);
auto *gpu_shape_data = gpu_shape_tensor.data<int>();
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
gpu_shape_data, shape_size, nccl_dtype, peer, comm->comm(), stream));
}

// copy the shape tensor to cpu
framework::Tensor *cpu_shape_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_tensor->Resize({shape_size});
cpu_shape_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
if (group) {
std::vector<framework::Tensor> shape_tensor;
shape_tensor.emplace_back(*cpu_shape_tensor);
auto shape_task = group->Recv(shape_tensor, peer);
} else {
framework::TensorCopySync(gpu_shape_tensor, platform::CPUPlace(),
cpu_shape_tensor);
}
auto *cpu_shape_data = cpu_shape_tensor->data<int>();
std::vector<int> all_shape;
for (int i = 0; i < shape_size; ++i) {
all_shape.emplace_back(cpu_shape_data[i]);
}
framework::DDim new_dim;
new_dim = new_dim.reshape(all_shape);
VLOG(3) << "recv the shape: (" << new_dim << ") from peer";

return new_dim;
}
#endif

template <typename T>
class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
int rid = ctx.Attr<int>("ring_id");
bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
PADDLE_ENFORCE_GE(
rid, 0,
platform::errors::InvalidArgument(
@@ -53,7 +133,18 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
auto out_shape = ctx.Attr<std::vector<int>>("out_shape");
auto out = ctx.Output<framework::LoDTensor>("Out");
auto out_dims = out->dims();
out->mutable_data<T>(out_dims, place);

if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2 for switch";
framework::DDim new_dim =
recv_shape_info(ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer, pg);
out->Resize(new_dim);
out->mutable_data<T>(new_dim, place);
} else {
out->mutable_data<T>(out_dims, place);
}

out_tensor.emplace_back(*out);
auto task = pg->Recv(out_tensor, peer);
@@ -79,6 +170,10 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {

auto *out_var = ctx.OutputVar("Out");
if (out_var->IsType<framework::LoDTensorArray>()) {
PADDLE_ENFORCE_EQ(
dynamic_shape, false,
platform::errors::InvalidArgument("Dynamic shape for send/recv not "
"support LoDTensorArray for now."));
auto out_array = out_var->GetMutable<framework::LoDTensorArray>();
for (size_t idx = 0; idx < out_array->size(); ++idx) {
VLOG(3) << "LodTensorArray: idx(" << idx << ")";
@@ -99,7 +194,16 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
auto out_dims = out->dims();
auto numel = out->numel();

out->mutable_data<T>(out_dims, place);
if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2";
framework::DDim new_dim = recv_shape_info(place, stream, comm, peer,
/* ProcessGroup* */ nullptr);
out->Resize(new_dim);
numel = out->numel();
out->mutable_data<T>(new_dim, place);
} else {
out->mutable_data<T>(out_dims, place);
}
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
out->data<T>(), numel, dtype, peer, comm->comm(), stream));
VLOG(3) << "rank " << comm->rank() << " recv " << phi::product(out->dims())
4 changes: 4 additions & 0 deletions paddle/fluid/operators/collective/send_v2_op.cc
@@ -70,6 +70,10 @@ class SendOpV2Maker : public framework::OpProtoAndCheckerMaker {
"use_calc_stream",
"(bool default false) eject CUDA operations to calculation stream.")
.SetDefault(false);
AddAttr<bool>(
"dynamic_shape",
"(bool default false) the send/recv will be done with dynamic shape.")
.SetDefault(false);
AddComment(R"DOC(
Send Operator
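Both operator makers now expose the same boolean attribute, defaulting to false so existing programs keep their current behavior. Below is a minimal sketch of how the attribute could be attached when composing the ops directly in a static-graph program; it assumes the low-level `Block.append_op` interface that `python/paddle/distributed/collective.py` already uses for these ops, and the tensor names, ring id, and peer ranks are illustrative only, not part of this commit.

```python
# Hypothetical usage sketch: wiring dynamic_shape into send_v2/recv_v2 by hand.
# In practice the send and recv sides run on different ranks; they are shown
# in one block only to keep the example compact.
import paddle

paddle.enable_static()
block = paddle.static.default_main_program().global_block()

x = paddle.static.data(name='x', shape=[-1, 32], dtype='float32')
out = block.create_var(name='received', dtype='float32')

# Sender side: the shape handshake runs first, so the receiver does not need
# to know the exact shape ahead of time.
block.append_op(
    type='send_v2',
    inputs={'X': [x]},
    attrs={'ring_id': 0, 'peer': 1, 'use_calc_stream': True,
           'dynamic_shape': True})

# Receiver side: out_shape only needs a placeholder when dynamic_shape=True,
# because InferShape skips the check and the real shape arrives from send_v2.
block.append_op(
    type='recv_v2',
    outputs={'Out': [out]},
    attrs={'ring_id': 0, 'peer': 0, 'out_shape': [1], 'dtype': x.dtype,
           'use_calc_stream': True, 'dynamic_shape': True})
```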
92 changes: 91 additions & 1 deletion paddle/fluid/operators/collective/send_v2_op.cu.cc
@@ -24,13 +24,84 @@ limitations under the License. */
namespace paddle {
namespace operators {

#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
void send_shape_info(const framework::Tensor& x, const platform::Place& place,
const gpuStream_t& stream, platform::NCCLComm* comm,
const int& peer, distributed::ProcessGroup* group) {
if (!group) {
PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
platform::errors::InvalidArgument(
"NCCLComm and Stream should be provided if use NCCL "
"to send the shape info."));
}
paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT32;
ncclDataType_t nccl_dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));
auto dims = x.dims();
int shape_size = dims.size();

// step1: send the shape size
framework::Tensor cpu_shape_size_tensor(shape_dytpe);
cpu_shape_size_tensor.Resize({1});
cpu_shape_size_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
auto* cpu_data = cpu_shape_size_tensor.data<int>();
cpu_data[0] = shape_size;

if (group) {
std::vector<framework::Tensor> shape_size_tensor;
shape_size_tensor.template emplace_back(cpu_shape_size_tensor);
auto shape_size_task = group->Send(shape_size_tensor, peer);
} else {
// copy the shape size tensor to gpu and send
framework::Tensor* gpu_shape_size_tensor =
new framework::Tensor(shape_dytpe);
gpu_shape_size_tensor->Resize({1});
gpu_shape_size_tensor->mutable_data(place, shape_dytpe);
framework::TensorCopySync(cpu_shape_size_tensor, place,
gpu_shape_size_tensor);
PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclSend(gpu_shape_size_tensor->data<int>(), 1,
nccl_dtype, peer, comm->comm(), stream));
}
VLOG(3) << "send the shape size: " << shape_size << " to peer";

// step2: send the shape
framework::Tensor cpu_shape_tensor(shape_dytpe);
cpu_shape_tensor.Resize({shape_size});
cpu_shape_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
auto* cpu_shape_data = cpu_shape_tensor.data<int>();
for (int i = 0; i < shape_size; ++i) {
cpu_shape_data[i] = dims[i];
}

if (group) {
std::vector<framework::Tensor> shape_tensor;
shape_tensor.template emplace_back(cpu_shape_tensor);
auto shape_task = group->Send(shape_tensor, peer);
} else {
// copy the shape tensor to gpu and send
framework::Tensor* gpu_shape_tensor = new framework::Tensor(shape_dytpe);
gpu_shape_tensor->Resize({shape_size});
gpu_shape_tensor->mutable_data(place, shape_dytpe);
framework::TensorCopySync(cpu_shape_tensor, place, gpu_shape_tensor);
PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclSend(gpu_shape_tensor->data<int>(), shape_size,
nccl_dtype, peer, comm->comm(), stream));
}
VLOG(3) << "send the shape: (" << dims << ") to peer";
}
#endif

template <typename T>
class SendOpV2CUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
int rid = ctx.Attr<int>("ring_id");
bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
PADDLE_ENFORCE_GE(
rid, 0,
platform::errors::InvalidArgument(
@@ -45,8 +116,17 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(rid);
std::vector<phi::DenseTensor> in_tensor;
auto x = ctx.Input<framework::LoDTensor>("X");

if (dynamic_shape) {
// dynamic shape for switch send/recv
VLOG(3) << "send_v2 will use dynamic shape with recv_v2 for switch";
send_shape_info(*x, ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer, pg);
}

std::vector<phi::DenseTensor> in_tensor;
in_tensor.push_back(*x);
auto task = pg->Send(in_tensor, peer);
return;
@@ -68,6 +148,10 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {

auto* x_var = ctx.InputVar("X");
if (x_var->IsType<framework::LoDTensorArray>()) {
PADDLE_ENFORCE_EQ(
dynamic_shape, false,
platform::errors::InvalidArgument("Dynamic shape for send/recv not "
"support LoDTensorArray for now."));
auto& x_array = x_var->Get<framework::LoDTensorArray>();
for (size_t idx = 0; idx < x_array.size(); idx++) {
VLOG(3) << "LodTensorArray: idx(" << idx << ")";
@@ -85,6 +169,12 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
auto x = ctx.Input<framework::LoDTensor>("X");
int numel = x->numel();

if (dynamic_shape) {
VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
send_shape_info(*x, place, stream, comm, peer,
/* ProcessGroup* */ nullptr);
}

ncclDataType_t dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
2 changes: 2 additions & 0 deletions python/paddle/distributed/collective.py
@@ -337,6 +337,7 @@ def barrier(group=None):


def _set_custom_gid(gid):
global _custom_gid
_custom_gid = gid


@@ -363,6 +364,7 @@ def new_group(ranks=None, backend=None):
paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False)
"""
global _custom_gid
global _group_map
if in_dygraph_mode():
global _default_group_name
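The collective.py change fixes a Python scoping bug rather than anything NCCL-specific: without a `global` declaration, assigning to `_custom_gid` inside `_set_custom_gid` only binds a function-local name, so the module-level value that `new_group` later reads is never updated. A self-contained illustration of the difference (the variable and function names here are generic, not Paddle's):

```python
_gid = None

def set_gid_broken(gid):
    _gid = gid        # creates a local; the module-level _gid stays None

def set_gid_fixed(gid):
    global _gid       # rebinding now targets the module-level name
    _gid = gid

set_gid_broken(3)
print(_gid)           # None
set_gid_fixed(3)
print(_gid)           # 3
```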