Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
FeixLiu committed May 17, 2022
1 parent 469121d commit 62c4eae
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 33 deletions.
30 changes: 12 additions & 18 deletions paddle/fluid/operators/collective/recv_v2_op.cu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ namespace operators {
framework::DDim recv_shape_info(const platform::Place &place,
const gpuStream_t &stream,
platform::NCCLComm *comm, const int &peer,
bool with_switch,
distributed::ProcessGroup *pg) {
if (with_switch) {
PADDLE_ENFORCE_NE(pg, nullptr, platform::errors::InvalidArgument(
"Process group should be provided if "
"use switch to send shape info."));
} else {
distributed::ProcessGroup *group) {
if (!group) {
PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
platform::errors::InvalidArgument(
"NCCLComm and Stream should be provided if use NCCL "
Expand All @@ -50,7 +45,7 @@ framework::DDim recv_shape_info(const platform::Place &place,

// step1: recv the shape size
framework::Tensor gpu_shape_size_tensor(shape_dytpe);
if (!with_switch) {
if (!group) {
gpu_shape_size_tensor.Resize({1});
gpu_shape_size_tensor.mutable_data(place, shape_dytpe);
auto *gpu_data = gpu_shape_size_tensor.data<int>();
Expand All @@ -62,10 +57,10 @@ framework::DDim recv_shape_info(const platform::Place &place,
framework::Tensor *cpu_shape_size_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_size_tensor->Resize({1});
cpu_shape_size_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
if (with_switch) {
if (group) {
std::vector<framework::Tensor> shape_size_tensor;
shape_size_tensor.emplace_back(*cpu_shape_size_tensor);
auto shape_size_task = pg->Recv(shape_size_tensor, peer);
auto shape_size_task = group->Recv(shape_size_tensor, peer);
} else {
framework::TensorCopySync(gpu_shape_size_tensor, platform::CPUPlace(),
cpu_shape_size_tensor);
Expand All @@ -76,7 +71,7 @@ framework::DDim recv_shape_info(const platform::Place &place,

// step2: recv the shape
framework::Tensor gpu_shape_tensor(shape_dytpe);
if (!with_switch) {
if (!group) {
gpu_shape_tensor.Resize({shape_size});
gpu_shape_tensor.mutable_data(place, shape_dytpe);
auto *gpu_shape_data = gpu_shape_tensor.data<int>();
Expand All @@ -88,10 +83,10 @@ framework::DDim recv_shape_info(const platform::Place &place,
framework::Tensor *cpu_shape_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_tensor->Resize({shape_size});
cpu_shape_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
if (with_switch) {
if (group) {
std::vector<framework::Tensor> shape_tensor;
shape_tensor.emplace_back(*cpu_shape_tensor);
auto shape_task = pg->Recv(shape_tensor, peer);
auto shape_task = group->Recv(shape_tensor, peer);
} else {
framework::TensorCopySync(gpu_shape_tensor, platform::CPUPlace(),
cpu_shape_tensor);
Expand Down Expand Up @@ -141,10 +136,10 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {

if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2 for switch";
framework::DDim new_dim = recv_shape_info(ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer,
/* use_switch */ true, pg);
framework::DDim new_dim =
recv_shape_info(ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer, pg);
out->Resize(new_dim);
out->mutable_data<T>(new_dim, place);
} else {
Expand Down Expand Up @@ -202,7 +197,6 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2";
framework::DDim new_dim = recv_shape_info(place, stream, comm, peer,
/* use_switch */ false,
/* ProcessGroup* */ nullptr);
out->Resize(new_dim);
numel = out->numel();
Expand Down
22 changes: 7 additions & 15 deletions paddle/fluid/operators/collective/send_v2_op.cu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,13 @@ namespace operators {
NCCL_VERSION_CODE >= 2703
void send_shape_info(const framework::Tensor& x, const platform::Place& place,
const gpuStream_t& stream, platform::NCCLComm* comm,
const int& peer, bool with_switch,
distributed::ProcessGroup* pg) {
if (with_switch) {
PADDLE_ENFORCE_NE(pg, nullptr, platform::errors::InvalidArgument(
"Process group should be provided if "
"use switch to send shape info."));
} else {
const int& peer, distributed::ProcessGroup* group) {
if (!group) {
PADDLE_ENFORCE_EQ((stream != nullptr && comm != nullptr), true,
platform::errors::InvalidArgument(
"NCCLComm and Stream should be provided if use NCCL "
"to send the shape info."));
}

paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT32;
ncclDataType_t nccl_dtype =
Expand All @@ -55,10 +49,10 @@ void send_shape_info(const framework::Tensor& x, const platform::Place& place,
auto* cpu_data = cpu_shape_size_tensor.data<int>();
cpu_data[0] = shape_size;

if (with_switch) {
if (group) {
std::vector<framework::Tensor> shape_size_tensor;
shape_size_tensor.template emplace_back(cpu_shape_size_tensor);
auto shape_size_task = pg->Send(shape_size_tensor, peer);
auto shape_size_task = group->Send(shape_size_tensor, peer);
} else {
// copy the shape size tensor to gpu and send
framework::Tensor* gpu_shape_size_tensor =
Expand All @@ -82,10 +76,10 @@ void send_shape_info(const framework::Tensor& x, const platform::Place& place,
cpu_shape_data[i] = dims[i];
}

if (with_switch) {
if (group) {
std::vector<framework::Tensor> shape_tensor;
shape_tensor.template emplace_back(cpu_shape_tensor);
auto shape_task = pg->Send(shape_tensor, peer);
auto shape_task = group->Send(shape_tensor, peer);
} else {
// copy the shape tensor to gpu and send
framework::Tensor* gpu_shape_tensor = new framework::Tensor(shape_dytpe);
Expand Down Expand Up @@ -129,8 +123,7 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
VLOG(3) << "send_v2 will use dynamic shape with recv_v2 for switch";
send_shape_info(*x, ctx.GetPlace(),
/* gpuStream_t */ nullptr,
/* NCCLComm* */ nullptr, peer,
/* use_switch */ true, pg);
/* NCCLComm* */ nullptr, peer, pg);
}

std::vector<phi::DenseTensor> in_tensor;
Expand Down Expand Up @@ -179,7 +172,6 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
if (dynamic_shape) {
VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
send_shape_info(*x, place, stream, comm, peer,
/* use_switch */ false,
/* ProcessGroup* */ nullptr);
}

Expand Down

1 comment on commit 62c4eae

@paddle-bot-old
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Congratulations! Your pull request passed all required CI checks. You can ask reviewer(s) to approve and merge. 🎉

Please sign in to comment.