Commit
paddlebox add fileapi, add shard embedding, fix read ins error bug, add enable train binding cpu mode, support mix train (#68)
qingshui authored May 11, 2021
1 parent 6fa5f88 commit 6e1191c
Showing 23 changed files with 1,015 additions and 439 deletions.
86 changes: 60 additions & 26 deletions paddle/fluid/framework/boxps_trainer.cc
@@ -17,12 +17,12 @@
#include "paddle/fluid/framework/fleet/box_wrapper.h"
#include "paddle/fluid/framework/trainer.h"
#include "paddle/fluid/framework/trainer_desc.pb.h"

DECLARE_bool(enable_binding_train_cpu);
namespace paddle {
namespace framework {

void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
thread_num_ = trainer_desc.thread_num();
VLOG(3) << "pipeline num: " << thread_num_;

@@ -36,8 +36,8 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc,
param_config_ = trainer_desc.boxps_param();
async_mode_ = param_config_.async_mode();
if (async_mode_) {
dense_table_.reset(new BoxPSAsynDenseTable(thread_num_));
VLOG(3) << "async mode ";
}
dump_thread_num_ = param_config_.dump_thread_num();
if (need_dump_field_ && dump_thread_num_ <= 0) {
@@ -47,13 +47,15 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc,
workers_.resize(thread_num_);
param_need_sync_.reset(new std::vector<std::string>);

int sync_dense_mode = param_config_.sync_dense_mode();
int sync_weight_step = param_config_.sync_weight_step();
bool sync_one_ring = param_config_.sync_one_ring();
for (int i = 0; i < thread_num_; ++i) {
platform::Place place = platform::CUDAPlace(i);
workers_[i] = DeviceWorkerFactory::CreateDeviceWorker(
trainer_desc.device_worker_name());
    auto this_worker =
        std::dynamic_pointer_cast<paddle::framework::BoxPSWorker>(workers_[i]);
this_worker->SetDeviceIndex(i);
this_worker->SetThreadIndex(i);
this_worker->SetDataFeed(readers[i]);
@@ -65,10 +67,13 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc,
this_worker->SetPlace(place);
this_worker->Initialize(trainer_desc);
this_worker->InitRandomDumpConfig(trainer_desc);
this_worker->SetParamSyncStep(sync_weight_step);
this_worker->SetDenseSyncMode(sync_dense_mode);
this_worker->SetOneRing(sync_one_ring);
}
  param_need_sync_.reset(
      new std::vector<std::string>(param_config_.param_need_sync().begin(),
                                   param_config_.param_need_sync().end()));
VLOG(3) << "param_need_sync_ have: ";
for (const std::string& name : *param_need_sync_) {
VLOG(3) << name;
@@ -92,7 +97,7 @@ void BoxPSTrainer::InitDumpEnv() {
queue_ = paddle::framework::MakeChannel<std::string>();
// Only set dump channel on the last section
for (int i = 0; i < thread_num_; ++i) {
workers_[i]->SetChannelWriter(queue_.get());
}
// TODO(hutuxian): should make it as a config
for (int i = 0; i < dump_thread_num_; i++) {
@@ -102,32 +107,30 @@ void BoxPSTrainer::InitDumpEnv() {
}

void BoxPSTrainer::CopyParameters(const Scope& root_scope, int device_id) {
  Scope* thread_scope = GetWorkerScope(device_id);
for (const std::string& name : *param_need_sync_) {
const LoDTensor& root_tensor = root_scope.FindVar(name)->Get<LoDTensor>();

// TODO(hutxian): check a new var of the same name is created in
    LoDTensor* gpu_tensor = thread_scope->Var(name)->GetMutable<LoDTensor>();
platform::Place place = platform::CUDAPlace(device_id);
TensorCopy(*static_cast<const Tensor*>(&root_tensor), place,
static_cast<Tensor*>(gpu_tensor));
}
}

void BoxPSTrainer::DumpParameters(void) {
  Scope* thread_scope = GetWorkerScope(0);
for (const auto& var : persistable_vars_) {
auto* root_tensor = root_scope_->Var(var)->GetMutable<LoDTensor>();
// TODO(hutuxian): Add a final all-reduce?
    const auto& thread_tensor = thread_scope->FindVar(var)->Get<LoDTensor>();
TensorCopySync(thread_tensor, platform::CPUPlace(), root_tensor);
}
}

void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
PADDLE_ENFORCE(root_scope_, "Null root_scope pointer");
for (auto& var : main_program.Block(0).AllVars()) {
if (var->Persistable()) {
@@ -136,37 +139,68 @@ void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program,
}

if (async_mode_) {
    dense_table_->Init(*root_scope_, *param_need_sync_.get(),
                       persistable_vars_);
}
for (int i = 0; i < thread_num_; ++i) {
    auto this_worker =
        std::dynamic_pointer_cast<paddle::framework::BoxPSWorker>(workers_[i]);
this_worker->SetRootScope(root_scope_);
this_worker->CreateDeviceResource(main_program);
if (async_mode_) {
this_worker->SetDenseTable(dense_table_.get());
}
// CopyParameters(*root_scope_, i);
}
}

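// Creates, once, one single-thread pool per training device; when
// FLAGS_enable_binding_train_cpu is set (and enough train cores are
// available), each pool is pinned to the CPU cores reserved for training.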
inline std::vector<std::shared_ptr<paddle::framework::ThreadPool>>&
GetThreadPool(int thread_num) {
static std::vector<std::shared_ptr<paddle::framework::ThreadPool>>
thread_pools;
if (!thread_pools.empty()) {
return thread_pools;
}
thread_pools.resize(thread_num);
for (int i = 0; i < thread_num; ++i) {
thread_pools[i].reset(new paddle::framework::ThreadPool(1));
}
if (!FLAGS_enable_binding_train_cpu) {
return thread_pools;
}
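  // Pin each worker's pool to the training cores reported by boxps.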
std::vector<int>& train_cores = boxps::get_train_cores();
if (train_cores.size() < static_cast<size_t>(thread_num)) {
return thread_pools;
}
std::vector<int> ncores;
for (int i = 0; i < thread_num; ++i) {
ncores.push_back(train_cores[i]);
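    // If exactly two training cores are reserved per worker, also bind the
    // second (e.g. hyper-thread sibling) core.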
if (train_cores.size() / 2 == static_cast<size_t>(thread_num)) {
ncores.push_back(train_cores[i + thread_num]);
}
thread_pools[i]->SetCPUAffinity(ncores, false);
ncores.clear();
}
return thread_pools;
}
void BoxPSTrainer::Run() {
VLOG(3) << "Going to run";
auto pool = GetThreadPool(thread_num_);
wait_futures_.clear();
CHECK(static_cast<int>(pool.size()) == thread_num_);
for (int i = 0; i < thread_num_; ++i) {
if (!debug_) {
      wait_futures_.emplace_back(
          pool[i]->Run([this, i]() { workers_[i]->TrainFiles(); }));
} else {
      wait_futures_.emplace_back(
          pool[i]->Run([this, i]() { workers_[i]->TrainFilesWithProfiler(); }));
}
}
}
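
Aside (not part of this commit's diff): the binding above relies on ThreadPool::SetCPUAffinity pinning each single-thread pool's worker to the cores chosen in GetThreadPool. A minimal standalone sketch of the same idea, assuming Linux/pthreads; BindToCores and the fixed worker count are hypothetical illustration, not Paddle APIs:

// Sketch only: pin worker threads to CPU cores (Linux, build with -pthread).
#include <pthread.h>
#include <sched.h>

#include <cstdio>
#include <thread>
#include <vector>

// Hypothetical helper: pin an already-running std::thread to the given cores.
static void BindToCores(std::thread* t, const std::vector<int>& cores) {
  cpu_set_t mask;
  CPU_ZERO(&mask);
  for (int c : cores) CPU_SET(c, &mask);
  pthread_setaffinity_np(t->native_handle(), sizeof(mask), &mask);
}

int main() {
  // One worker per "device", each pinned to one core, mirroring the
  // one-single-thread-pool-per-device layout in GetThreadPool().
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back(
        [i] { std::printf("worker %d runs on its pinned core\n", i); });
    BindToCores(&workers.back(), {i});
  }
  for (auto& w : workers) w.join();
  return 0;
}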

void BoxPSTrainer::Finalize() {
  for (auto& th : wait_futures_) {
    th.wait();
}
if (async_mode_) {
// must be after train thread, otherwise the ps_buffer_ will be closed first
… (remaining diff and the other changed files not shown)
