Support both use_calc_stream and sync_op in all_reduce #45282

Merged: 1 commit, Aug 31, 2022
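
In short, the PR inserts a ProcessGroupStream layer between ProcessGroup and ProcessGroupNCCL and threads two new flags through all_reduce: sync_op (mark the op as synchronous or asynchronous) and use_calc_stream (launch the collective on the default calculation stream instead of the dedicated communication stream). The following is a hedged sketch of the resulting C++ call surface, pieced together from the signatures in this diff; the process group, tensors, and helper function are placeholders, not code from the PR.

```cpp
// Illustrative sketch of the call surface added by this PR (not PR code).
// `pg`, `ins`, and `outs` are placeholders; their construction is not shown.
#include <vector>

#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"

void AllReduceExamples(paddle::distributed::ProcessGroupNCCL* pg,
                       std::vector<phi::DenseTensor>& ins,     // NOLINT
                       std::vector<phi::DenseTensor>& outs) {  // NOLINT
  paddle::distributed::AllreduceOptions opts;

  // Legacy path (unchanged): dedicated communication stream, no flags.
  auto t0 = pg->AllReduce(ins, outs, opts);

  // New path 1: pass sync_op through the ProcessGroupStream entry point,
  // which forwards it with use_calc_stream = false.
  paddle::distributed::ProcessGroupStream* base = pg;
  auto t1 = base->AllReduce(ins, outs, opts, /*sync_op=*/true);

  // New path 2: run the collective directly on the calculation stream.
  auto t2 = pg->AllReduce(ins, outs, opts,
                          /*sync_op=*/true,
                          /*use_calc_stream=*/true);

  // Explicit waiting only matters for work queued on the comm stream.
  t0->Wait();
  t1->Wait();
}
```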
13 changes: 11 additions & 2 deletions paddle/fluid/distributed/collective/CMakeLists.txt
@@ -2,10 +2,14 @@ cc_library(
processgroup
SRCS ProcessGroup.cc
DEPS dense_tensor)
cc_library(
processgroup_stream
SRCS ProcessGroupStream.cc
DEPS dense_tensor)
cc_library(
eager_reducer
SRCS reducer.cc
DEPS eager_api processgroup phi_api string_helper)
DEPS eager_api processgroup processgroup_stream phi_api string_helper)

if(WITH_DISTRIBUTE)
cc_library(
@@ -18,7 +22,12 @@ if(WITH_NCCL OR WITH_RCCL)
cc_library(
processgroup_nccl
SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc
DEPS processgroup place enforce collective_helper device_context
DEPS processgroup
processgroup_stream
place
enforce
collective_helper
device_context
dense_tensor)
if(WITH_DISTRIBUTE AND WITH_PSCORE)
if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroup.cc
@@ -18,10 +18,16 @@ namespace paddle {
namespace distributed {

ProcessGroup::Task::Task(int rank,
const std::vector<phi::DenseTensor>& inputTensors,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type)
: rank_(rank), comm_type_(comm_type) {}

ProcessGroup::Task::Task(int rank,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type,
bool sync_op)
: rank_(rank), comm_type_(comm_type), sync_op_(sync_op) {}

ProcessGroup::Task::~Task() = default;

bool ProcessGroup::Task::IsCompleted() {
27 changes: 23 additions & 4 deletions paddle/fluid/distributed/collective/ProcessGroup.h
@@ -55,19 +55,27 @@ class ProcessGroup {
class Task {
public:
Task(int rank,
const std::vector<phi::DenseTensor>& inputTensors,
CommType opType = CommType::UNKNOWN);
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type);
Task(int rank,
const std::vector<phi::DenseTensor>& inputs,
CommType comm_type,
bool sync_op);

virtual ~Task();
virtual bool IsCompleted();
virtual bool Wait(std::chrono::milliseconds timeout = kWaitTimeout);
virtual void Synchronize();
bool IsSync() const { return sync_op_; }

protected:
const int rank_;
CommType comm_type_;
CommType comm_type_{CommType::UNKNOWN};
std::mutex mutex_;
bool is_completed_ = false;
bool is_completed_{false};

private:
bool sync_op_{true};
};

explicit ProcessGroup(int rank,
@@ -82,6 +90,7 @@

virtual const std::string GetBackendName() const = 0;

// TODO(liyurui): This API will be moved later
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
@@ -90,6 +99,16 @@
"ProcessGroup%s does not support allreduce", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const AllreduceOptions&,
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support allreduce with sync_op flag",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
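Since the sync_op flag now lives on the base task and is exposed through IsSync(), a caller can branch on it after issuing a collective. A hypothetical sketch follows, reusing the placeholder pg, ins, outs, opts from the sketch above; the wait-only-when-asynchronous policy shown here is an assumption for illustration, not something this PR prescribes.

```cpp
// Hypothetical use of Task::IsSync(); illustrative only.
auto task = pg->AllReduce(ins, outs, opts,
                          /*sync_op=*/false,
                          /*use_calc_stream=*/false);
if (!task->IsSync()) {
  // The op was issued asynchronously; block before consuming the outputs.
  task->Wait();
}
```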
133 changes: 130 additions & 3 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -55,7 +55,20 @@ ProcessGroupNCCL::NCCLTask::NCCLTask(
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs)
: Task(rank, inputs, CommType), places_(places) {
: TaskStream(rank, inputs, CommType), places_(places) {
control_events_.resize(places.size());
ncclComms_.resize(places.size());
}

ProcessGroupNCCL::NCCLTask::NCCLTask(
const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool sync_op,
bool use_calc_stream)
: TaskStream(rank, inputs, comm_type, sync_op, use_calc_stream),
places_(places) {
control_events_.resize(places.size());
ncclComms_.resize(places.size());
}
@@ -116,6 +129,13 @@ void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>* split_sizes,

// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) {
// Warn when the op ran on the calc stream but Wait() is still invoked explicitly.
if (UseCalcStream()) {
VLOG(3) << "Warning: The communication is on the calc stream; waiting here "
"has no effect.";
return true;
}

SynchronizeStreams();
if (FLAGS_nccl_blocking_wait) {
// NOTE(shenliang03): It will block host for sync
@@ -146,7 +166,7 @@ ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr<Store>& store,
int size,
const platform::Place& place,
int gid)
: ProcessGroup(rank, size, place, gid), store_(store) {
: ProcessGroupStream(rank, size, place, gid), store_(store) {
platform::SetDeviceId(place_.device);
}
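
A practical consequence of the Wait() change above: explicit waiting only does real work for tasks launched on the dedicated communication stream. With use_calc_stream = true the collective is queued on the device's default calculation stream, so later kernels on that stream are already ordered after it; Wait() just logs a warning and returns. A sketch with the same placeholders as before:

```cpp
// Illustrative only; `pg`, `ins`, `outs`, `opts` are placeholders as above.
auto task = pg->AllReduce(ins, outs, opts,
                          /*sync_op=*/true,
                          /*use_calc_stream=*/true);
// No event synchronization is needed here: kernels launched afterwards on the
// calculation stream already see the allreduce result. A task->Wait() call at
// this point would only emit the VLOG warning and return true.
```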

@@ -223,6 +243,81 @@ void ProcessGroupNCCL::CreateNCCLManagerCache(
places_to_ctx_.emplace(places_key, std::move(dev_ctx));
}

template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
Fn fn,
CommType comm_type,
bool sync_op,
bool use_calc_stream) {
const auto& places = GetPlaceList(inputs);
const auto& key = GetKeyFromPlaces(places);

{
std::lock_guard<std::mutex> lock(mutex_);
if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
CreateNCCLManagerCache(key, places);
}
}

auto& nccl_comms = places_to_ncclcomm_[key];

SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);

auto task = std::make_shared<ProcessGroupNCCL::NCCLTask>(
places, rank_, comm_type, inputs, sync_op, use_calc_stream);

platform::CUDADeviceGuard cuda_guard;

{
platform::NCCLGroupGuard nccl_guard;
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);

gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_[key][i]->stream();
}

fn(inputs[i], outputs[i], nccl_comms[i]->GetNcclComm(), nccl_stream);
}
}

if (FLAGS_use_stream_safe_cuda_allocator) {
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);

gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_[key][i]->stream();
}

memory::RecordStream(inputs[i].Holder(), nccl_stream);
}
}

// Add a stream-event dependency only when using the comm stream
if (!use_calc_stream) {
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
task->control_events_[i].Record(*places_to_ctx_[key][i]);
}
}

return task;
}
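
The calc-stream/comm-stream choice appears twice in the template above, once when launching the NCCL call and once for memory::RecordStream. A small helper could factor it out; the sketch below is only an illustration using types already present in this file, not part of the PR.

```cpp
// Sketch only: the stream selection duplicated in the Collective template above.
// `comm_stream` stands for places_to_ctx_[key][i]->stream(); `place` is places[i].
static gpuStream_t SelectCollectiveStream(const platform::Place& place,
                                          gpuStream_t comm_stream,
                                          bool use_calc_stream) {
  if (use_calc_stream) {
    // Default calculation stream of the device.
    return static_cast<phi::GPUContext*>(
               platform::DeviceContextPool::Instance().Get(place))
        ->stream();
  }
  // Dedicated communication stream cached for this place.
  return comm_stream;
}
```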

template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
@@ -386,6 +481,37 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
CommType::ALLREDUCE);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const AllreduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
return Collective(
in_tensors,
out_tensors,
[&](const phi::DenseTensor& input,
phi::DenseTensor& output,
ncclComm_t comm,
const gpuStream_t& stream) {
return platform::dynload::ncclAllReduce(
input.data(),
output.data(),
input.numel(),
platform::ToNCCLDataType(input.type()),
ToNCCLRedType(opts.reduce_op),
comm,
stream);
},
CommType::ALLREDUCE,
sync_op,
use_calc_stream);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
@@ -432,7 +558,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
new paddle::experimental::DefaultAllocator(place));
barrierTensors.emplace_back(allocator.get(), meta);
}
auto task = ProcessGroupNCCL::AllReduce(barrierTensors, barrierTensors);
auto task = ProcessGroupNCCL::AllReduce(
barrierTensors, barrierTensors, AllreduceOptions());
auto nccl_task = dynamic_cast<ProcessGroupNCCL::NCCLTask*>(task.get());
nccl_task->barrierTensors_ = std::move(barrierTensors);
return task;
30 changes: 27 additions & 3 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.h
@@ -21,7 +21,7 @@
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/ProcessGroupStream.h"
#include "paddle/fluid/distributed/store/store.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device_context.h"
@@ -46,16 +46,23 @@ namespace distributed {

using Place = paddle::platform::Place;

class ProcessGroupNCCL : public ProcessGroup {
class ProcessGroupNCCL : public ProcessGroupStream {
public:
class NCCLTask : public ProcessGroup::Task,
class NCCLTask : public ProcessGroupStream::TaskStream,
public std::enable_shared_from_this<NCCLTask> {
public:
NCCLTask(const std::vector<Place>& places,
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs);

NCCLTask(const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool is_sync,
bool use_calc_stream);

bool IsCompleted();

void SynchronizeStreams();
@@ -89,6 +96,14 @@ class ProcessGroupNCCL : public ProcessGroup {
return std::string(NCCL_BACKEND_NAME);
}

std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
[Review comment] Contributor: in_tensors will be modified?
[Review reply] Author: This will be modified in a following PR.

std::vector<phi::DenseTensor>& out_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op,
bool use_calc_stream) override;

// TODO(liyurui): This API will be moved later
std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
@@ -194,6 +209,15 @@
Fn fn,
CommType op_type);

template <typename Fn>
std::shared_ptr<ProcessGroupStream::Task> Collective(
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
Fn fn,
CommType comm_type,
bool sync_op,
bool use_calc_stream);

template <typename Fn>
void Collective(const phi::DenseTensor*,
phi::DenseTensor*,
49 changes: 49 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroupStream.cc
@@ -0,0 +1,49 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupStream.h"

namespace paddle {
namespace distributed {

ProcessGroupStream::ProcessGroupStream(int rank,
int size,
const platform::Place& place,
int gid)
: ProcessGroup(rank, size, place, gid) {}

std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op) {
return AllReduce(input_tensors,
output_tensors,
options,
sync_op,
/*use_calc_stream*/ false);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllReduce(
std::vector<phi::DenseTensor>& input_tensors, // NOLINT
std::vector<phi::DenseTensor>& output_tensors, // NOLINT
const AllreduceOptions& options,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do allreduce", GetBackendName()));
}

} // namespace distributed
} // namespace paddle
Loading