User specified backend (PaddlePaddle#35745)
2742195759 authored and root committed Oct 21, 2021
1 parent 6a20205 commit d20a74b
Showing 20 changed files with 950 additions and 65 deletions.
18 changes: 18 additions & 0 deletions paddle/fluid/framework/fleet/gloo_wrapper.h
@@ -218,6 +218,24 @@ class GlooWrapper {
return std::move(ret);
}

// TODO(xiongkun03): support all-gather of arrays with
// different lengths per rank. AllgathervOptions may work
// for that case; it needs some investigation.
template <typename T>
void AllGatherVector(T* input_ptr, T* output_ptr,
size_t element_num) { // NOLINT
CHECK_EQ(is_initialized_, true);
#ifdef PADDLE_WITH_GLOO
gloo::AllgatherOptions opts(context_);
opts.setInput(input_ptr, element_num);
opts.setOutput(output_ptr, element_num * size_);
gloo::allgather(opts);
#else
LOG(WARNING) << "AllGather does nothing when WITH_GLOO=OFF";
#endif
}

protected:
bool is_initialized_ = false;
#ifdef PADDLE_WITH_GLOO
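Note: AllGatherVector above assumes every rank contributes the same element_num; each rank passes element_num inputs and receives element_num * size_ outputs laid out rank by rank. A minimal Python sketch of that layout (illustrative only, not a Paddle or gloo API):

# Sketch of fixed-length allgather semantics (illustrative, not Paddle/gloo code).
def allgather_fixed(per_rank_inputs):
    # per_rank_inputs: one equally sized list per rank.
    assert len({len(chunk) for chunk in per_rank_inputs}) == 1, \
        "every rank must contribute the same element_num"
    # Every rank ends up with the same concatenation, ordered by rank.
    gathered = [x for chunk in per_rank_inputs for x in chunk]
    return [list(gathered) for _ in per_rank_inputs]

# Two ranks, three elements each -> each rank receives all six values.
print(allgather_fixed([[1, 2, 3], [4, 5, 6]]))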
115 changes: 113 additions & 2 deletions paddle/fluid/imperative/gloo_context.cc
@@ -18,6 +18,7 @@
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace framework {
@@ -67,8 +68,36 @@ void GLOOParallelContext::AllReduceByStream(const framework::Variable &src,
framework::Variable *dst,
int ring_id, bool use_calc_stream) {
// AllReduce(src, dst, strategy_, ring_id, use_calc_stream);
auto src_tensor = src.Get<framework::LoDTensor>();
auto *dst_tensor = dst->GetMutable<framework::LoDTensor>();
if (src.IsType<framework::LoDTensor>()) {
if (!dst->IsType<framework::LoDTensor>()) {
dst->Clear();
}
AllReduce(src.Get<framework::LoDTensor>(),
dst->GetMutable<framework::LoDTensor>());
} else if (src.IsType<framework::SelectedRows>()) {
if (&src != dst) {
if (!dst->IsType<framework::SelectedRows>()) {
dst->Clear();
}
AllReduce(src.Get<framework::SelectedRows>(),
dst->GetMutable<framework::SelectedRows>());
} else {
// SelectedRows cannot be allreduced in place
framework::Variable tmp_dst;
AllReduce(src.Get<framework::SelectedRows>(),
tmp_dst.GetMutable<framework::SelectedRows>());
*dst = std::move(tmp_dst);
}
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Unsupported variable type %s for imperative allreduce, only "
"LoDTensor and SelectedRows are supported.",
platform::demangle(framework::ToTypeName(src.Type()))));
}
}

void GLOOParallelContext::AllReduce(const framework::Tensor &src_tensor,
framework::Tensor *dst_tensor) {
auto gloo_wrapper = framework::GlooWrapper::GetInstance();
dst_tensor->Resize(src_tensor.dims());
switch (src_tensor.type()) {
@@ -84,6 +113,88 @@ void GLOOParallelContext::AllReduceByStream(const framework::Variable &src,
gloo_wrapper->Barrier();
}
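Note: the dense-tensor path resizes the destination to the source shape, runs the gloo all-reduce over the flat buffer (the reduce op is not visible in this hunk; a sum is assumed below), and then barriers. A minimal Python sketch of that semantics (illustrative, not Paddle API):

# Sketch of elementwise-sum allreduce semantics (illustrative, not Paddle code).
def allreduce_sum(per_rank_buffers):
    # per_rank_buffers: one equally sized flat list per rank.
    summed = [sum(vals) for vals in zip(*per_rank_buffers)]
    # After the collective, every rank holds the same reduced buffer.
    return [list(summed) for _ in per_rank_buffers]

print(allreduce_sum([[1.0, 2.0], [3.0, 4.0]]))  # -> [[4.0, 6.0], [4.0, 6.0]]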

#define GLOO_ALL_GATHER_CASE(type, T, gw) \
case type: { \
const auto *src_tensor_ptr = src_tensor.data<T>(); \
gw->AllGatherVector<T>(const_cast<T *>(src_tensor_ptr), \
reinterpret_cast<T *>(dst_tensor_ptr), \
value_sendcount); \
break; \
}

void GLOOParallelContext::AllReduce(const framework::SelectedRows &src,
framework::SelectedRows *dst) {
// auto ;
// int local_rank = strategy_.local_rank_;
int nranks = strategy_.nranks_;
VLOG(3) << "SelectedRows AllReduce start";
const auto &src_tensor = src.value();
const auto &place = src_tensor.place();
auto dtype = src_tensor.type();
// 1. Gather the number of rows from all workers. Here AllGather is used to do
// this, but other implementations could be used in the future.
const auto &src_rows = src.rows();
auto gloo_wrapper = framework::GlooWrapper::GetInstance();
size_t local_row_num = src_rows.size();
std::vector<size_t> rows_num_vector =
gloo_wrapper->AllGather<size_t>(local_row_num);
const auto *cpu_rows_num_ptr = rows_num_vector.data();
auto rows_num = std::accumulate(cpu_rows_num_ptr, cpu_rows_num_ptr + nranks,
static_cast<int64_t>(0));
dst->set_height(src.height());
VLOG(3) << "Gather rows: " << string::join_strings(rows_num_vector, ',')
<< ", total rows number: " << rows_num
<< ", height: " << src.height();
auto *dst_rows = dst->mutable_rows();
dst_rows->resize(rows_num);
auto *dst_rows_ptr = dst_rows->MutableData(place);
const int64_t *src_rows_ptr = src_rows.Data(place);

// VLOG(3) << "Selected Rows of src:" << string::join_strings(dst_rows, ',')

auto *dst_tensor = dst->mutable_value();
auto dims = src_tensor.dims();
dims[0] = rows_num;
auto feature_size = framework::product(dims) / dims[0];
dst_tensor->Resize(dims);
if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + nranks,
[&](size_t row) { return row == cpu_rows_num_ptr[0]; })) {
// During sparse communication, every card holds the same number of rows.
// Because the gloo wrapper utility class does not currently support
// broadcast, only the equal-size case is handled here.
VLOG(3) << "Use the gloo all reduce to sync. SRC:" << src_tensor;
// framework::SerializeToStream(VLOG(4), src);
VLOG(3) << "allgather replaces broadcast to speed up in sparse allreduce";
auto value_sendcount = cpu_rows_num_ptr[0] * feature_size;
auto *dst_tensor_ptr = dst_tensor->mutable_data(place, dtype);

gloo_wrapper->AllGatherVector<int64_t>(const_cast<int64_t *>(src_rows_ptr),
static_cast<int64_t *>(dst_rows_ptr),
rows_num_vector[0]);

switch (dtype) {
GLOO_ALL_GATHER_CASE(framework::proto::VarType::FP32, float,
gloo_wrapper);
GLOO_ALL_GATHER_CASE(framework::proto::VarType::FP64, double,
gloo_wrapper);
GLOO_ALL_GATHER_CASE(framework::proto::VarType::INT32, int, gloo_wrapper);
GLOO_ALL_GATHER_CASE(framework::proto::VarType::INT64, int64_t,
gloo_wrapper);
default: {
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid datatype for allreduce"));
}
}
VLOG(3) << "Selected Row DST:" << *dst_tensor;
VLOG(3) << "Selected Rows of DST:"
<< string::join_strings(std::vector<int64_t>(*dst_rows), ',');
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"The number of rows is not the same on each card; gloo only supports "
"equal batch division"));
}
}

paddle::platform::DeviceContext *GLOOParallelContext::GetDeviceContext(
int ring_id) {
// return the CPUDeviceContext
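Note: in short, the SelectedRows path above all-gathers each rank's row count, requires the counts to match (the gloo wrapper has no broadcast), then all-gathers the row indices and the value tensor and concatenates them rank by rank. A rough Python sketch of that control flow with plain lists in place of tensors (hypothetical helper, not Paddle API):

# Rough sketch of SelectedRows allreduce-via-allgather (illustrative, not Paddle code).
def selected_rows_allreduce(rank_rows, rank_values, feature_size):
    # rank_rows:   per-rank lists of row indices
    # rank_values: per-rank flat value lists, len == num_rows * feature_size
    row_counts = [len(rows) for rows in rank_rows]        # 1. gather row counts
    if len(set(row_counts)) != 1:                         # 2. only the equal case
        raise ValueError("gloo path only supports equal row counts per rank")
    dst_rows = [r for rows in rank_rows for r in rows]    # 3. all-gather row indices
    dst_vals = [v for vals in rank_values for v in vals]  # 4. all-gather values
    assert len(dst_vals) == len(dst_rows) * feature_size
    return dst_rows, dst_vals

# Two ranks, one row each, feature_size=2.
print(selected_rows_allreduce([[0], [3]], [[1.0, 1.0], [2.0, 2.0]], 2))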
8 changes: 8 additions & 0 deletions paddle/fluid/imperative/gloo_context.h
@@ -16,6 +16,9 @@
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/imperative/parallel_context.h"
#include "paddle/fluid/platform/device_context.h"

@@ -52,6 +55,11 @@ class GLOOParallelContext : public ParallelContext {

void SynchronizeCompute() override;

private:
void AllReduce(const framework::Tensor& src, framework::Tensor* dst);
void AllReduce(const framework::SelectedRows& src,
framework::SelectedRows* dst);

private:
std::unique_ptr<platform::CPUDeviceContext> device_;
};
51 changes: 45 additions & 6 deletions python/paddle/distributed/fleet/launch.py
@@ -103,7 +103,12 @@ def _parse_args():
type=str,
default="log",
help="The path for each process's log. Default --log_dir=log/")

base_group.add_argument(
"--backend",
type=str,
default="auto",
help="Specifize the backend, can be gloo|nccl|bkcl|auto. Default value is auto which perfers nccl or bkcl."
)
base_group.add_argument(
"--nproc_per_node",
type=int,
@@ -230,8 +235,21 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
devices_per_proc)


def cpuonly_check(args):
if args.ips and len(args.ips.split(',')) > 1:
raise RuntimeError(
"CPUONLY launch only supports a single trainer, i.e. len(ips)=1, but got %s."
% args.ips)
if args.run_mode:
assert args.run_mode == 'cpuonly', "CPUONLY launch only supports run_mode 'cpuonly'"
if args.servers:
raise RuntimeError("CPUONLY launch can't have --servers as arguments.")
return True


def launch_collective(args):
# parse arguments, used for cloud-single-machine and local
if args.backend == 'gloo': cpuonly_check(args)
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format(
@@ -265,6 +283,7 @@ def launch_collective(args):
global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
global_envs["PADDLE_DISTRI_BACKEND"] = args.backend

procs = start_local_trainers(
cluster,
@@ -349,9 +368,12 @@ def which_distributed_mode(args):

if fluid.core.is_compiled_with_cuda():
accelerators = fluid.core.get_cuda_device_count()
args.backend = 'nccl'
elif fluid.core.is_compiled_with_npu():
args.backend = 'unknown'
accelerators = fluid.core.get_npu_device_count()
elif fluid.core.is_compiled_with_xpu():
args.backend = 'bkcl'
accelerators = fluid.core.get_xpu_device_count()
else:
accelerators = 0
@@ -372,10 +394,14 @@ def which_distributed_mode(args):
else:
if not fluid.core.is_compiled_with_cuda(
) and not fluid.core.is_compiled_with_xpu():
logger.warning(
"Not found distinct arguments and not compiled with cuda or xpu. Default use ps mode"
)
return DistributeMode.PS
if args.servers:
logger.warning(
"No distributed arguments found and not compiled with cuda or xpu, \
but args.servers is not empty; defaulting to ps mode")
return DistributeMode.PS
else:
args.backend = "gloo"
return DistributeMode.COLLECTIVE
else:
logger.warning(
"Not found distinct arguments and compiled with cuda or xpu. Default use collective mode"
@@ -556,7 +582,20 @@ def launch():
logger = get_logger()
_print_arguments(args)

distribute_mode = which_distributed_mode(args)
if args.backend == 'auto':
distribute_mode = which_distributed_mode(args)
assert args.backend in [
'gloo', 'nccl', 'bkcl', 'unknown'
] # which_distributed_mode must modify args.backend
else:
assert args.run_mode == 'collective' or args.run_mode is None, "When backend is not 'auto', run mode must be collective"
check_backend(args.backend)
distribute_mode = DistributeMode.COLLECTIVE

block_windows_and_macos(
args.backend) # raise error when using gloo on windows or macos
if args.backend == 'gloo':
logger.warning("launch start with CPUONLY mode")

if enable_elastic(args, distribute_mode):
launch_elastic(args, distribute_mode)
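Note: condensed, the new launch() logic resolves the backend first and only then picks a distribute mode: 'auto' defers to which_distributed_mode(), while an explicit backend forces collective mode after validation. A simplified Python sketch of that decision (illustrative; the real code also handles PS mode, NPU, and elastic launch):

# Simplified sketch of the backend/run-mode resolution in launch() (illustrative).
def resolve_backend(backend, run_mode, compiled_with):
    # compiled_with: e.g. {"cuda"}, {"xpu"}, or an empty set for CPU-only builds.
    if backend == 'auto':
        # Mirrors which_distributed_mode(): infer from the build flags.
        if 'cuda' in compiled_with:
            return 'nccl'
        if 'xpu' in compiled_with:
            return 'bkcl'
        return 'gloo'
    if run_mode not in (None, 'collective'):
        raise ValueError("an explicit backend only supports collective run mode")
    if backend not in ('nccl', 'gloo', 'bkcl'):
        raise ValueError("backend must be one of nccl|gloo|bkcl|auto")
    return backend

print(resolve_backend('auto', None, {'cuda'}))  # -> 'nccl'
print(resolve_backend('gloo', None, set()))     # -> 'gloo' (CPUONLY launch)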
63 changes: 56 additions & 7 deletions python/paddle/distributed/fleet/launch_utils.py
@@ -22,6 +22,7 @@
import tempfile
import shutil
from contextlib import closing
import multiprocessing
import socket
import warnings
import six
@@ -30,6 +31,7 @@
import paddle
import paddle.fluid as fluid
from distutils.util import strtobool
import paddle.utils.cpp_extension.extension_utils as utils
logger = logging.getLogger("root")
logger.propagate = False

@@ -669,29 +671,31 @@ def get_xpus(xpus):
return res_xpus


def get_device_mode():
def get_device_mode(backend):
if fluid.core.is_compiled_with_npu() and \
fluid.core.get_npu_device_count() > 0:
print("launch train in ascend npu mode!")
return DeviceMode.ASCEND_NPU

if fluid.core.is_compiled_with_cuda() and \
if backend == 'nccl' and \
fluid.core.get_cuda_device_count() > 0:
print("launch train in GPU mode!")
return DeviceMode.GPU

if fluid.core.is_compiled_with_xpu() and fluid.core.get_xpu_device_count(
) > 0:
if backend == 'bkcl' and fluid.core.get_xpu_device_count() > 0:
print("launch train in XPU mode")
return DeviceMode.XPU

print("launch train in CPU mode")
return DeviceMode.CPU
if backend == 'gloo':
print("launch train in CPU mode")
return DeviceMode.CPU

raise RuntimeError("Don't supported devices")


def get_device_proc_info(args):
# device_mode
device_mode = get_device_mode()
device_mode = get_device_mode(args.backend)

# devices
devices_per_proc = []
@@ -722,6 +726,9 @@ def get_device_proc_info(args):
else:
devices_per_proc = xpus
elif device_mode == DeviceMode.CPU:
if hasattr(args, "paddle_cpuonly") and args.nproc_per_node is None:
#NOTE (xiongkun03) set it to cpu core number
args.nproc_per_node = multiprocessing.cpu_count()
if args.nproc_per_node is None:
devices_per_proc = [0]
else:
@@ -1237,3 +1244,45 @@ def start_pod_heter_worker(self, args, pod):
tp.cmd = cmd

self.procs["heter_worker"].append(tp)


def check_backend(backend):
if backend not in ['nccl', 'gloo', 'bkcl', 'auto']:
raise ValueError(
"paddle.distributed initialize error, "
"backend argument can only be one of 'nccl', 'gloo', 'bkcl', 'auto', but got %s"
% backend)

if backend == 'nccl' and not fluid.core.is_compiled_with_cuda():
raise ValueError(
"paddle.distributed initialize error, "
"your paddle is not compiled with cuda but you assign 'nccl' as backend."
)

if backend == 'bkcl' and not fluid.core.is_compiled_with_xpu():
raise ValueError(
"paddle.distributed initialize error, "
"your paddle is not compiled with xpu but you assign 'bkcl' as backend."
)


def block_windows_and_macos(backend):
if backend != 'gloo': return
if utils.OS_NAME.startswith('darwin'):  # macOS: block
raise ValueError(
"You are trying to use gloo on macOS, but it is currently not supported"
)
if utils.IS_WINDOWS:  # Windows: block
raise ValueError(
"You are trying to use gloo on Windows, but it is currently not supported"
)


def get_backend_by_compile_flag():
if fluid.core.is_compiled_with_cuda():
return 'nccl'

if fluid.core.is_compiled_with_xpu():
return 'bkcl'

return 'gloo'
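Note: a small usage sketch of the helpers added above, assuming a Paddle build that already contains this commit (module path taken from the file shown in this diff):

# Usage sketch of the new launch_utils helpers (requires a Paddle build with this commit).
from paddle.distributed.fleet.launch_utils import (
    check_backend, block_windows_and_macos, get_backend_by_compile_flag)

backend = get_backend_by_compile_flag()  # 'nccl', 'bkcl', or 'gloo' depending on the build
check_backend(backend)                   # raises ValueError if the wheel cannot support it
block_windows_and_macos(backend)         # the gloo path is rejected on Windows and macOS
print("resolved backend:", backend)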