Skip to content

Commit

Permalink
enhance broadcast_op_handle and gather_op_handle
Browse files Browse the repository at this point in the history
  • Loading branch information
chengduoZH committed Apr 13, 2018
1 parent b0267ac commit 02842cf
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 239 deletions.
71 changes: 50 additions & 21 deletions paddle/fluid/framework/details/broadcast_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,74 @@ namespace paddle {
namespace framework {
namespace details {

Tensor *GetTensorFromVar(Variable *in_var) {
if (in_var->IsType<LoDTensor>()) {
return in_var->GetMutable<LoDTensor>();
} else if (in_var->IsType<SelectedRows>()) {
return in_var->GetMutable<SelectedRows>()->mutable_value();
} else {
PADDLE_THROW("Var should be LoDTensor or SelectedRows");
}
return nullptr;
}

BroadcastOpHandle::BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}

void BroadcastOpHandle::RunImpl() {
PADDLE_ENFORCE_EQ(this->inputs_.size(), 1,
// the input may have dummy var.
std::vector<VarHandle *> in_var_handle;
for (auto *in : inputs_) {
auto *out_handle = dynamic_cast<VarHandle *>(in);
if (out_handle) {
in_var_handle.push_back(out_handle);
}
}
PADDLE_ENFORCE_EQ(in_var_handle.size(), 1,
"The number of input should be one.");

// the output may have dummy var.
std::vector<VarHandle *> out_var_handles;
for (auto *out : outputs_) {
auto *out_handle = dynamic_cast<VarHandle *>(out);
if (out_handle) {
out_var_handles.push_back(out_handle);
}
}

PADDLE_ENFORCE_EQ(
this->outputs_.size(), places_.size(),
out_var_handles.size(), places_.size(),
"The number of output should equal to the number of places.");

// Wait input done, this Wait is asynchronous operation
auto in_var_handle = static_cast<VarHandle *>(this->inputs_[0]);
auto &in_place = in_var_handle->place_;
if (inputs_[0]->generated_op_) {
inputs_[0]->generated_op_->Wait(dev_ctxes_[in_place]);
for (auto *out : outputs_) {
auto out_handle = static_cast<VarHandle *>(out);
auto &out_p = out_handle->place_;
inputs_[0]->generated_op_->Wait(dev_ctxes_[out_p]);
auto &in_place = in_var_handle[0]->place_;
if (in_var_handle[0]->generated_op_) {
in_var_handle[0]->generated_op_->Wait(dev_ctxes_[in_place]);
for (auto *out : out_var_handles) {
auto &out_p = out->place_;
if (platform::is_same_place(in_place, out_p)) continue;
in_var_handle[0]->generated_op_->Wait(dev_ctxes_[out_p]);
}
}

auto in_scope_idx = in_var_handle->scope_idx_;
//
auto in_scope_idx = in_var_handle[0]->scope_idx_;
PADDLE_ENFORCE_LT(in_scope_idx, local_scopes_.size(),
"The input(%s) is not in the local_scopes.",
in_var_handle->name_);
auto in_var = local_scopes_[in_scope_idx]->FindVar(in_var_handle->name_);

in_var_handle[0]->name_);
auto in_var = local_scopes_[in_scope_idx]->FindVar(in_var_handle[0]->name_);
Tensor *in_tensor = GetTensorFromVar(in_var);
for (auto *out : outputs_) {
auto out_handle = static_cast<VarHandle *>(out);
auto &out_p = out_handle->place_;

auto out_scope_idx = out_handle->scope_idx_;
for (auto *out : out_var_handles) {
auto &out_p = out->place_;

auto out_scope_idx = out->scope_idx_;
PADDLE_ENFORCE_LT(out_scope_idx, local_scopes_.size(),
"%s is not in the local_scopes ", out_handle->name_);
"%s is not in the local_scopes ", out->name_);

auto *s = local_scopes_[out_scope_idx];
auto out_var = s->FindVar(out_handle->name_);
auto out_var = s->FindVar(out->name_);
PADDLE_ENFORCE_EQ(out_p.which(), in_place.which(),
"The place of input and output should be the same.");

Expand Down Expand Up @@ -89,7 +118,7 @@ void BroadcastOpHandle::RunImpl() {
auto dst_gpu_place = boost::get<platform::CUDAPlace>(out_p);
void *dst_ptr = out_tensor->mutable_data(out_p);
void *src_ptr = in_tensor->data<void>();
int64_t size = in_tensor->numel();
int64_t size = in_tensor->numel() * SizeOfType(in_tensor->type());
memory::Copy(
dst_gpu_place, dst_ptr, src_gpu_place, src_ptr, size,
reinterpret_cast<platform::CUDADeviceContext *>(dev_ctxes_[out_p])
Expand Down
151 changes: 78 additions & 73 deletions paddle/fluid/framework/details/broadcast_op_handle_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,20 @@ namespace p = paddle::platform;
// test data amount
const f::DDim kDims = {20, 20};

class BroadcastTester : public ::testing::Test {
public:
struct TestBroadcastOpHandle {
std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
std::vector<Scope*> local_scopes_;
Scope g_scope_;
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;

void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
}

void InitCtxOnGpu(bool use_gpu) {
if (use_gpu) {
#ifdef PADDLE_WITH_CUDA
Expand Down Expand Up @@ -57,61 +69,56 @@ class BroadcastTester : public ::testing::Test {
}
}

void BroadcastInitOp(int input_scope_idx) {
void InitBroadcastOp(size_t input_scope_idx) {
for (size_t j = 0; j < gpu_list_.size(); ++j) {
local_scope_.push_back(&g_scope_.NewScope());
local_scope_[j]->Var("out");
local_scopes_.push_back(&(g_scope_.NewScope()));
local_scopes_[j]->Var("out");
}
local_scope_[input_scope_idx]->Var("input");
local_scopes_[input_scope_idx]->Var("input");

bc_op_handle_ = new f::details::BroadcastOpHandle(local_scope_, gpu_list_);
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));

f::details::VarHandle* in_var_handle = new f::details::VarHandle();
vars_.emplace_back(new VarHandle());
VarHandle* in_var_handle = static_cast<VarHandle*>(vars_.back().get());
in_var_handle->place_ = gpu_list_[input_scope_idx];
in_var_handle->name_ = "input";
in_var_handle->version_ = 1;
in_var_handle->scope_idx_ = input_scope_idx;
in_var_handle->generated_op_ = nullptr;
bc_op_handle_->AddInput(in_var_handle);
op_handle_->AddInput(in_var_handle);

// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle* dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
dummy_var_handle->generated_op_ = nullptr;
op_handle_->AddInput(dummy_var_handle);

for (size_t j = 0; j < gpu_list_.size(); ++j) {
bc_op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j];
f::details::VarHandle* out_var_handle = new f::details::VarHandle();
op_handle_->dev_ctxes_[gpu_list_[j]] = ctxs_[j].get();
vars_.emplace_back(new VarHandle());
VarHandle* out_var_handle = static_cast<VarHandle*>(vars_.back().get());
out_var_handle->place_ = gpu_list_[j];
out_var_handle->name_ = "out";
out_var_handle->version_ = 2;
out_var_handle->scope_idx_ = j;
bc_op_handle_->AddOutput(out_var_handle);
}
}
void BroadcastOpDestroy() {
for (auto in : bc_op_handle_->inputs_) {
delete in;
}
for (auto out : bc_op_handle_->outputs_) {
delete out;
op_handle_->AddOutput(out_var_handle);
}
delete bc_op_handle_;
for (size_t j = 0; j < ctxs_.size(); ++j) {
delete ctxs_[j];
}
}

void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle* out_dummy_var_handle =
static_cast<DummyVarHandle*>(vars_.back().get());
out_dummy_var_handle->generated_op_ = nullptr;
op_handle_->AddOutput(out_dummy_var_handle);
}

void TestBroadcastLodTensor() {
int input_scope_idx = 0;
BroadcastInitOp(input_scope_idx);

auto in_var = local_scope_[input_scope_idx]->Var("input");
void TestBroadcastLodTensor(size_t input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_lod_tensor = in_var->GetMutable<f::LoDTensor>();
in_lod_tensor->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);

std::vector<float> send_vector(f::product(kDims), input_scope_idx + 12);
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
Expand All @@ -120,56 +127,51 @@ class BroadcastTester : public ::testing::Test {
send_vector, *(ctxs_[input_scope_idx]), in_lod_tensor);
in_lod_tensor->set_lod(lod);

bc_op_handle_->Run(false);
op_handle_->Run(false);

WaitAll();

p::CPUPlace cpu_place;
for (size_t j = 0; j < gpu_list_.size(); ++j) {
auto out_var = local_scope_[j]->Var("out");
auto out_var = local_scopes_[j]->Var("out");
auto out_tensor = out_var->Get<f::LoDTensor>();
PADDLE_ENFORCE_EQ(out_tensor.lod(), lod, "lod is not equal.");

f::Tensor result_tensor;
f::TensorCopy(out_tensor, cpu_place, *(ctxs_[j]), &result_tensor);
float* ct = result_tensor.mutable_data<float>(cpu_place);

for (int64_t j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], send_vector[j], 1e-5);
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}

BroadcastOpDestroy();
}

void TestBroadcastSelectedRows() {
int input_scope_idx = 0;
BroadcastInitOp(input_scope_idx);

auto in_var = local_scope_[input_scope_idx]->Var("input");
void TestBroadcastSelectedRows(size_t input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
auto value = in_selected_rows->mutable_value();
value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
int height = kDims[0] * 2;
int height = static_cast<int>(kDims[0]) * 2;
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
in_selected_rows->set_height(height);
in_selected_rows->set_rows(rows);

std::vector<float> send_vector(f::product(kDims));
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), value);

bc_op_handle_->Run(false);
op_handle_->Run(false);

WaitAll();

p::CPUPlace cpu_place;
for (size_t j = 0; j < gpu_list_.size(); ++j) {
auto out_var = local_scope_[j]->Var("out");
auto out_var = local_scopes_[j]->Var("out");
auto& out_select_rows = out_var->Get<f::SelectedRows>();
auto rt = out_select_rows.value();

Expand All @@ -183,41 +185,44 @@ class BroadcastTester : public ::testing::Test {
f::TensorCopy(rt, cpu_place, *(ctxs_[j]), &result_tensor);
float* ct = result_tensor.data<float>();

for (int64_t j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], send_vector[j], 1e-5);
for (int64_t i = 0; i < f::product(kDims); ++i) {
ASSERT_NEAR(ct[i], send_vector[i], 1e-5);
}
}

BroadcastOpDestroy();
}

public:
f::Scope g_scope_;
std::vector<p::DeviceContext*> ctxs_;
std::vector<f::Scope*> local_scope_;
std::vector<p::Place> gpu_list_;
f::details::BroadcastOpHandle* bc_op_handle_;
};

TEST_F(BroadcastTester, TestCPUBroadcastTestLodTensor) {
InitCtxOnGpu(false);
TestBroadcastLodTensor();
TEST(BroadcastTester, TestCPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}

TEST_F(BroadcastTester, TestCPUBroadcastTestSelectedRows) {
InitCtxOnGpu(false);
TestBroadcastSelectedRows();
TEST(BroadcastTester, TestCPUBroadcastTestSelectedRows) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastSelectedRows(input_scope_idx);
}

#ifdef PADDLE_WITH_CUDA
TEST_F(BroadcastTester, TestGPUBroadcastTestLodTensor) {
InitCtxOnGpu(true);
TestBroadcastLodTensor();
TEST(BroadcastTester, TestGPUBroadcastTestLodTensor) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastLodTensor(input_scope_idx);
}

TEST_F(BroadcastTester, TestGPUBroadcastTestSelectedRows) {
InitCtxOnGpu(true);
TestBroadcastSelectedRows();
TEST(BroadcastTester, TestGPUBroadcastTestSelectedRows) {
TestBroadcastOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitBroadcastOp(input_scope_idx);
test_op.TestBroadcastSelectedRows(input_scope_idx);
}
#endif

Expand Down
Loading

0 comments on commit 02842cf

Please sign in to comment.