Skip to content

Commit

Permalink
add tensor copying for cuda_pinned
Browse files Browse the repository at this point in the history
  • Loading branch information
chengduoZH committed Apr 8, 2018
1 parent 231a219 commit 1e29c2e
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 63 deletions.
7 changes: 6 additions & 1 deletion paddle/fluid/framework/details/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ cc_library(scale_loss_grad_op_handle SRCS scale_loss_grad_op_handle.cc DEPS op_h
cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
dynload_cuda)
cc_library(collect_parameters SRCS collect_parameters.cc)
nv_library(all_reduce_op_handle SRCS all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
dynload_cuda collect_parameters)
nv_library(all_gather_op_handle SRCS all_gather_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
dynload_cuda collect_parameters)
cc_library(computation_op_handle SRCS computation_op_handle.cc DEPS framework_proto scope place operator op_registry)

cc_library(ssa_graph SRCS ssa_graph.cc DEPS var_handle op_handle_base)
cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS ssa_graph)

if(WITH_GPU)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle all_reduce_op_handle all_gather_op_handle)
else()
set(multi_devices_graph_builder_deps)
endif()
Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/framework/details/all_gather_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ AllGatherOpHandle::AllGatherOpHandle(const Scope &local_scope,
: local_scope_(local_scope), place_(place) {}

void AllGatherOpHandle::RunImpl() {
PADDLE_ENFORCE_EQ(this->inputs_.size(), 1);
auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;

// Wait input done, this Wait is asynchronous operation
Expand All @@ -31,7 +32,7 @@ void AllGatherOpHandle::RunImpl() {
auto var = local_scope_.FindVar(var_name);
PADDLE_ENFORCE(var);

ParameterCollection::Instance().Get(var_name)->Send<Variable>(var);
ParameterCollection::Instance().Get(var_name)->Send<Variable *>(&var);
}

std::string AllGatherOpHandle::Name() const { return "all_gather"; }
Expand Down
71 changes: 35 additions & 36 deletions paddle/fluid/framework/details/all_reduce_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ AllReduceOpHandle::AllReduceOpHandle(const std::vector<Scope *> &local_scopes,
}

void AllReduceOpHandle::RunImpl() {
PADDLE_ENFORCE_EQ(this->inputs_.size(), device_count_);
auto in_var0 = static_cast<VarHandle *>(this->inputs_[0]);
auto &var_name = in_var0->name_;

Expand All @@ -41,53 +42,56 @@ void AllReduceOpHandle::RunImpl() {
in->generated_op_->Wait(dev_ctxes_[p]);
}

platform::Place cuda_pinned_place = platform::CUDAPinnedPlace();
// sum
Variable var;
ParameterCollection::Instance().Get(var_name)->Receive<Variable>(&var);
Variable *var;
ParameterCollection::Instance().Get(var_name)->Receive<Variable *>(&var);

Tensor reducer;
Tensor temp;
if (var.IsType<framework::SelectedRows>()) {
// reduce sparse parameter
if (var->IsType<framework::SelectedRows>()) {
// reduce sparse gradient

} else if (var.IsType<framework::LoDTensor>()) {
auto param = var.Get<LoDTensor>();
} else if (var->IsType<framework::LoDTensor>()) {
auto param = var->Get<LoDTensor>();

auto dev_id = static_cast<platform::CUDAPlace>(param.place()).device;
auto &p = nccl_ctxs_.DevCtx(dev_id);
PADDLE_ENFORCE(platform::is_gpu_place(param.place()));

reducer.mutable_data(param.dims(), platform::CUDAPinnedPlace());
temp.mutable_data(param.dims(), platform::CUDAPinnedPlace());
auto dev_id = boost::get<platform::CUDAPlace>(param.place()).device;
auto dev_ctx = nccl_ctxs_.DevCtx(dev_id);

framework::TensorCopy(param, platform::CUDAPinnedPlace(), dev_ctxes_[p],
reducer);
dev_ctxes_[p]->Wait();
reducer.Resize(param.dims());
temp.Resize(param.dims());
reducer.mutable_data(cuda_pinned_place, param.type());
temp.mutable_data(cuda_pinned_place, param.type());

framework::TensorCopy(param, cuda_pinned_place, *dev_ctx, &reducer);
dev_ctx->Wait();
} else {
PADDLE_THROW("Parameter should be LoDTensor or SelectedRows");
PADDLE_THROW("Gradient should be LoDTensor or SelectedRows");
}

// TODO(zcd): float should be T
float *reducer_ptr = reducer.data<float>();
for (int j = 0; j < device_count_ - 1; ++j) {
Variable other_var;
ParameterCollection::Instance().Get(var_name)->Receive<Variable>(
Variable *other_var;
ParameterCollection::Instance().Get(var_name)->Receive<Variable *>(
&other_var);
PADDLE_ENFORCE(other_var.Type() == var.Type());
PADDLE_ENFORCE(other_var->Type() == var->Type());

if (var.second->IsType<framework::SelectedRows>()) {
// reduce sparse parameter
if (var->IsType<framework::SelectedRows>()) {
// reduce sparse gradient

} else if (var.second->IsType<framework::LoDTensor>()) {
auto param = other_var.Get<LoDTensor>();
} else if (var->IsType<framework::LoDTensor>()) {
auto param = other_var->Get<LoDTensor>();
PADDLE_ENFORCE_EQ(reducer.numel(), param.numel());

auto dev_id = static_cast<platform::CUDAPlace>(param.place()).device;
auto &p = nccl_ctxs_.DevCtx(dev_id);
auto dev_id = boost::get<platform::CUDAPlace>(param.place()).device;
auto dev_ctx = nccl_ctxs_.DevCtx(dev_id);

framework::TensorCopy(param, platform::CUDAPinnedPlace(), dev_ctxes_[p],
temp);
framework::TensorCopy(param, cuda_pinned_place, *dev_ctx, &temp);

dev_ctxes_[p]->Wait();
dev_ctx->Wait();
float *temp_ptr = temp.data<float>();
for (int k = 0; k < reducer.numel(); ++k) {
reducer_ptr[k] += temp_ptr[k];
Expand All @@ -101,18 +105,13 @@ void AllReduceOpHandle::RunImpl() {
auto *s = local_scopes_[i];
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto var = s->FindVar(var_name);
if (var.IsType<framework::SelectedRows>()) {
// reduce sparse parameter

} else if (var.IsType<framework::LoDTensor>()) {
auto &lod_tensor = var->Get<LoDTensor>();
void *buffer = const_cast<void *>(lod_tensor.data<void>());
if (numel == 0) {
numel = static_cast<size_t>(lod_tensor.numel());
}
if (var->IsType<framework::SelectedRows>()) {
// reduce sparse gradient

} else if (var->IsType<framework::LoDTensor>()) {
auto lod_tensor = var->GetMutable<LoDTensor>();
auto dev_ctx = nccl_ctxs_.DevCtx(dev_id);
framework::TensorCopy(reducer, p, dev_ctx, lod_tensor);
framework::TensorCopy(reducer, p, *dev_ctx, lod_tensor);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/details/all_reduce_op_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/nccl_helper.h"

namespace paddle {
namespace framework {
Expand All @@ -33,7 +34,6 @@ struct AllReduceOpHandle : public OpHandleBase {
const std::vector<platform::Place> &places_;
const platform::NCCLContextMap &nccl_ctxs_;
const int device_count_;
std::thread all_reduce_calls_;

AllReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
Expand Down
13 changes: 5 additions & 8 deletions paddle/fluid/framework/details/collect_parameters.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <unordered_set>

#include "paddle/fluid/framework/details/collect_parameters.h"
#include "paddle/fluid/framework/variable.h"

Expand All @@ -23,15 +21,14 @@ namespace details {
ParameterCollection *ParameterCollection::param_collect = nullptr;

ParameterCollection::ParameterCollection(
const std::vector<std::string> &para_names, const int device_count)
const std::unordered_set<std::string> &para_names, const int device_count)
: device_count_(device_count) {
PADDLE_ENFORCE_GT(para_names.size(), 0);
PADDLE_ENFORCE_NOT_NULL(param_collect,
"Need to Create ParameterCollection first!");

using PtrType = std::unique_ptr<ChannelHolder>;
for (size_t i = 0; i < para_names.size(); ++i) {
param_channels_.emplace(para_names[i], PtrType(new ChannelHolder()));
param_channels_[para_names[i]]->Reset<Variable *>(device_count);
for (auto pn : para_names) {
param_channels_.emplace(pn, PtrType(new ChannelHolder()));
param_channels_[pn]->Reset<Variable *>(device_count);
}
}

Expand Down
15 changes: 9 additions & 6 deletions paddle/fluid/framework/details/collect_parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "paddle/fluid/framework/channel.h"

namespace paddle {
Expand All @@ -26,8 +27,9 @@ namespace details {

class ParameterCollection {
public:
explicit ParameterCollection(const std::vector<std::string> &para_names,
const int device_count);
explicit ParameterCollection(
const std::unordered_set<std::string> &para_names,
const int device_count);

static ParameterCollection &Instance() {
PADDLE_ENFORCE_NOT_NULL(param_collect,
Expand All @@ -36,8 +38,9 @@ class ParameterCollection {
}

/*! \brief Create should only called by Init function */
static ParameterCollection &Init(const std::vector<std::string> &para_names,
const int device_count) {
static ParameterCollection &Init(
const std::unordered_set<std::string> &para_names,
const int device_count) {
if (param_collect == nullptr) {
param_collect = new ParameterCollection(para_names, device_count);
}
Expand All @@ -51,7 +54,7 @@ class ParameterCollection {
private:
static ParameterCollection *param_collect;
const int device_count_;
std::unordered_map<const std::string, std::unique_ptr<ChannelHolder>>
std::unordered_map<std::string, std::unique_ptr<ChannelHolder>>
param_channels_;
DISABLE_COPY_AND_ASSIGN(ParameterCollection);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
for (auto &p : params) {
grad_names_.insert(GradVarName(p));
}
if (use_gather_reduce) {
ParameterCollection::Init(grad_names_,
static_cast<int>(local_scopes.size()));
}
}

std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
Expand Down Expand Up @@ -150,7 +154,9 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
result.ops_.emplace_back(
new AllGatherOpHandle(*local_scopes_[i], p));
auto *op_handle = result.ops_.back().get();

op_handle->dev_ctxes_[p] = const_cast<platform::DeviceContext *>(
platform::DeviceContextPool::Instance().Get(p));
// TODO(zcd): investigate why `vars` can be empty for this place/device.
if (vars.empty()) { // This device has no data. continue.
continue;
}
Expand Down
41 changes: 38 additions & 3 deletions paddle/fluid/framework/tensor_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,44 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place,
memory::Copy(
dst_gpu_place, dst_ptr, src_gpu_place, src_ptr, size,
reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream());

// TODO(zcd): CUDAPinnedPlace->CUDAPlace, CUDAPlace->CUDAPinnedPlace
// TODO(zcd): CUDAPinnedPlace->CPUPlace, CPUPlace->CUDAPinnedPlace
} else if (platform::is_cuda_pinned_place(src_place) &&
platform::is_gpu_place(dst_place)) {
auto src_cuda_pinned_place =
boost::get<platform::CUDAPinnedPlace>(src_place);
auto dst_gpu_place = boost::get<platform::CUDAPlace>(dst_place);
auto ctx_place = ctx.GetPlace();
PADDLE_ENFORCE(platform::is_gpu_place(ctx_place));
auto ctx_gpu_place = boost::get<platform::CUDAPlace>(ctx_place);
PADDLE_ENFORCE_EQ(dst_gpu_place, ctx_gpu_place);
memory::Copy(
dst_gpu_place, dst_ptr, src_cuda_pinned_place, src_ptr, size,
reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream());
} else if (platform::is_gpu_place(src_place) && // NOLINT
platform::is_cuda_pinned_place(dst_place)) {
auto src_gpu_place = boost::get<platform::CUDAPlace>(src_place);
auto dst_cuda_pinned_place =
boost::get<platform::CUDAPinnedPlace>(dst_place);
auto ctx_place = ctx.GetPlace();
PADDLE_ENFORCE(platform::is_gpu_place(ctx_place));
auto ctx_gpu_place = boost::get<platform::CUDAPlace>(ctx_place);
PADDLE_ENFORCE_EQ(src_gpu_place, ctx_gpu_place);
memory::Copy(
dst_cuda_pinned_place, dst_ptr, src_gpu_place, src_ptr, size,
reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream());
} else if (platform::is_cuda_pinned_place(src_place) &&
platform::is_cpu_place(dst_place)) {
memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
boost::get<platform::CUDAPinnedPlace>(src_place), src_ptr,
size);
} else if (platform::is_cpu_place(src_place) &&
platform::is_cuda_pinned_place(dst_place)) {
memory::Copy(boost::get<platform::CUDAPinnedPlace>(dst_place), dst_ptr,
boost::get<platform::CPUPlace>(src_place), src_ptr, size);
} else if (platform::is_cuda_pinned_place(src_place) &&
platform::is_cuda_pinned_place(dst_place)) {
memory::Copy(boost::get<platform::CUDAPinnedPlace>(dst_place), dst_ptr,
boost::get<platform::CUDAPinnedPlace>(src_place), src_ptr,
size);
#endif
} else {
PADDLE_THROW("TensorCopy failed.");
Expand Down
9 changes: 5 additions & 4 deletions paddle/fluid/pybind/pybind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,11 @@ All parameter, weight, gradient are variables in Paddle.
const std::unordered_set<std::string> &params,
const ProgramDesc &startup_program,
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, bool allow_op_delay) {
new (&self) ParallelExecutor(num_threads, use_event, places,
params, startup_program, main_program,
loss_var_name, scope, allow_op_delay);
Scope *scope, bool allow_op_delay, bool use_gather_reduce) {
new (&self)
ParallelExecutor(num_threads, use_event, places, params,
startup_program, main_program, loss_var_name,
scope, allow_op_delay, use_gather_reduce);
})
.def("run", &ParallelExecutor::Run);

Expand Down
6 changes: 4 additions & 2 deletions python/paddle/fluid/parallel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def __init__(self,
loss_name,
use_cuda,
num_threads=None,
allow_op_delay=False):
allow_op_delay=False,
use_gather_reduce=False):
self._places = []
self._act_places = []
if use_cuda:
Expand Down Expand Up @@ -66,7 +67,8 @@ def __init__(self,
main.desc,
loss_name,
scope,
allow_op_delay)
allow_op_delay,
use_gather_reduce)
self.scope = scope

def run(self, fetch_list, feed_dict={}):
Expand Down
1 change: 1 addition & 0 deletions python/setup.py.in
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ package_dir={
# So that package points to other directory.
'paddle.fluid.proto.profiler': '${PADDLE_BINARY_DIR}/paddle/fluid/platform',
'paddle.fluid.proto': '${PADDLE_BINARY_DIR}/paddle/fluid/framework',
'paddle.fluid': '${PADDLE_BINARY_DIR}/python/paddle/fluid',
}
if '${WITH_FLUID_ONLY}'== 'OFF':
package_dir['py_paddle']='${PADDLE_BINARY_DIR}/python/py_paddle'
Expand Down

0 comments on commit 1e29c2e

Please sign in to comment.