[collective] dynamic shape for send_v2 and recv_v2 #42765

Merged: 10 commits, May 18, 2022
Changes from 1 commit
24 changes: 16 additions & 8 deletions paddle/fluid/operators/collective/recv_v2_op.cc
@@ -44,15 +44,19 @@ class RecvOpV2 : public framework::OperatorWithKernel {
"The size of the output shape must be greater than 0 "
"but the value given is %d.",
out_shape.size()));
for (size_t i = 0; i < out_shape.size(); ++i) {
PADDLE_ENFORCE_GE(out_shape[i], 1,
platform::errors::InvalidArgument(
"The shape attribute for recv_v2 must be set "
"explicitly, but the %dth element is %d which "
"is less than 1.",
i, out_shape[i]));
bool dynamic_shape = ctx->Attrs().Get<bool>("dynamic_shape");
if (!dynamic_shape) {
for (size_t i = 0; i < out_shape.size(); ++i) {
PADDLE_ENFORCE_GE(out_shape[i], 1,
platform::errors::InvalidArgument(
"The shape attribute for recv_v2 must be set "
"explicitly, but the %dth element is %d which "
"is less than 1. Or dynamic_shape should be "
"set to True for both send_v2 and recv_v2.",
i, out_shape[i]));
}
ctx->SetOutputDim("Out", phi::make_ddim(out_shape));
}
ctx->SetOutputDim("Out", phi::make_ddim(out_shape));
}
}

@@ -87,6 +91,10 @@ class RecvOpV2Maker : public framework::OpProtoAndCheckerMaker {
"use_calc_stream",
"(bool default false) eject CUDA operations to calculation stream.")
.SetDefault(false);
AddAttr<bool>(
"dynamic_shape",
"(bool default false) the send/recv will be done with dynamic shape.")
.SetDefault(false);
AddComment(R"DOC(
Recv Operator

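Note on the change above: with the shape check gated on dynamic_shape, recv_v2 no longer requires a fully positive out_shape attribute at compile time when the flag is set. A minimal static-graph sketch of how the new attribute reaches the op (a hand-built op with illustrative names and values, not code from this PR):

import paddle
import paddle.static as static

paddle.enable_static()
main = static.Program()
block = main.global_block()
# Placeholder output; its declared shape is not trusted when dynamic_shape=True.
buf = block.create_var(name='recv_buf', dtype='float32', shape=[1])
block.append_op(
    type='recv_v2',
    outputs={'Out': [buf]},
    attrs={
        'ring_id': 0,
        'peer': 0,
        'use_calc_stream': True,
        'out_shape': [1],  # skipped by InferShape when dynamic_shape is True
        'dtype': buf.dtype,
        'dynamic_shape': True,
    })
print(block.ops[-1].attr('dynamic_shape'))  # True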
66 changes: 65 additions & 1 deletion paddle/fluid/operators/collective/recv_v2_op.cu.cc
@@ -32,6 +32,7 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
int rid = ctx.Attr<int>("ring_id");
bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
PADDLE_ENFORCE_GE(
rid, 0,
platform::errors::InvalidArgument(
@@ -79,6 +80,10 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {

auto *out_var = ctx.OutputVar("Out");
if (out_var->IsType<framework::LoDTensorArray>()) {
PADDLE_ENFORCE_EQ(
dynamic_shape, false,
platform::errors::InvalidArgument("Dynamic shape for send/recv not "
"support LoDTensorArray for now."));
auto out_array = out_var->GetMutable<framework::LoDTensorArray>();
for (size_t idx = 0; idx < out_array->size(); ++idx) {
VLOG(3) << "LodTensorArray: idx(" << idx << ")";
@@ -99,7 +104,66 @@ class RecvOpV2CUDAKernel : public framework::OpKernel<T> {
auto out_dims = out->dims();
auto numel = out->numel();

out->mutable_data<T>(out_dims, place);
if (dynamic_shape) {
VLOG(3) << "recv_v2 will use dynamic shape with send_v2";
paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT64;
ncclDataType_t nccl_dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));

// step1: recv the shape size

// recv the shape size tensor on gpu
framework::Tensor gpu_shape_size_tensor(shape_dytpe);
gpu_shape_size_tensor.Resize({1});
gpu_shape_size_tensor.mutable_data(place, shape_dytpe);
auto *gpu_data = gpu_shape_size_tensor.data<int64_t>();
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
gpu_data, 1, nccl_dtype, peer, comm->comm(), stream));
framework::Tensor *cpu_shape_size_tensor =
new framework::Tensor(shape_dytpe);

// copy the shape size tensor to cpu
cpu_shape_size_tensor->Resize({1});
cpu_shape_size_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
framework::TensorCopySync(gpu_shape_size_tensor, platform::CPUPlace(),
cpu_shape_size_tensor);
auto *cpu_data = cpu_shape_size_tensor->data<int64_t>();
int64_t shape_size = cpu_data[0];
VLOG(3) << "recv the shape size: " << shape_size << " from peer";

// step2: recv the shape

// recv the shape tensor on gpu
framework::Tensor gpu_shape_tensor(shape_dytpe);
gpu_shape_tensor.Resize({shape_size});
gpu_shape_tensor.mutable_data(place, shape_dytpe);
auto *gpu_shape_data = gpu_shape_tensor.data<int64_t>();
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
gpu_shape_data, shape_size, nccl_dtype, peer, comm->comm(), stream));

// copy the shape tensor to cpu
framework::Tensor *cpu_shape_tensor = new framework::Tensor(shape_dytpe);
cpu_shape_tensor->Resize({shape_size});
cpu_shape_tensor->mutable_data(platform::CPUPlace(), shape_dytpe);
framework::TensorCopySync(gpu_shape_tensor, platform::CPUPlace(),
cpu_shape_tensor);
auto *cpu_shape_data = cpu_shape_tensor->data<int64_t>();
std::vector<int> all_shape;
for (int i = 0; i < shape_size; ++i) {
all_shape.emplace_back(cpu_shape_data[i]);
}
framework::DDim new_dim;
new_dim = new_dim.reshape(all_shape);
VLOG(3) << "recv the shape: (" << new_dim << ") from peer";

// step3: reshape the out tensor and recv the out tensor
out->Resize(new_dim);
numel = out->numel();
out->mutable_data<T>(new_dim, place);
} else {
out->mutable_data<T>(out_dims, place);
}
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
out->data<T>(), numel, dtype, peer, comm->comm(), stream));
VLOG(3) << "rank " << comm->rank() << " recv " << phi::product(out->dims())
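The kernel above implements a three-message handshake on the receiver: a one-element int64 tensor with the number of dimensions, an int64 tensor with the shape, and finally the payload, with the output resized in between. A hedged Python re-expression of that receiver-side protocol on top of the existing fixed-shape recv (the helper name and dtype argument are illustrative; assumes an initialized two-rank group and a sender that follows the matching protocol):

import paddle
import paddle.distributed as dist

def recv_dynamic(src, dtype='float32'):
    # step 1: receive the number of dimensions
    ndim = paddle.zeros([1], dtype='int64')
    dist.recv(ndim, src=src)
    # step 2: receive the shape itself
    shape = paddle.zeros([int(ndim.item())], dtype='int64')
    dist.recv(shape, src=src)
    # step 3: allocate the output with the received shape, then receive the payload
    out = paddle.zeros([int(s) for s in shape.numpy()], dtype=dtype)
    dist.recv(out, src=src)
    return out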
4 changes: 4 additions & 0 deletions paddle/fluid/operators/collective/send_v2_op.cc
@@ -70,6 +70,10 @@ class SendOpV2Maker : public framework::OpProtoAndCheckerMaker {
"use_calc_stream",
"(bool default false) eject CUDA operations to calculation stream.")
.SetDefault(false);
AddAttr<bool>(
"dynamic_shape",
"(bool default false) the send/recv will be done with dynamic shape.")
.SetDefault(false);
AddComment(R"DOC(
Send Operator
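Both op makers now register the same dynamic_shape attribute with a False default. A small introspection sketch (not part of the PR) that would confirm the registration once the ops are compiled into Paddle:

from paddle.fluid.framework import OpProtoHolder

for op_type in ('send_v2', 'recv_v2'):
    proto = OpProtoHolder.instance().get_op_proto(op_type)
    attr_names = [attr.name for attr in proto.attrs]
    print(op_type, 'dynamic_shape' in attr_names)  # expected: True with this change applied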
57 changes: 57 additions & 0 deletions paddle/fluid/operators/collective/send_v2_op.cu.cc
@@ -31,6 +31,7 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \
NCCL_VERSION_CODE >= 2703
int rid = ctx.Attr<int>("ring_id");
bool dynamic_shape = ctx.Attr<bool>("dynamic_shape");
PADDLE_ENFORCE_GE(
rid, 0,
platform::errors::InvalidArgument(
@@ -68,6 +69,10 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {

auto* x_var = ctx.InputVar("X");
if (x_var->IsType<framework::LoDTensorArray>()) {
PADDLE_ENFORCE_EQ(
dynamic_shape, false,
platform::errors::InvalidArgument("Dynamic shape for send/recv not "
"support LoDTensorArray for now."));
auto& x_array = x_var->Get<framework::LoDTensorArray>();
for (size_t idx = 0; idx < x_array.size(); idx++) {
VLOG(3) << "LodTensorArray: idx(" << idx << ")";
@@ -85,6 +90,58 @@ class SendOpV2CUDAKernel : public framework::OpKernel<T> {
auto x = ctx.Input<framework::LoDTensor>("X");
int numel = x->numel();

if (dynamic_shape) {
VLOG(3) << "send_v2 will use dynamic shape with recv_v2";
paddle::experimental::DataType shape_dytpe =
paddle::experimental::DataType::INT64;
ncclDataType_t nccl_dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(shape_dytpe));
auto dims = x->dims();
int64_t shape_size = dims.size();

// step1: send the shape size

// prepare the shape size tensor on cpu
framework::Tensor cpu_shape_size_tensor(shape_dytpe);
cpu_shape_size_tensor.Resize({1});
cpu_shape_size_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
auto* cpu_data = cpu_shape_size_tensor.data<int64_t>();
cpu_data[0] = shape_size;

// copy the shape size tensor to gpu and send
framework::Tensor* gpu_shape_size_tensor =
new framework::Tensor(shape_dytpe);
gpu_shape_size_tensor->Resize({1});
gpu_shape_size_tensor->mutable_data(place, shape_dytpe);
framework::TensorCopySync(cpu_shape_size_tensor, place,
gpu_shape_size_tensor);
PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclSend(gpu_shape_size_tensor->data<int64_t>(), 1,
nccl_dtype, peer, comm->comm(), stream));
VLOG(3) << "send the shape size: " << shape_size << " to peer";

// step2: send the shape

// prepare the shape tensor on cpu
framework::Tensor cpu_shape_tensor(shape_dytpe);
cpu_shape_tensor.Resize({shape_size});
cpu_shape_tensor.mutable_data(platform::CPUPlace(), shape_dytpe);
auto* cpu_shape_data = cpu_shape_tensor.data<int64_t>();
for (int i = 0; i < shape_size; ++i) {
cpu_shape_data[i] = dims[i];
}

// copy the shape tensor to gpu and send
framework::Tensor* gpu_shape_tensor = new framework::Tensor(shape_dytpe);
gpu_shape_tensor->Resize({shape_size});
gpu_shape_tensor->mutable_data(place, shape_dytpe);
framework::TensorCopySync(cpu_shape_tensor, place, gpu_shape_tensor);
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
gpu_shape_tensor->data<int64_t>(), shape_size, nccl_dtype, peer,
comm->comm(), stream));
VLOG(3) << "send the shape: (" << dims << ") to peer";
}

ncclDataType_t dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
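The sender mirrors the receiver: it stages the dimension count and the shape on the CPU, copies each to the GPU, and ncclSends them ahead of the data. The matching sender half of the Python sketch given after recv_v2_op.cu.cc (same assumptions, illustrative helper name):

import paddle
import paddle.distributed as dist

def send_dynamic(tensor, dst):
    # step 1: send the number of dimensions as a one-element int64 tensor
    ndim = paddle.to_tensor([len(tensor.shape)], dtype='int64')
    dist.send(ndim, dst=dst)
    # step 2: send the shape itself
    dims = paddle.to_tensor(tensor.shape, dtype='int64')
    dist.send(dims, dst=dst)
    # step 3: send the payload
    dist.send(tensor, dst=dst)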
20 changes: 16 additions & 4 deletions python/paddle/distributed/collective.py
@@ -337,6 +337,7 @@ def barrier(group=None):


def _set_custom_gid(gid):
global _custom_gid
_custom_gid = gid


@@ -363,6 +364,7 @@ def new_group(ranks=None, backend=None):
paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False)

"""
global _custom_gid
global _group_map
if in_dygraph_mode():
global _default_group_name
@@ -1859,7 +1861,7 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
out_tensor_list.extend(paddle.split(out, nranks, 0))


def send(tensor, dst=0, group=None, use_calc_stream=True):
def send(tensor, dst=0, group=None, use_calc_stream=True, dynamic_shape=False):
"""
Send a tensor to the receiver.

@@ -1869,6 +1871,9 @@ def send(tensor, dst=0, group=None, use_calc_stream=True):
dst (int): The destination rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
use_calc_stream (bool, optional): Whether to use calculate stream or communication stream. Default: True.
dynamic_shape (bool, optional): Whether the send/recv pair uses dynamic shape. Default: False.
If set to True, this op first sends the shape of the tensor to the dst, then sends the tensor itself.

Returns:
None.
@@ -1905,7 +1910,8 @@

if _non_static_mode():
return _C_ops.send_v2(tensor, 'use_calc_stream', use_calc_stream,
'ring_id', ring_id, 'peer', dst)
'ring_id', ring_id, 'peer', dst, 'dynamic_shape',
dynamic_shape)
op_type = 'send_v2'
check_variable_and_dtype(
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
@@ -1919,10 +1925,11 @@ def send(tensor, dst=0, group=None, use_calc_stream=True):
'ring_id': ring_id,
'peer': dst,
'use_calc_stream': use_calc_stream,
'dynamic_shape': dynamic_shape
})


def recv(tensor, src=0, group=None, use_calc_stream=True):
def recv(tensor, src=0, group=None, use_calc_stream=True, dynamic_shape=False):
"""
Receive a tensor from the sender.

@@ -1932,6 +1939,9 @@ def recv(tensor, src=0, group=None, use_calc_stream=True):
src (int): The source rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
use_calc_stream (bool, optional): Whether to use calculate stream or communication stream. Default: True.
dynamic_shape (bool, optional): Whether the send/recv pair uses dynamic shape. Default: False.
If set to True, this op first receives the shape of the tensor from the src, then receives the tensor itself based on that shape.

Returns:
None.
@@ -1969,7 +1979,8 @@ def recv(tensor, src=0, group=None, use_calc_stream=True):
if _non_static_mode():
return _C_ops.recv_v2(tensor, 'use_calc_stream', use_calc_stream,
'ring_id', ring_id, 'peer', src, 'dtype',
tensor.dtype, 'out_shape', tensor.shape)
tensor.dtype, 'out_shape', tensor.shape,
'dynamic_shape', dynamic_shape)
op_type = 'recv_v2'
check_variable_and_dtype(
tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'],
@@ -1984,4 +1995,5 @@
'out_shape': tensor.shape,
'dtype': tensor.dtype,
'use_calc_stream': use_calc_stream,
'dynamic_shape': dynamic_shape
})
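With the Python API threaded through, a short end-to-end usage sketch (not from the PR; assumes two devices launched with paddle.distributed.launch) shows the case dynamic_shape targets, a payload whose size is only known at runtime on the sender:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
if dist.get_rank() == 0:
    x = paddle.randn([10])
    kept = paddle.masked_select(x, x > 0)      # length depends on the data
    dist.send(kept, dst=1, dynamic_shape=True)
else:
    buf = paddle.zeros([1])                    # placeholder; resized by the recv kernel
    dist.recv(buf, src=0, dynamic_shape=True)
    print(buf.shape)                           # matches the sender's runtime shape

The new test file that follows exercises the same send/recv pair through the TestCollectiveAPIRunnerBase harness in dygraph mode.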
@@ -0,0 +1,54 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
from six import string_types
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main


class TestCollectiveSendRecvAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
if rank == 0:
paddle.distributed.send(tindata, dst=1, dynamic_shape=True)
else:
paddle.distributed.recv(tindata, src=0, dynamic_shape=True)
return [tindata.numpy()]


if __name__ == "__main__":
runtime_main(TestCollectiveSendRecvAPI, "sendrecv")