diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt index 9e2ceb29f2a64..f7de900d6969b 100644 --- a/paddle/fluid/distributed/collective/CMakeLists.txt +++ b/paddle/fluid/distributed/collective/CMakeLists.txt @@ -2,10 +2,14 @@ cc_library( processgroup SRCS ProcessGroup.cc DEPS dense_tensor) +cc_library( + processgroup_stream + SRCS ProcessGroupStream.cc + DEPS dense_tensor) cc_library( eager_reducer SRCS reducer.cc - DEPS eager_api processgroup phi_api string_helper) + DEPS eager_api processgroup processgroup_stream phi_api string_helper) if(WITH_DISTRIBUTE) cc_library( @@ -18,7 +22,12 @@ if(WITH_NCCL OR WITH_RCCL) cc_library( processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc - DEPS processgroup place enforce collective_helper device_context + DEPS processgroup + processgroup_stream + place + enforce + collective_helper + device_context dense_tensor) if(WITH_DISTRIBUTE AND WITH_PSCORE) if(CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) diff --git a/paddle/fluid/distributed/collective/ProcessGroup.cc b/paddle/fluid/distributed/collective/ProcessGroup.cc index b1d9ad6dc9831..925d8e771cbae 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.cc +++ b/paddle/fluid/distributed/collective/ProcessGroup.cc @@ -18,10 +18,16 @@ namespace paddle { namespace distributed { ProcessGroup::Task::Task(int rank, - const std::vector& inputTensors, + const std::vector& inputs, CommType comm_type) : rank_(rank), comm_type_(comm_type) {} +ProcessGroup::Task::Task(int rank, + const std::vector& inputs, + CommType comm_type, + bool sync_op) + : rank_(rank), comm_type_(comm_type), sync_op_(sync_op) {} + ProcessGroup::Task::~Task() = default; bool ProcessGroup::Task::IsCompleted() { diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index 8a5465bf0515b..0937b26746132 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -55,19 +55,27 @@ class ProcessGroup { class Task { public: Task(int rank, - const std::vector& inputTensors, - CommType opType = CommType::UNKNOWN); + const std::vector& inputs, + CommType comm_type); + Task(int rank, + const std::vector& inputs, + CommType comm_type, + bool sync_op); virtual ~Task(); virtual bool IsCompleted(); virtual bool Wait(std::chrono::milliseconds timeout = kWaitTimeout); virtual void Synchronize(); + bool IsSync() const { return sync_op_; } protected: const int rank_; - CommType comm_type_; + CommType comm_type_{CommType::UNKNOWN}; std::mutex mutex_; - bool is_completed_ = false; + bool is_completed_{false}; + + private: + bool sync_op_{true}; }; explicit ProcessGroup(int rank, @@ -82,6 +90,7 @@ class ProcessGroup { virtual const std::string GetBackendName() const = 0; + // TODO(liyurui): This API will be moved later virtual std::shared_ptr AllReduce( std::vector& /* input tensors */, // NOLINT std::vector& /* output tensors */, // NOLINT @@ -90,6 +99,16 @@ class ProcessGroup { "ProcessGroup%s does not support allreduce", GetBackendName())); } + virtual std::shared_ptr AllReduce( + std::vector& /* input tensors */, // NOLINT + std::vector& /* output tensors */, // NOLINT + const AllreduceOptions&, + bool) { + PADDLE_THROW(platform::errors::InvalidArgument( + "ProcessGroup%s does not support allreduce with sync_op flag", + GetBackendName())); + } + virtual std::shared_ptr Broadcast( std::vector& /* input tensors */, // NOLINT std::vector& /* output tensors */, // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc index 4a52ece783909..6a8ea7d1daab1 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc @@ -55,7 +55,20 @@ ProcessGroupNCCL::NCCLTask::NCCLTask( int rank, CommType CommType, const std::vector& inputs) - : Task(rank, inputs, CommType), places_(places) { + : TaskStream(rank, inputs, CommType), places_(places) { + control_events_.resize(places.size()); + ncclComms_.resize(places.size()); +} + +ProcessGroupNCCL::NCCLTask::NCCLTask( + const std::vector& places, + int rank, + CommType comm_type, + const std::vector& inputs, + bool sync_op, + bool use_calc_stream) + : TaskStream(rank, inputs, comm_type, sync_op, use_calc_stream), + places_(places) { control_events_.resize(places.size()); ncclComms_.resize(places.size()); } @@ -116,6 +129,13 @@ void ProcessGroupNCCL::CheckSplitSizes(std::vector* split_sizes, // TODO(sheniang03): Add timeout for wait, now timeout unused bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) { + // Warning here when use calc stream but also invoke waiting explicitly. + if (UseCalcStream()) { + VLOG(3) << "Warning: The communication is on calc stream, wait here is " + "useless."; + return true; + } + SynchronizeStreams(); if (FLAGS_nccl_blocking_wait) { // NOTE(shenliang03): It will block host for sync @@ -146,7 +166,7 @@ ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr& store, int size, const platform::Place& place, int gid) - : ProcessGroup(rank, size, place, gid), store_(store) { + : ProcessGroupStream(rank, size, place, gid), store_(store) { platform::SetDeviceId(place_.device); } @@ -223,6 +243,81 @@ void ProcessGroupNCCL::CreateNCCLManagerCache( places_to_ctx_.emplace(places_key, std::move(dev_ctx)); } +template +std::shared_ptr ProcessGroupNCCL::Collective( + std::vector& inputs, + std::vector& outputs, + Fn fn, + CommType comm_type, + bool sync_op, + bool use_calc_stream) { + const auto& places = GetPlaceList(inputs); + const auto& key = GetKeyFromPlaces(places); + + { + std::lock_guard lock(mutex_); + if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) { + CreateNCCLManagerCache(key, places); + } + } + + auto& nccl_comms = places_to_ncclcomm_[key]; + + SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]); + + auto task = std::make_shared( + places, rank_, comm_type, inputs, sync_op, use_calc_stream); + + platform::CUDADeviceGuard cuda_guard; + + { + platform::NCCLGroupGuard nccl_guard; + for (size_t i = 0; i < inputs.size(); ++i) { + cuda_guard.SetDevice(places[i]); + + gpuStream_t nccl_stream; + if (use_calc_stream) { + nccl_stream = + static_cast( + platform::DeviceContextPool::Instance().Get(places[i])) + ->stream(); + } else { + nccl_stream = places_to_ctx_[key][i]->stream(); + } + + fn(inputs[i], outputs[i], nccl_comms[i]->GetNcclComm(), nccl_stream); + } + } + + if (FLAGS_use_stream_safe_cuda_allocator) { + for (size_t i = 0; i < inputs.size(); ++i) { + cuda_guard.SetDevice(places[i]); + + gpuStream_t nccl_stream; + if (use_calc_stream) { + nccl_stream = + static_cast( + platform::DeviceContextPool::Instance().Get(places[i])) + ->stream(); + } else { + nccl_stream = places_to_ctx_[key][i]->stream(); + } + + memory::RecordStream(inputs[i].Holder(), nccl_stream); + } + } + + // Adding stream event dependency only when use comm stream + if (!use_calc_stream) { + for (size_t i = 0; i < inputs.size(); ++i) { + cuda_guard.SetDevice(places[i]); + task->control_events_[i].Record(*places_to_ctx_[key][i]); + } + } + + return task; +} + template std::shared_ptr ProcessGroupNCCL::Collective( std::vector& inputs, @@ -386,6 +481,37 @@ std::shared_ptr ProcessGroupNCCL::AllReduce( CommType::ALLREDUCE); } +std::shared_ptr ProcessGroupNCCL::AllReduce( + std::vector& in_tensors, + std::vector& out_tensors, + const AllreduceOptions& opts, + bool sync_op, + bool use_calc_stream) { + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(in_tensors), + true, + platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); + return Collective( + in_tensors, + out_tensors, + [&](const phi::DenseTensor& input, + phi::DenseTensor& output, + ncclComm_t comm, + const gpuStream_t& stream) { + return platform::dynload::ncclAllReduce( + input.data(), + output.data(), + input.numel(), + platform::ToNCCLDataType(input.type()), + ToNCCLRedType(opts.reduce_op), + comm, + stream); + }, + CommType::ALLREDUCE, + sync_op, + use_calc_stream); +} + std::shared_ptr ProcessGroupNCCL::Broadcast( std::vector& in_tensors, std::vector& out_tensors, @@ -432,7 +558,8 @@ std::shared_ptr ProcessGroupNCCL::Barrier( new paddle::experimental::DefaultAllocator(place)); barrierTensors.emplace_back(allocator.get(), meta); } - auto task = ProcessGroupNCCL::AllReduce(barrierTensors, barrierTensors); + auto task = ProcessGroupNCCL::AllReduce( + barrierTensors, barrierTensors, AllreduceOptions()); auto nccl_task = dynamic_cast(task.get()); nccl_task->barrierTensors_ = std::move(barrierTensors); return task; diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h index dfd07c438110c..50ef0b1f1ac28 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h @@ -21,7 +21,7 @@ #include #include -#include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/fluid/distributed/collective/ProcessGroupStream.h" #include "paddle/fluid/distributed/store/store.h" #include "paddle/fluid/platform/cuda_device_guard.h" #include "paddle/fluid/platform/device_context.h" @@ -46,9 +46,9 @@ namespace distributed { using Place = paddle::platform::Place; -class ProcessGroupNCCL : public ProcessGroup { +class ProcessGroupNCCL : public ProcessGroupStream { public: - class NCCLTask : public ProcessGroup::Task, + class NCCLTask : public ProcessGroupStream::TaskStream, public std::enable_shared_from_this { public: NCCLTask(const std::vector& places, @@ -56,6 +56,13 @@ class ProcessGroupNCCL : public ProcessGroup { CommType CommType, const std::vector& inputs); + NCCLTask(const std::vector& places, + int rank, + CommType comm_type, + const std::vector& inputs, + bool is_sync, + bool use_calc_stream); + bool IsCompleted(); void SynchronizeStreams(); @@ -89,6 +96,14 @@ class ProcessGroupNCCL : public ProcessGroup { return std::string(NCCL_BACKEND_NAME); } + std::shared_ptr AllReduce( + std::vector& in_tensors, // NOLINT + std::vector& out_tensors, // NOLINT + const AllreduceOptions& options, + bool sync_op, + bool use_calc_stream) override; + + // TODO(liyurui): This API will be moved later std::shared_ptr AllReduce( std::vector& in_tensors, std::vector& out_tensors, @@ -194,6 +209,15 @@ class ProcessGroupNCCL : public ProcessGroup { Fn fn, CommType op_type); + template + std::shared_ptr Collective( + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT + Fn fn, + CommType comm_type, + bool sync_op, + bool use_calc_stream); + template void Collective(const phi::DenseTensor*, phi::DenseTensor*, diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.cc b/paddle/fluid/distributed/collective/ProcessGroupStream.cc new file mode 100644 index 0000000000000..9a20b8e6eaf79 --- /dev/null +++ b/paddle/fluid/distributed/collective/ProcessGroupStream.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/collective/ProcessGroupStream.h" + +namespace paddle { +namespace distributed { + +ProcessGroupStream::ProcessGroupStream(int rank, + int size, + const platform::Place& place, + int gid) + : ProcessGroup(rank, size, place, gid) {} + +std::shared_ptr ProcessGroupStream::AllReduce( + std::vector& input_tensors, // NOLINT + std::vector& output_tensors, // NOLINT + const AllreduceOptions& options, + bool sync_op) { + return AllReduce(input_tensors, + output_tensors, + options, + sync_op, + /*use_calc_stream*/ false); +} + +std::shared_ptr ProcessGroupStream::AllReduce( + std::vector& input_tensors, // NOLINT + std::vector& output_tensors, // NOLINT + const AllreduceOptions& options, + bool sync_op, + bool use_calc_stream) { + PADDLE_THROW(platform::errors::InvalidArgument( + "ProcessGroup%s does not support do allreduce", GetBackendName())); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/collective/ProcessGroupStream.h b/paddle/fluid/distributed/collective/ProcessGroupStream.h new file mode 100644 index 0000000000000..81a05ee2416e0 --- /dev/null +++ b/paddle/fluid/distributed/collective/ProcessGroupStream.h @@ -0,0 +1,72 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/distributed/collective/ProcessGroup.h" + +namespace paddle { +namespace distributed { + +// NOTE(liyurui): Notice that some backends use `stream` as an abstract +// conception of hardward resource. We provide this base class allowing users to +// put communications on calculation stream. In some scenorios, we found this +// will save the time of switching streams. +class ProcessGroupStream : public ProcessGroup { + public: + class TaskStream : public ProcessGroup::Task { + public: + // TODO(liyurui): This constructor is temporary here for compatible reason, + // will be deleted soon. + TaskStream(int rank, + const std::vector& inputs, + CommType comm_type) + : Task(rank, inputs, comm_type) {} + + TaskStream(int rank, + const std::vector& inputs, + CommType comm_type, + bool sync_op, + bool use_calc_stream) + : Task(rank, inputs, comm_type, sync_op), + use_calc_stream_(use_calc_stream) {} + + virtual ~TaskStream() = default; + + protected: + bool UseCalcStream() const { return use_calc_stream_; } + + private: + bool use_calc_stream_{false}; + }; + + ProcessGroupStream(int rank, int size, const platform::Place& place, int gid); + virtual ~ProcessGroupStream() = default; + + std::shared_ptr AllReduce( + std::vector& input_tensors, // NOLINT + std::vector& output_tensors, // NOLINT + const AllreduceOptions& options, + bool sync_op) override; + + virtual std::shared_ptr AllReduce( + std::vector& input_tensors, // NOLINT + std::vector& output_tensors, // NOLINT + const AllreduceOptions& options, + bool sync_op, + bool use_calc_stream); +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index 1b325dcf3784f..5a7e2355f64eb 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -22,6 +22,7 @@ limitations under the License. */ #endif #include "paddle/fluid/distributed/collective/ProcessGroup.h" +#include "paddle/fluid/distributed/collective/ProcessGroupStream.h" #include "paddle/fluid/distributed/collective/Types.h" #include "paddle/fluid/distributed/collective/reducer.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -134,6 +135,25 @@ void BindDistributed(py::module *m) { py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()) + .def( + "allreduce", + [](distributed::ProcessGroup &self, + py::handle py_tensor, + distributed::ReduceOp op, + bool sync_op) { + auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); + distributed::AllreduceOptions opts; + opts.reduce_op = op; + auto dense = + std::dynamic_pointer_cast(tensor.impl()); + std::vector tensors = {*dense}; + return self.AllReduce(tensors, tensors, opts, sync_op); + }, + py::arg("tensor"), + py::arg("op"), + py::arg("sync_op"), + py::call_guard()) + .def( "broadcast", [](distributed::ProcessGroup &self, @@ -384,11 +404,36 @@ void BindDistributed(py::module *m) { py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()); + auto ProcessGroupStream = + py::class_>( + *m, "ProcessGroupStream", ProcessGroup) + .def( + "allreduce_on_calc_stream", + [](distributed::ProcessGroupStream &self, + py::handle py_tensor, + distributed::ReduceOp op) { + auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); + distributed::AllreduceOptions opts; + opts.reduce_op = op; + auto dense = + std::dynamic_pointer_cast(tensor.impl()); + std::vector tensors = {*dense}; + return self.AllReduce(tensors, + tensors, + opts, + /*sync_op*/ true, + /*use_calc_stream*/ true); + }, + py::arg("tensor"), + py::arg("op"), + py::call_guard()); + #if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL) auto processGroupNCCL = py::class_>( - *m, "ProcessGroupNCCL", ProcessGroup) + *m, "ProcessGroupNCCL", ProcessGroupStream) .def(py::init &, int, int, @@ -485,6 +530,7 @@ void BindDistributed(py::module *m) { py::class_>(*m, "task") .def("is_completed", &distributed::ProcessGroup::Task::IsCompleted) + .def("is_sync", &distributed::ProcessGroup::Task::IsSync) .def("wait", &distributed::ProcessGroup::Task::Wait, py::arg("timeout") = kWaitTimeout, diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index a238126dc6c38..bf9773ad9409f 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -51,6 +51,8 @@ from .collective import P2POp # noqa: F401 from .collective import reduce_scatter # noqa: F401 +from .communication import * # noqa: F401 + from .auto_parallel import shard_op # noqa: F401 from .auto_parallel import shard_tensor # noqa: F401 diff --git a/python/paddle/distributed/communication/__init__.py b/python/paddle/distributed/communication/__init__.py new file mode 100644 index 0000000000000..95d6c31580a88 --- /dev/null +++ b/python/paddle/distributed/communication/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = ["stream"] diff --git a/python/paddle/distributed/communication/stream/__init__.py b/python/paddle/distributed/communication/stream/__init__.py new file mode 100644 index 0000000000000..24194dd9fb1e2 --- /dev/null +++ b/python/paddle/distributed/communication/stream/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .all_reduce import all_reduce + +__all__ = ["all_reduce"] diff --git a/python/paddle/distributed/communication/stream/all_reduce.py b/python/paddle/distributed/communication/stream/all_reduce.py new file mode 100644 index 0000000000000..6a0b622cf0dfe --- /dev/null +++ b/python/paddle/distributed/communication/stream/all_reduce.py @@ -0,0 +1,90 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid.framework as framework +from ...collective import _get_default_group, _get_reduce_op, ReduceOp + + +def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream): + op_type = _get_reduce_op(op, "all_reduce") + group = _get_default_group() if group is None else group + if use_calc_stream: + return group.process_group.allreduce_on_calc_stream(tensor, op_type) + + task = group.process_group.allreduce(tensor, op_type, sync_op) + if sync_op: + task.wait() + + return task + + +def all_reduce(tensor, + op=ReduceOp.SUM, + group=None, + sync_op=True, + use_calc_stream=False): + """ + + Perform specific reduction (for example, sum, max) on inputs across devices. + + Args: + tensor (Tensor): The input tensor on each rank. The result will overwrite this tenor after communication. Support + float16, float32, float64, int32 or int64 as the input data type. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default. + group (Group, optional): Communicate in which group. If none is given, use the global group as default. + sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. + use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This + option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning. + + Returns: + Return a task object. + + Warning: + This API only supports the dygraph mode now. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + local_rank = dist.get_rank() + data = None + if local_rank == 0: + data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) + else: + data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) + task = dist.stream.all_reduce(data, sync_op=False) + task.wait() + out = data.numpy() + # [[5, 7, 9], [5, 7, 9]] + """ + if group is not None and not group.is_member(): + raise RuntimeError( + "The group should not be None and all ranks which invoke this operation should be the member of this group." + ) + + if not sync_op and use_calc_stream: + raise RuntimeError( + "use_calc_stream can only be true in sync op behavior.") + + if framework.in_dygraph_mode(): + return _all_reduce_in_dygraph(tensor, op, group, sync_op, + use_calc_stream) + + raise RuntimeError( + "paddle.distributed.stream.all_reduce is only supported in dygraph mode now." + ) diff --git a/python/paddle/fluid/tests/unittests/collective/CMakeLists.txt b/python/paddle/fluid/tests/unittests/collective/CMakeLists.txt index a69973eeb9208..a299de42cb4da 100644 --- a/python/paddle/fluid/tests/unittests/collective/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/collective/CMakeLists.txt @@ -272,5 +272,13 @@ if((WITH_GPU ) set_tests_properties(test_gen_nccl_id_op PROPERTIES RUN_SERIAL 1) endif() +if((WITH_GPU) AND (LINUX)) + py_test_modules( + test_communication_stream_allreduce_api MODULES + test_communication_stream_allreduce_api ENVS + "PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=") + set_tests_properties(test_communication_stream_allreduce_api + PROPERTIES TIMEOUT "120" RUN_SERIAL 1) +endif() add_subdirectory(fleet) add_subdirectory(multinode) diff --git a/python/paddle/fluid/tests/unittests/collective/communication_stream_allreduce_api_dygraph.py b/python/paddle/fluid/tests/unittests/collective/communication_stream_allreduce_api_dygraph.py new file mode 100644 index 0000000000000..92ef3365d5198 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/communication_stream_allreduce_api_dygraph.py @@ -0,0 +1,64 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import numpy as np +import paddle +import paddle.fluid as fluid +import paddle.distributed as dist +import test_communication_api_base as test_base +import test_collective_api_base as test_collective_base + + +class StreamAllReduceTestCase(): + + def __init__(self): + self._sync_op = eval(os.getenv("sync_op")) + self._use_calc_stream = eval(os.getenv("use_calc_stream")) + self._backend = os.getenv("backend") + self._shape = eval(os.getenv("shape")) + self._dtype = os.getenv("dtype") + self._seeds = eval(os.getenv("seeds")) + if self._backend not in ["nccl", "gloo"]: + raise NotImplementedError( + "Only support nccl and gloo as the backend for now.") + os.environ["PADDLE_DISTRI_BACKEND"] = self._backend + + def run_test_case(self): + dist.init_parallel_env() + + test_data_list = [] + for seed in self._seeds: + test_data_list.append( + test_collective_base.create_test_data(shape=self._shape, + dtype=self._dtype, + seed=seed)) + + rank = dist.get_rank() + tensor = paddle.to_tensor(test_data_list[rank]) + task = dist.stream.all_reduce(tensor, + sync_op=self._sync_op, + use_calc_stream=self._use_calc_stream) + if not self._sync_op: + task.wait() + + result = test_data_list[0] + for i in range(1, len(test_data_list)): + result += test_data_list[i] + + assert np.allclose(tensor, result, rtol=1e-05, atol=1e-05) + + +if __name__ == "__main__": + StreamAllReduceTestCase().run_test_case() diff --git a/python/paddle/fluid/tests/unittests/collective/test_communication_api_base.py b/python/paddle/fluid/tests/unittests/collective/test_communication_api_base.py new file mode 100644 index 0000000000000..22edac0015535 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/test_communication_api_base.py @@ -0,0 +1,75 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import sys +import tempfile +import itertools +import subprocess +import os +import shutil + + +class CommunicationTestDistBase(unittest.TestCase): + + def setUp(self, save_log_dir=None, num_of_devices=2, timeout=120): + self._python_interp = sys.executable + self._save_log_dir = save_log_dir + self._log_dir = tempfile.TemporaryDirectory() + self._num_of_devices = num_of_devices + self._device_list = [str(i) for i in range(num_of_devices)] + self._timeout = timeout + self._seeds = [i + 10 for i in range(num_of_devices)] + self._devices = ','.join(self._device_list) + + def run_test_case(self, script_file, user_defined_envs=None): + runtime_envs = os.environ + runtime_envs.update(user_defined_envs) + runtime_envs["CUDA_VISIBLE_DEVICES"] = self._devices + start_command = f"{self._python_interp} -u -m paddle.distributed.launch --log_dir {self._log_dir.name} --devices {self._devices} {script_file}" + start_command_list = start_command.strip().split() + + try: + self._launcher = subprocess.run(start_command_list, + env=runtime_envs, + timeout=self._timeout, + check=True) + except subprocess.TimeoutExpired as err: + raise TimeoutError( + "Timeout while running command {}, try to set a longer period, {} is not enough." + .format(err.cmd, err.timeout)) + except subprocess.CalledProcessError as err: + raise RuntimeError( + "Error occurs when running this test case. The return code of command {} is {}" + .format(err.cmd, err.returncode)) + + def tearDown(self): + if self._save_log_dir: + temp_log_dir_name = os.path.basename(self._log_dir.name) + dir_name = os.path.join(self._save_log_dir, temp_log_dir_name) + if not os.path.isdir(dir_name): + print("The running logs will copy to {}".format(dir_name)) + shutil.copytree(self._log_dir.name, dir_name) + else: + raise RuntimeError( + "Directory {} exists, failed to save log.".format(dir_name)) + + +def gen_product_envs_list(default_envs, changeable_envs): + envs_list = list() + for values in itertools.product(*changeable_envs.values()): + envs = dict(zip(changeable_envs.keys(), values)) + envs.update(default_envs) + envs_list.append(envs) + return envs_list diff --git a/python/paddle/fluid/tests/unittests/collective/test_communication_stream_allreduce_api.py b/python/paddle/fluid/tests/unittests/collective/test_communication_stream_allreduce_api.py new file mode 100644 index 0000000000000..6c6dcfa1c705d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/test_communication_stream_allreduce_api.py @@ -0,0 +1,51 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import itertools +import test_communication_api_base as test_base + + +class TestCommunicationStreamAllreduceAPI(test_base.CommunicationTestDistBase): + + def setUp(self): + super(TestCommunicationStreamAllreduceAPI, self).setUp(num_of_devices=2, + timeout=120) + self._default_envs = { + "backend": "nccl", + "shape": "(100, 200)", + "dtype": "float32", + "seeds": str(self._seeds) + } + self._changeable_envs = { + "sync_op": ["True", "False"], + "use_calc_stream": ["True", "False"] + } + + def test_allreduce_stream(self): + envs_list = test_base.gen_product_envs_list(self._default_envs, + self._changeable_envs) + for envs in envs_list: + if eval(envs["use_calc_stream"]) and not eval(envs["sync_op"]): + continue + self.run_test_case("communication_stream_allreduce_api_dygraph.py", + user_defined_envs=envs) + + def tearDown(self): + super(TestCommunicationStreamAllreduceAPI, self).tearDown() + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/collective/testslist.csv b/python/paddle/fluid/tests/unittests/collective/testslist.csv index 1243fc0f63f97..8792db307a342 100644 --- a/python/paddle/fluid/tests/unittests/collective/testslist.csv +++ b/python/paddle/fluid/tests/unittests/collective/testslist.csv @@ -32,3 +32,4 @@ test_collective_wait,linux,gpu;rocm,300,DIST,test_runner.py,2,1,http_proxy=;http test_eager_dist_api,linux,gpu;rocm,120,DIST,test_runner.py,2,1,http_proxy=;https_proxy=;PYTHONPATH=.., test_new_group_api,linux,gpu;rocm,120,DIST,test_runner.py,2,1,http_proxy=;https_proxy=;PYTHONPATH=.., test_gen_nccl_id_op,,gpu;rocm;ASCEND;ASCEND_CL,,DIST,../dist_test.sh,2,1,http_proxy=;https_proxy=;PYTHONPATH=.., +test_communication_stream_allreduce_api,linux,gpu;rocm,120,DIST,,2,1,PYTHONPATH=..;http_proxy=;https_proxy=, diff --git a/python/setup.py.in b/python/setup.py.in index 66f0575284d8d..95ed79a786b43 100755 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -270,6 +270,8 @@ packages=['paddle', 'paddle.dataset', 'paddle.reader', 'paddle.distributed', + 'paddle.distributed.communication', + 'paddle.distributed.communication.stream', 'paddle.distributed.metric', 'paddle.distributed.ps', 'paddle.distributed.ps.utils',