[collective] dynamic shape for send_v2 and recv_v2 #42765

Merged · 10 commits · May 18, 2022
14 changes: 1 addition & 13 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.cc
@@ -255,12 +255,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

PADDLE_ENFORCE_EQ(
in_tensors.size(), 1,
platform::errors::PreconditionNotMet(
@@ -299,12 +293,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

PADDLE_ENFORCE_EQ(
out_tensors.size(), 1,
platform::errors::PreconditionNotMet(
@@ -343,7 +331,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
end = std::chrono::high_resolution_clock::now();
diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for recv " << std::setw(9)
<< ") from cpu to gpu for recv " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
return CreateTask(rank_, CommType::RECV, out_tensors);
}
26 changes: 18 additions & 8 deletions paddle/fluid/operators/collective/recv_v2_op.cc
@@ -44,15 +44,21 @@ class RecvOpV2 : public framework::OperatorWithKernel {
"The size of the output shape must be greater than 0 "
"but the value given is %d.",
out_shape.size()));
for (size_t i = 0; i < out_shape.size(); ++i) {
PADDLE_ENFORCE_GE(out_shape[i], 1,
platform::errors::InvalidArgument(
"The shape attribute for recv_v2 must be set "
"explicitly, but the %dth element is %d which "
"is less than 1.",
i, out_shape[i]));
bool dynamic_shape = ctx->Attrs().Get<bool>("dynamic_shape");
if (!dynamic_shape) {
// No need to check out shape if with dynamic_shape,
// since the shape will be recv from send_v2
for (size_t i = 0; i < out_shape.size(); ++i) {
PADDLE_ENFORCE_GE(out_shape[i], 1,
platform::errors::InvalidArgument(
"The shape attribute for recv_v2 must be set "
"explicitly, but the %dth element is %d which "
"is less than 1. Or dynamic_shape should be "
"set to True for both send_v2 and recv_v2.",
i, out_shape[i]));
}
ctx->SetOutputDim("Out", phi::make_ddim(out_shape));
}
ctx->SetOutputDim("Out", phi::make_ddim(out_shape));
}
}

@@ -87,6 +93,10 @@ class RecvOpV2Maker : public framework::OpProtoAndCheckerMaker {
"use_calc_stream",
"(bool default false) eject CUDA operations to calculation stream.")
.SetDefault(false);
AddAttr<bool>(
"dynamic_shape",
"(bool default false) the send/recv will be done with dynamic shape.")
.SetDefault(false);
AddComment(R"DOC(
Recv Operator

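With the new dynamic_shape attribute, recv_v2's InferShape only validates and fixes the statically declared out_shape when the attribute is false; when it is true, the real shape arrives from send_v2 at runtime. A small Python sketch of that decision, with illustrative names rather than Paddle's API:

```python
def infer_recv_out_dims(out_shape, dynamic_shape):
    """Mimic the InferShape branch added above (illustrative only)."""
    if dynamic_shape:
        # The concrete shape is received from send_v2 at runtime,
        # so there is nothing to validate or fix statically.
        return None
    for i, d in enumerate(out_shape):
        if d < 1:
            raise ValueError(
                f"out_shape[{i}] = {d}: recv_v2 needs an explicit positive shape "
                "unless dynamic_shape is set to True for both send_v2 and recv_v2")
    return tuple(out_shape)

print(infer_recv_out_dims([4, 8], dynamic_shape=False))  # (4, 8)
print(infer_recv_out_dims([-1], dynamic_shape=True))     # None
```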
114 changes: 112 additions & 2 deletions paddle/fluid/operators/collective/recv_v2_op.cu.cc
@@ -25,13 +25,98 @@ limitations under the License. */
namespace paddle {
namespace operators {

#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
framework::DDim recv_shape_info(const platform::Place &place,
const gpuStream_t &stream,
platform::NCCLComm *comm, const int &peer,
bool with_switch,
distributed::ProcessGroup *pg) {
if (with_switch) {
PADDLE_ENFORCE_NE(pg, nullptr, platform::errors::InvalidArgument(
"Process group should be provided if "
"use switch to send shape info."));
} else {
PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
platform::errors::InvalidArgument(
"NCCLComm and Stream should be provided if use NCCL "
"to send the shape info."));
}

paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT32;
ncclDataType_t nccl_dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));

// step1: recv the shape size
framework::Tensor gpu_shape_size_tensor(shape_dytpe);
if (!with_switch) {
gpu_shape_size_tensor.Resize({1});
gpu_shape_size_tensor.mutable_data(place, shape_dytpe);
auto *gpu_data = gpu_shape_size_tensor.data<int>();
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
gpu_data, 1, nccl_dtype, peer, comm->comm(), stream));
}

// copy the shape size tensor to cpu
framework::Tensor *cpu_shape_size_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_size_tensor->Resize({1});
cpu_shape_size_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
if (with_switch) {
std::vector<framework::Tensor> shape_size_tensor;
shape_size_tensor.emplace_back(*cpu_shape_size_tensor);
auto shape_size_task = pg->Recv(shape_size_tensor, peer);
} else {
framework::TensorCopySync(gpu_shape_size_tensor, platform::CPUPlace(),
cpu_shape_size_tensor);
}
auto *cpu_data = cpu_shape_size_tensor->data<int>();
int shape_size = cpu_data[0];
VLOG(3) << "recv the shape size: " << shape_size << " from peer";

// step2: recv the shape
framework::Tensor gpu_shape_tensor(shape_dytpe);
if (!with_switch) {
gpu_shape_tensor.Resize({shape_size});
gpu_shape_tensor.mutable_data(place, shape_dytpe);
auto *gpu_shape_data = gpu_shape_tensor.data<int>();
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
gpu_shape_data, shape_size, nccl_dtype, peer, comm->comm(), stream));
}

// copy the shape tensor to cpu
framework::Tensor *cpu_shape_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_tensor->Resize({shape_size});
cpu_shape_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
if (with_switch) {
std::vector<framework::Tensor> shape_tensor;
shape_tensor.emplace_back(*cpu_shape_tensor);
auto shape_task = pg->Recv(shape_tensor, peer);
} else {
framework::TensorCopySync(gpu_shape_tensor, platform::CPUPlace(),
cpu_shape_tensor);
}
auto *cpu_shape_data = cpu_shape_tensor->data<int>();
std::vector<int> all_shape;
for (int i = 0; i < shape_size; ++i) {
all_shape.emplace_back(cpu_shape_data[i]);
}
framework::DDim new_dim;
new_dim = new_dim.reshape(all_shape);
VLOG(3) << "recv the shape: (" << new_dim << ") from peer";

return new_dim;
}
#endif
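The helper above performs the receiving half of a two-step handshake: first a one-element INT32 tensor carrying the number of dimensions, then an INT32 tensor of that length carrying the dimensions themselves, either via raw ncclRecv or via the ProcessGroup when a switch is involved. A minimal Python sketch of that receiver half, assuming an in-memory channel in place of NCCL or a ProcessGroup (the channel and names are illustrative only):

```python
from collections import deque

def recv_shape_info(channel):
    """Receive a tensor shape in two steps, mirroring recv_shape_info above."""
    # step1: receive the shape size (number of dimensions)
    shape_size = channel.popleft()[0]
    # step2: receive the shape itself (shape_size dimensions)
    dims = channel.popleft()
    assert len(dims) == shape_size
    return tuple(dims)

# The peer is assumed to have pushed these two messages already:
channel = deque([[3], [8, 16, 32]])
print(recv_shape_info(channel))  # (8, 16, 32)
```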

template <typename T>
class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
int rid = ctx.Attr<int>("ring_id");
bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
PADDLE_ENFORCE_GE(
rid, 0,
platform::errors::InvalidArgument(
@@ -53,7 +138,18 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
auto out_shape = ctx.Attr<std::vector<int>>("out_shape");
auto out = ctx.Output<framework::LoDTensor>("Out");
auto out_dims = out->dims();
out->mutable_data<T>(out_dims, place);

if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2 for switch";
framework::DDim new_dim = recv_shape_info(ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer,
/* use_switch */ true, pg);
out->Resize(new_dim);
out->mutable_data<T>(new_dim, place);
} else {
out->mutable_data<T>(out_dims, place);
}

out_tensor.emplace_back(*out);
auto task = pg->Recv(out_tensor, peer);
@@ -79,6 +175,10 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {

auto *out_var = ctx.OutputVar("Out");
if (out_var->IsType<framework::LoDTensorArray>()) {
PADDLE_ENFORCE_EQ(
dynamic_shape, false,
platform::errors::InvalidArgument("Dynamic shape for send/recv not "
"support LoDTensorArray for now."));
auto out_array = out_var->GetMutable<framework::LoDTensorArray>();
for (size_t idx = 0; idx < out_array->size(); ++idx) {
VLOG(3) << "LodTensorArray: idx(" << idx << ")";
@@ -99,7 +199,17 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
auto out_dims = out->dims();
auto numel = out->numel();

out->mutable_data<T>(out_dims, place);
if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2";
framework::DDim new_dim = recv_shape_info(place, stream, comm, peer,
/* use_switch */ false,
/* ProcessGroup* */ nullptr);
out->Resize(new_dim);
numel = out->numel();
out->mutable_data<T>(new_dim, place);
} else {
out->mutable_data<T>(out_dims, place);
}
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
out->data<T>(), numel, dtype, peer, comm->comm(), stream));
VLOG(3) << "rank " << comm->rank() << " recv " << phi::product(out->dims())
4 changes: 4 additions & 0 deletions paddle/fluid/operators/collective/send_v2_op.cc
@@ -70,6 +70,10 @@ class SendOpV2Maker : public framework::OpProtoAndCheckerMaker {
"use_calc_stream",
"(bool default false) eject CUDA operations to calculation stream.")
.SetDefault(false);
AddAttr<bool>(
"dynamic_shape",
"(bool default false) the send/recv will be done with dynamic shape.")
.SetDefault(false);
AddComment(R"DOC(
Send Operator

100 changes: 99 additions & 1 deletion paddle/fluid/operators/collective/send_v2_op.cu.cc
@@ -24,13 +24,90 @@ limitations under the License. */
namespace paddle {
namespace operators {

#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
void send_shape_info(const framework::Tensor& x, const platform::Place& place,
const gpuStream_t& stream, platform::NCCLComm* comm,
const int& peer, bool with_switch,
distributed::ProcessGroup* pg) {
if (with_switch) {
PADDLE_ENFORCE_NE(pg, nullptr, platform::errors::InvalidArgument(
"Process group should be provided if "
"use switch to send shape info."));
} else {
PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
platform::errors::InvalidArgument(
"NCCLComm and Stream should be provided if use NCCL "
"to send the shape info."));
}

paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT32;
ncclDataType_t nccl_dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));
auto dims = x.dims();
int shape_size = dims.size();

// step1: send the shape size
framework::Tensor cpu_shape_size_tensor(shape_dytpe);
cpu_shape_size_tensor.Resize({1});
cpu_shape_size_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
auto* cpu_data = cpu_shape_size_tensor.data<int>();
cpu_data[0] = shape_size;

if (with_switch) {
std::vector<framework::Tensor> shape_size_tensor;
shape_size_tensor.template emplace_back(cpu_shape_size_tensor);
auto shape_size_task = pg->Send(shape_size_tensor, peer);
} else {
// copy the shape size tensor to gpu and send
framework::Tensor* gpu_shape_size_tensor =
new framework::Tensor(shape_dytpe);
gpu_shape_size_tensor->Resize({1});
gpu_shape_size_tensor->mutable_data(place, shape_dytpe);
framework::TensorCopySync(cpu_shape_size_tensor, place,
gpu_shape_size_tensor);
PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclSend(gpu_shape_size_tensor->data<int>(), 1,
nccl_dtype, peer, comm->comm(), stream));
}
VLOG(3) << "send the shape size: " << shape_size << " to peer";

// step2: send the shape
framework::Tensor cpu_shape_tensor(shape_dytpe);
cpu_shape_tensor.Resize({shape_size});
cpu_shape_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
auto* cpu_shape_data = cpu_shape_tensor.data<int>();
for (int i = 0; i < shape_size; ++i) {
cpu_shape_data[i] = dims[i];
}

if (with_switch) {
std::vector<framework::Tensor> shape_tensor;
shape_tensor.template emplace_back(cpu_shape_tensor);
auto shape_task = pg->Send(shape_tensor, peer);
} else {
// copy the shape tensor to gpu and send
framework::Tensor* gpu_shape_tensor = new framework::Tensor(shape_dytpe);
gpu_shape_tensor->Resize({shape_size});
gpu_shape_tensor->mutable_data(place, shape_dytpe);
framework::TensorCopySync(cpu_shape_tensor, place, gpu_shape_tensor);
PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclSend(gpu_shape_tensor->data<int>(), shape_size,
nccl_dtype, peer, comm->comm(), stream));
}
VLOG(3) << "send the shape: (" << dims << ") to peer";
}
#endif
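send_shape_info is the sending counterpart of the receiver sketched earlier: it sends the number of dimensions as a one-element INT32 tensor, then the dimensions themselves, either through ncclSend (after staging the CPU tensors on the GPU) or through the ProcessGroup for the switch path. A hedged Python sketch of that sender half over the same illustrative in-memory channel:

```python
from collections import deque

def send_shape_info(channel, shape):
    """Send a tensor shape in two steps, mirroring send_shape_info above."""
    # step1: send the shape size (number of dimensions)
    channel.append([len(shape)])
    # step2: send the dimensions themselves
    channel.append(list(shape))

channel = deque()
send_shape_info(channel, (8, 16, 32))
print(list(channel))  # [[3], [8, 16, 32]], consumed by the receiver sketch above
```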

template <typename T>
class SendOpV2CUDAKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
int rid = ctx.Attr<int>("ring_id");
bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
PADDLE_ENFORCE_GE(
rid, 0,
platform::errors::InvalidArgument(
@@ -45,8 +122,18 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(rid);
std::vector<phi::DenseTensor> in_tensor;
auto x = ctx.Input<framework::LoDTensor>("X");

if (dynamic_shape) {
// dynamic shape for switch send/recv
VLOG(3) << "send_v2 will use dynamic shape with recv_v2 for switch";
send_shape_info(*x, ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer,
/* use_switch */ true, pg);
}

std::vector<phi::DenseTensor> in_tensor;
in_tensor.push_back(*x);
auto task = pg->Send(in_tensor, peer);
return;
@@ -68,6 +155,10 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {

auto* x_var = ctx.InputVar("X");
if (x_var->IsType<framework::LoDTensorArray>()) {
PADDLE_ENFORCE_EQ(
dynamic_shape, false,
platform::errors::InvalidArgument("Dynamic shape for send/recv not "
"support LoDTensorArray for now."));
auto& x_array = x_var->Get<framework::LoDTensorArray>();
for (size_t idx = 0; idx < x_array.size(); idx++) {
VLOG(3) << "LodTensorArray: idx(" << idx << ")";
@@ -85,6 +176,13 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
auto x = ctx.Input<framework::LoDTensor>("X");
int numel = x->numel();

if (dynamic_shape) {
VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
send_shape_info(*x, place, stream, comm, peer,
/* use_switch */ false,
/* ProcessGroup* */ nullptr);
}

ncclDataType_t dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
2 changes: 2 additions & 0 deletions python/paddle/distributed/collective.py
@@ -337,6 +337,7 @@ def barrier(group=None):


def _set_custom_gid(gid):
global _custom_gid
_custom_gid = gid


@@ -363,6 +364,7 @@ def new_group(ranks=None, backend=None):
paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False)

"""
global _custom_gid
global _group_map
if in_dygraph_mode():
global _default_group_name
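The `global _custom_gid` added to `_set_custom_gid` matters because, without it, the assignment inside the function creates a new local binding and leaves the module-level variable untouched. A small standalone example of the pitfall (names are illustrative, not Paddle's):

```python
_custom_gid = None

def set_without_global(gid):
    _custom_gid = gid  # rebinds a local name; the module-level value is unchanged

def set_with_global(gid):
    global _custom_gid
    _custom_gid = gid  # updates the module-level variable

set_without_global(7)
print(_custom_gid)  # None
set_with_global(7)
print(_custom_gid)  # 7
```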