Support CPU Parallel in DataParallel Interface by GLOO to speed up training #35745

Merged (23 commits, Oct 21, 2021)
Changes from all commits
18 changes: 18 additions & 0 deletions paddle/fluid/framework/fleet/gloo_wrapper.h
@@ -238,6 +238,24 @@ class GlooWrapper {
return ret;
}

// TODO(xiongkun03): support all-gather for arrays of
// numbers with different lengths.
// AllgathervOptions may work for that case; needs some survey.
template <typename T>
void AllGatherVector(T* input_ptr, T* output_ptr,
size_t element_num) { // NOLINT
CHECK_EQ(is_initialized_, true);
#ifdef PADDLE_WITH_GLOO
gloo::AllgatherOptions opts(context_);
opts.setInput(input_ptr, element_num);
opts.setOutput(output_ptr, element_num * size_);
gloo::allgather(opts);
#else
LOG(WARNING) << "AllGather does nothing when WITH_GLOO=OFF";
Reviewer comment (Contributor): Should this throw an exception here?

Reply (Contributor): This keeps the handling pattern of the other gloo_wrapper interfaces; see line 221.

#endif
}

protected:
bool is_initialized_ = false;
#ifdef PADDLE_WITH_GLOO
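
For orientation, the AllGatherVector added above has the usual allgather semantics: every rank contributes element_num elements and receives element_num * size_ elements back, concatenated in rank order. A minimal Python-level sketch of the same collective (assuming a Paddle build with distributed support, run under paddle.distributed.launch):

import paddle
import paddle.distributed as dist

dist.init_parallel_env()  # backend is taken from PADDLE_DISTRI_BACKEND, e.g. gloo on CPU
x = paddle.to_tensor([dist.get_rank()], dtype='int64')  # each rank contributes one element
out = []
dist.all_gather(out, x)  # out now holds nranks tensors, ordered by rank
print(paddle.concat(out))  # e.g. [0, 1, 2, 3] with 4 trainers
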
115 changes: 113 additions & 2 deletions paddle/fluid/imperative/gloo_context.cc
@@ -18,6 +18,7 @@
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/split.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace framework {
@@ -67,8 +68,36 @@ void GLOOParallelContext::AllReduceByStream(const framework::Variable &src,
framework::Variable *dst,
int ring_id, bool use_calc_stream) {
// AllReduce(src, dst, strategy_, ring_id, use_calc_stream);
auto src_tensor = src.Get<framework::LoDTensor>();
auto *dst_tensor = dst->GetMutable<framework::LoDTensor>();
if (src.IsType<framework::LoDTensor>()) {
if (!dst->IsType<framework::LoDTensor>()) {
dst->Clear();
}
AllReduce(src.Get<framework::LoDTensor>(),
dst->GetMutable<framework::LoDTensor>());
} else if (src.IsType<framework::SelectedRows>()) {
if (&src != dst) {
if (!dst->IsType<framework::SelectedRows>()) {
dst->Clear();
}
AllReduce(src.Get<framework::SelectedRows>(),
dst->GetMutable<framework::SelectedRows>());
} else {
// SelectedRows cannot be allreduced in place
framework::Variable tmp_dst;
AllReduce(src.Get<framework::SelectedRows>(),
tmp_dst.GetMutable<framework::SelectedRows>());
*dst = std::move(tmp_dst);
}
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Unsupported variable type %s for imperative allreduce, only "
"LoDTensor and SelectedRows are supported.",
platform::demangle(framework::ToTypeName(src.Type()))));
}
}

void GLOOParallelContext::AllReduce(const framework::Tensor &src_tensor,
framework::Tensor *dst_tensor) {
auto gloo_wrapper = framework::GlooWrapper::GetInstance();
dst_tensor->Resize(src_tensor.dims());
switch (src_tensor.type()) {
@@ -84,6 +113,88 @@ void GLOOParallelContext::AllReduceByStream(const framework::Variable &src,
gloo_wrapper->Barrier();
}

#define GLOO_ALL_GATHER_CASE(type, T, gw) \
case type: { \
const auto *src_tensor_ptr = src_tensor.data<T>(); \
gw->AllGatherVector<T>(const_cast<T *>(src_tensor_ptr), \
reinterpret_cast<T *>(dst_tensor_ptr), \
value_sendcount); \
break; \
}

void GLOOParallelContext::AllReduce(const framework::SelectedRows &src,
framework::SelectedRows *dst) {
// int local_rank = strategy_.local_rank_;
int nranks = strategy_.nranks_;
VLOG(3) << "SelectedRows AllReduce start";
const auto &src_tensor = src.value();
const auto &place = src_tensor.place();
auto dtype = src_tensor.type();
// 1. Gather the number of rows from all workers. Here gloo AllGather is used,
// but other implementations could be used in the future
const auto &src_rows = src.rows();
auto gloo_wrapper = framework::GlooWrapper::GetInstance();
size_t local_row_num = src_rows.size();
std::vector<size_t> rows_num_vector =
gloo_wrapper->AllGather<size_t>(local_row_num);
const auto *cpu_rows_num_ptr = rows_num_vector.data();
auto rows_num = std::accumulate(cpu_rows_num_ptr, cpu_rows_num_ptr + nranks,
static_cast<int64_t>(0));
dst->set_height(src.height());
VLOG(3) << "Gather rows: " << string::join_strings(rows_num_vector, ',')
<< ", total rows number: " << rows_num
<< ", height: " << src.height();
auto *dst_rows = dst->mutable_rows();
dst_rows->resize(rows_num);
auto *dst_rows_ptr = dst_rows->MutableData(place);
const int64_t *src_rows_ptr = src_rows.Data(place);

// VLOG(3) << "Selected Rows of src:" << string::join_strings(dst_rows, ',')

auto *dst_tensor = dst->mutable_value();
auto dims = src_tensor.dims();
dims[0] = rows_num;
auto feature_size = framework::product(dims) / dims[0];
dst_tensor->Resize(dims);
if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + nranks,
[&](size_t row) { return row == cpu_rows_num_ptr[0]; })) {
// During sparse communication, the number of rows on each card is the same.
// Because the gloo wrapper utility class currently doesn't support
// broadcast, we only handle the equal-rows case.
VLOG(3) << "Use the gloo all reduce to sync. SRC:" << src_tensor;
// framework::SerializeToStream(VLOG(4), src);
VLOG(3) << "allgather replaces broadcast to speed up in sparse allreduce";
auto value_sendcount = cpu_rows_num_ptr[0] * feature_size;
auto *dst_tensor_ptr = dst_tensor->mutable_data(place, dtype);

gloo_wrapper->AllGatherVector<int64_t>(const_cast<int64_t *>(src_rows_ptr),
static_cast<int64_t *>(dst_rows_ptr),
rows_num_vector[0]);

switch (dtype) {
GLOO_ALL_GATHER_CASE(framework::proto::VarType::FP32, float,
gloo_wrapper);
GLOO_ALL_GATHER_CASE(framework::proto::VarType::FP64, double,
gloo_wrapper);
GLOO_ALL_GATHER_CASE(framework::proto::VarType::INT32, int, gloo_wrapper);
GLOO_ALL_GATHER_CASE(framework::proto::VarType::INT64, int64_t,
gloo_wrapper);
default: {
PADDLE_THROW(platform::errors::InvalidArgument(
"Invalid datatype for allreduce"));
}
}
VLOG(3) << "Selected Row DST:" << *dst_tensor;
VLOG(3) << "Selected Rows of DST:"
<< string::join_strings(std::vector<int64_t>(*dst_rows), ',');
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"The number of each card is not the same, gloo only support the-same"
"batch division"));
}
}

paddle::platform::DeviceContext *GLOOParallelContext::GetDeviceContext(
int ring_id) {
// return the CPUDeviceContext
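
To summarize the SelectedRows path above: gather every rank's row count, require the counts to be equal, then allgather the row indices and the flattened values and concatenate them in rank order; duplicate row ids are kept rather than deduplicated. A rough NumPy sketch of that merge step (the allgather helper here is hypothetical and stands in for the gloo collective):

import numpy as np

def merge_selected_rows(local_rows, local_values, allgather):
    # allgather: hypothetical helper returning one entry per rank, in rank order
    counts = allgather(len(local_rows))
    if any(c != counts[0] for c in counts):
        # mirrors the PADDLE_THROW above: only equal batch division is supported
        raise ValueError("gloo sparse allreduce needs the same row count on every rank")
    rows = np.concatenate([np.asarray(r, dtype=np.int64) for r in allgather(local_rows)])
    values = np.concatenate([np.asarray(v) for v in allgather(local_values)], axis=0)
    return rows, values  # the height of the result stays that of the source
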
8 changes: 8 additions & 0 deletions paddle/fluid/imperative/gloo_context.h
@@ -16,6 +16,9 @@
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/imperative/parallel_context.h"
#include "paddle/fluid/platform/device_context.h"

@@ -52,6 +55,11 @@ class GLOOParallelContext : public ParallelContext {

void SynchronizeCompute() override;

private:
void AllReduce(const framework::Tensor& src, framework::Tensor* dst);
void AllReduce(const framework::SelectedRows& src,
framework::SelectedRows* dst);

private:
std::unique_ptr<platform::CPUDeviceContext> device_;
};
51 changes: 45 additions & 6 deletions python/paddle/distributed/fleet/launch.py
@@ -103,7 +103,12 @@ def _parse_args():
type=str,
default="log",
help="The path for each process's log. Default --log_dir=log/")

base_group.add_argument(
"--backend",
type=str,
default="auto",
help="Specifize the backend, can be gloo|nccl|bkcl|auto. Default value is auto which perfers nccl or bkcl."
)
base_group.add_argument(
"--nproc_per_node",
type=int,
@@ -230,8 +235,21 @@ def get_cluster_from_args(args, device_mode, devices_per_proc):
devices_per_proc)


def cpuonly_check(args):
if args.ips and len(args.ips.split(',')) > 1:
raise RuntimeError(
"CPUONLY launch only support single trainer, that is len(ips)=1, but got %s."
% args.ips)
if args.run_mode:
assert args.run_mode == 'cpuonly', "CPUONLY launch only supports run_mode 'cpuonly'"
if args.servers:
raise RuntimeError("CPUONLY launch can't have --servers as arguments.")
return True


def launch_collective(args):
# parse arguments, used for cloud-single-machine and local
if args.backend == 'gloo': cpuonly_check(args)
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format(
@@ -265,6 +283,7 @@ def launch_collective(args):
global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
global_envs["PADDLE_DISTRI_BACKEND"] = args.backend

procs = start_local_trainers(
cluster,
@@ -349,9 +368,12 @@ def which_distributed_mode(args):

if fluid.core.is_compiled_with_cuda():
accelerators = fluid.core.get_cuda_device_count()
args.backend = 'nccl'
elif fluid.core.is_compiled_with_npu():
args.backend = 'unknown'
accelerators = fluid.core.get_npu_device_count()
elif fluid.core.is_compiled_with_xpu():
args.backend = 'bkcl'
accelerators = fluid.core.get_xpu_device_count()
else:
accelerators = 0
@@ -372,10 +394,14 @@ def which_distributed_mode(args):
else:
if not fluid.core.is_compiled_with_cuda(
) and not fluid.core.is_compiled_with_xpu():
logger.warning(
"Not found distinct arguments and not compiled with cuda or xpu. Default use ps mode"
)
return DistributeMode.PS
if args.servers:
logger.warning(
"Not found distinct arguments and not compiled with cuda or xpu. \
But found args.servers not empty, default use ps mode")
return DistributeMode.PS
else:
args.backend = "gloo"
return DistributeMode.COLLECTIVE
else:
logger.warning(
"Not found distinct arguments and compiled with cuda or xpu. Default use collective mode"
@@ -556,7 +582,20 @@ def launch():
logger = get_logger()
_print_arguments(args)

distribute_mode = which_distributed_mode(args)
if args.backend == 'auto':
distribute_mode = which_distributed_mode(args)
assert args.backend in [
'gloo', 'nccl', 'bkcl', 'unknown'
] # which_distributed_mode must modify args.backend
else:
assert args.run_mode == 'collective' or args.run_mode == None, "When backend is not 'auto', run mode must be collective"
check_backend(args.backend)
distribute_mode = DistributeMode.COLLECTIVE

block_windows_and_macos(
args.backend) # raise error when using gloo on windows or macos
if args.backend == 'gloo':
logger.warning("launch start with CPUONLY mode")

if enable_elastic(args, distribute_mode):
launch_elastic(args, distribute_mode)
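
As a usage sketch of the new --backend flag (the script name train.py and the tiny model below are made up for illustration), CPU-only data-parallel training would be started with the gloo backend and wrap the model in paddle.DataParallel as usual:

# python -m paddle.distributed.launch --backend=gloo --nproc_per_node=2 train.py
import paddle
import paddle.distributed as dist

def main():
    dist.init_parallel_env()  # reads PADDLE_DISTRI_BACKEND, which launch sets from --backend
    model = paddle.DataParallel(paddle.nn.Linear(8, 1))
    opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())
    for _ in range(10):
        x = paddle.rand([16, 8])
        loss = model(x).mean()
        loss.backward()  # gradients are synchronized across CPU trainers via gloo
        opt.step()
        opt.clear_grad()

if __name__ == '__main__':
    main()
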
63 changes: 56 additions & 7 deletions python/paddle/distributed/fleet/launch_utils.py
@@ -22,6 +22,7 @@
import tempfile
import shutil
from contextlib import closing
import multiprocessing
import socket
import warnings
import six
@@ -30,6 +31,7 @@
import paddle
import paddle.fluid as fluid
from distutils.util import strtobool
import paddle.utils.cpp_extension.extension_utils as utils
logger = logging.getLogger("root")
logger.propagate = False

@@ -669,29 +671,31 @@ def get_xpus(xpus):
return res_xpus


def get_device_mode():
def get_device_mode(backend):
if fluid.core.is_compiled_with_npu() and \
fluid.core.get_npu_device_count() > 0:
print("launch train in ascend npu mode!")
return DeviceMode.ASCEND_NPU

if fluid.core.is_compiled_with_cuda() and \
if backend == 'nccl' and \
fluid.core.get_cuda_device_count() > 0:
print("launch train in GPU mode!")
return DeviceMode.GPU

if fluid.core.is_compiled_with_xpu() and fluid.core.get_xpu_device_count(
) > 0:
if backend == 'bkcl' and fluid.core.get_xpu_device_count() > 0:
print("launch train in XPU mode")
return DeviceMode.XPU

print("launch train in CPU mode")
return DeviceMode.CPU
if backend == 'gloo':
print("launch train in CPU mode")
return DeviceMode.CPU

raise RuntimeError("Don't supported devices")


def get_device_proc_info(args):
# device_mode
device_mode = get_device_mode()
device_mode = get_device_mode(args.backend)

# devices
devices_per_proc = []
@@ -722,6 +726,9 @@ def get_device_proc_info(args):
else:
devices_per_proc = xpus
elif device_mode == DeviceMode.CPU:
if hasattr(args, "paddle_cpuonly") and args.nproc_per_node is None:
#NOTE (xiongkun03) set it to cpu core number
args.nproc_per_node = multiprocessing.cpu_count()
if args.nproc_per_node is None:
devices_per_proc = [0]
else:
@@ -1237,3 +1244,45 @@ def start_pod_heter_worker(self, args, pod):
tp.cmd = cmd

self.procs["heter_worker"].append(tp)


def check_backend(backend):
if backend not in ['nccl', 'gloo', 'bkcl', 'auto']:
raise ValueError(
"paddle.distributed initialize error, "
"backend argument can only be one of 'nccl', 'gloo', 'bkcl', 'auto', but got %s"
% backend)

if backend == 'nccl' and not fluid.core.is_compiled_with_cuda():
raise ValueError(
"paddle.distributed initialize error, "
"your paddle is not compiled with cuda but you assign 'nccl' as backend."
)

if backend == 'bkcl' and not fluid.core.is_compiled_with_xpu():
raise ValueError(
"paddle.distributed initialize error, "
"your paddle is not compiled with xpu but you assign 'bkcl' as backend."
)


def block_windows_and_macos(backend):
if backend != 'gloo': return
if utils.OS_NAME.startswith('darwin'):  # macOS, block
raise ValueError(
"You are trying to use gloo on macOS, but it is currently not supported"
)
if utils.IS_WINDOWS:  # Windows, block
raise ValueError(
"You are trying to use gloo on Windows, but it is currently not supported"
)


def get_backend_by_compile_flag():
if fluid.core.is_compiled_with_cuda():
return 'nccl'

if fluid.core.is_compiled_with_xpu():
return 'bkcl'

return 'gloo'
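
A rough sketch of how these three helpers compose when resolving the backend (the wrapper below is illustrative, not part of the patch; in launch() the 'auto' case instead goes through which_distributed_mode, which sets args.backend as a side effect):

def resolve_backend(requested):
    # 'auto' falls back to whatever the current build supports; explicit values are validated
    backend = get_backend_by_compile_flag() if requested == 'auto' else requested
    check_backend(backend)            # reject backends the current build cannot serve
    block_windows_and_macos(backend)  # gloo is blocked on Windows and macOS
    return backend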