From 6e1191cb86bd29bbf7a116d6b3b6327efa441b75 Mon Sep 17 00:00:00 2001 From: qingshui Date: Tue, 11 May 2021 10:50:07 +0800 Subject: [PATCH] paddlebox add fileapi, add shard embeding, fix read ins error bug, add enable train binding cpu mode support mix train (#68) --- paddle/fluid/framework/boxps_trainer.cc | 86 ++-- paddle/fluid/framework/boxps_worker.cc | 160 ++++++- paddle/fluid/framework/channel.h | 19 +- paddle/fluid/framework/data_feed.cc | 8 +- paddle/fluid/framework/data_set.cc | 419 ++++++++++-------- paddle/fluid/framework/data_set.h | 48 +- paddle/fluid/framework/device_worker.h | 77 ++-- paddle/fluid/framework/fleet/box_wrapper.cc | 166 +++++-- paddle/fluid/framework/fleet/box_wrapper.cu | 76 +++- paddle/fluid/framework/fleet/box_wrapper.h | 173 ++++++-- .../fluid/framework/fleet/box_wrapper_impl.h | 58 +-- paddle/fluid/framework/tensor_util.cc | 22 + paddle/fluid/framework/tensor_util.h | 4 + paddle/fluid/framework/threadpool.h | 25 ++ paddle/fluid/framework/trainer.h | 4 +- paddle/fluid/framework/trainer_desc.proto | 5 +- .../operators/collective/c_mixallgather_op.cc | 36 +- paddle/fluid/platform/flags.cc | 13 +- paddle/fluid/pybind/box_helper_py.cc | 35 ++ paddle/fluid/pybind/box_helper_py.h | 1 + paddle/fluid/pybind/pybind.cc | 1 + python/paddle/fluid/__init__.py | 2 + python/paddle/fluid/device_worker.py | 16 +- 23 files changed, 1015 insertions(+), 439 deletions(-) diff --git a/paddle/fluid/framework/boxps_trainer.cc b/paddle/fluid/framework/boxps_trainer.cc index 4259c28c399f1..d864611e9db7f 100644 --- a/paddle/fluid/framework/boxps_trainer.cc +++ b/paddle/fluid/framework/boxps_trainer.cc @@ -17,12 +17,12 @@ #include "paddle/fluid/framework/fleet/box_wrapper.h" #include "paddle/fluid/framework/trainer.h" #include "paddle/fluid/framework/trainer_desc.pb.h" - +DECLARE_bool(enable_binding_train_cpu); namespace paddle { namespace framework { void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc, - Dataset* dataset) { + Dataset* dataset) { thread_num_ = trainer_desc.thread_num(); VLOG(3) << "pipeline num: " << thread_num_; @@ -36,8 +36,8 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc, param_config_ = trainer_desc.boxps_param(); async_mode_ = param_config_.async_mode(); if (async_mode_) { - dense_table_.reset(new BoxPSAsynDenseTable(thread_num_)); - VLOG(3) << "async mode "; + dense_table_.reset(new BoxPSAsynDenseTable(thread_num_)); + VLOG(3) << "async mode "; } dump_thread_num_ = param_config_.dump_thread_num(); if (need_dump_field_ && dump_thread_num_ <= 0) { @@ -47,13 +47,15 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc, workers_.resize(thread_num_); param_need_sync_.reset(new std::vector); + int sync_dense_mode = param_config_.sync_dense_mode(); + int sync_weight_step = param_config_.sync_weight_step(); + bool sync_one_ring = param_config_.sync_one_ring(); for (int i = 0; i < thread_num_; ++i) { platform::Place place = platform::CUDAPlace(i); workers_[i] = DeviceWorkerFactory::CreateDeviceWorker( trainer_desc.device_worker_name()); auto this_worker = - std::dynamic_pointer_cast( - workers_[i]); + std::dynamic_pointer_cast(workers_[i]); this_worker->SetDeviceIndex(i); this_worker->SetThreadIndex(i); this_worker->SetDataFeed(readers[i]); @@ -65,10 +67,13 @@ void BoxPSTrainer::Initialize(const TrainerDesc& trainer_desc, this_worker->SetPlace(place); this_worker->Initialize(trainer_desc); this_worker->InitRandomDumpConfig(trainer_desc); + this_worker->SetParamSyncStep(sync_weight_step); + 
this_worker->SetDenseSyncMode(sync_dense_mode); + this_worker->SetOneRing(sync_one_ring); } param_need_sync_.reset( new std::vector(param_config_.param_need_sync().begin(), - param_config_.param_need_sync().end())); + param_config_.param_need_sync().end())); VLOG(3) << "param_need_sync_ have: "; for (const std::string& name : *param_need_sync_) { VLOG(3) << name; @@ -92,7 +97,7 @@ void BoxPSTrainer::InitDumpEnv() { queue_ = paddle::framework::MakeChannel(); // Only set dump channel on the last section for (int i = 0; i < thread_num_; ++i) { - workers_[i]->SetChannelWriter(queue_.get()); + workers_[i]->SetChannelWriter(queue_.get()); } // TODO(hutuxian): should make it as a config for (int i = 0; i < dump_thread_num_; i++) { @@ -102,13 +107,12 @@ void BoxPSTrainer::InitDumpEnv() { } void BoxPSTrainer::CopyParameters(const Scope& root_scope, int device_id) { - Scope *thread_scope = GetWorkerScope(device_id); + Scope* thread_scope = GetWorkerScope(device_id); for (const std::string& name : *param_need_sync_) { const LoDTensor& root_tensor = root_scope.FindVar(name)->Get(); // TODO(hutxian): check a new var of the same name is created in - LoDTensor* gpu_tensor = - thread_scope->Var(name)->GetMutable(); + LoDTensor* gpu_tensor = thread_scope->Var(name)->GetMutable(); platform::Place place = platform::CUDAPlace(device_id); TensorCopy(*static_cast(&root_tensor), place, static_cast(gpu_tensor)); @@ -116,18 +120,17 @@ void BoxPSTrainer::CopyParameters(const Scope& root_scope, int device_id) { } void BoxPSTrainer::DumpParameters(void) { - Scope *thread_scope = GetWorkerScope(0); + Scope* thread_scope = GetWorkerScope(0); for (const auto& var : persistable_vars_) { auto* root_tensor = root_scope_->Var(var)->GetMutable(); // TODO(hutuxian): Add a final all-reduce? 
- const auto& thread_tensor = - thread_scope->FindVar(var)->Get(); + const auto& thread_tensor = thread_scope->FindVar(var)->Get(); TensorCopySync(thread_tensor, platform::CPUPlace(), root_tensor); } } void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program, - const platform::Place& place) { + const platform::Place& place) { PADDLE_ENFORCE(root_scope_, "Null root_scope pointer"); for (auto& var : main_program.Block(0).AllVars()) { if (var->Persistable()) { @@ -136,37 +139,68 @@ void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program, } if (async_mode_) { - dense_table_->Init(*root_scope_, *param_need_sync_.get(), persistable_vars_); + dense_table_->Init(*root_scope_, *param_need_sync_.get(), + persistable_vars_); } for (int i = 0; i < thread_num_; ++i) { auto this_worker = - std::dynamic_pointer_cast( - workers_[i]); + std::dynamic_pointer_cast(workers_[i]); this_worker->SetRootScope(root_scope_); this_worker->CreateDeviceResource(main_program); if (async_mode_) { this_worker->SetDenseTable(dense_table_.get()); } -// CopyParameters(*root_scope_, i); + // CopyParameters(*root_scope_, i); } } - +inline std::vector>& +GetThreadPool(int thread_num) { + static std::vector> + thread_pools; + if (!thread_pools.empty()) { + return thread_pools; + } + thread_pools.resize(thread_num); + for (int i = 0; i < thread_num; ++i) { + thread_pools[i].reset(new paddle::framework::ThreadPool(1)); + } + if (!FLAGS_enable_binding_train_cpu) { + return thread_pools; + } + std::vector& train_cores = boxps::get_train_cores(); + if (train_cores.size() < static_cast(thread_num)) { + return thread_pools; + } + std::vector ncores; + for (int i = 0; i < thread_num; ++i) { + ncores.push_back(train_cores[i]); + if (train_cores.size() / 2 == static_cast(thread_num)) { + ncores.push_back(train_cores[i + thread_num]); + } + thread_pools[i]->SetCPUAffinity(ncores, false); + ncores.clear(); + } + return thread_pools; +} void BoxPSTrainer::Run() { VLOG(3) << "Going to run"; + auto pool = GetThreadPool(thread_num_); + wait_futures_.clear(); + CHECK(static_cast(pool.size()) == thread_num_); for (int i = 0; i < thread_num_; ++i) { if (!debug_) { - worker_threads_.push_back( - std::thread(&DeviceWorker::TrainFiles, workers_[i].get())); + wait_futures_.emplace_back( + pool[i]->Run([this, i]() { workers_[i]->TrainFiles(); })); } else { - worker_threads_.push_back(std::thread( - &DeviceWorker::TrainFilesWithProfiler, workers_[i].get())); + wait_futures_.emplace_back( + pool[i]->Run([this, i]() { workers_[i]->TrainFilesWithProfiler(); })); } } } void BoxPSTrainer::Finalize() { - for (auto& th : worker_threads_) { - th.join(); + for (auto& th : wait_futures_) { + th.wait(); } if (async_mode_) { // must be after train thread, otherwise the ps_buffer_ will be closed first diff --git a/paddle/fluid/framework/boxps_worker.cc b/paddle/fluid/framework/boxps_worker.cc index 5516fdbc6499d..385185154a24e 100644 --- a/paddle/fluid/framework/boxps_worker.cc +++ b/paddle/fluid/framework/boxps_worker.cc @@ -25,6 +25,10 @@ limitations under the License. 
*/ #include "paddle/fluid/platform/gpu_info.h" #include "paddle/fluid/platform/lodtensor_printer.h" +#include "paddle/fluid/platform/collective_helper.h" +#include "paddle/fluid/platform/nccl_helper.h" + +DECLARE_bool(enable_sync_dense_moment); namespace paddle { namespace framework { @@ -232,53 +236,162 @@ void BoxPSAsynDenseTable::PushDense(const platform::Place& place, ps_buffer_->Send(&grad); } +static const int DenseKStepNode = 1; +static const int DenseKStepALL = 2; void BoxPSWorker::Initialize(const TrainerDesc& desc) { dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_); + node_size_ = boxps::MPICluster::Ins().size(); + device_num_ = platform::GetCUDADeviceCount(); } void BoxPSWorker::SetDenseTable(BoxPSAsynDenseTable* dense) { dense_table_ = dense; dense_table_->ReShape(place_); } - -void BoxPSWorker::AutoSetCPUAffinity(bool reuse) { - std::vector& train_cores = boxps::get_train_cores(); - if (train_cores.empty()) { - LOG(WARNING) << "not found binding train cores"; - return; +int BoxPSWorker::CheckNeedParam(VarDesc* var) { + if (!var->Persistable()) { + return 0; } - int cpuid = train_cores[device_id_]; - cpu_set_t mask; - CPU_ZERO(&mask); - CPU_SET(cpuid, &mask); - pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask); + std::string name = var->Name(); + size_t len = name.length(); + const char* ext = name.c_str() + len - 4; + // .w_0 .b_0 + if (strncmp(ext, ".w_0", 4) == 0 || strncmp(ext, ".b_0", 4) == 0) { + return 1; + } + if (FLAGS_enable_sync_dense_moment) { + if (len < 14) { + return 0; + } + ext = name.c_str() + len - 14; + // .w_0_moment1_0 .b_0_moment2_0 + if (strncmp(ext, ".w_0_moment1_0", 14) == 0 || + strncmp(ext, ".w_0_moment2_0", 14) == 0 || + strncmp(ext, ".b_0_moment1_0", 14) == 0 || + strncmp(ext, ".b_0_moment2_0", 14) == 0) { + return 2; + } + } + return 0; } +int64_t BoxPSWorker::AllocParamTensor(int64_t* pad_len) { + auto& block = program_->Block(0); + // init var and copy persistable + int64_t total_param_len = 0; + int64_t total_moment_len = 0; + for (auto& var : block.AllVars()) { + std::string name = var->Name(); + if (!var->Persistable()) { + continue; + } + int flag = CheckNeedParam(var); + if (flag == 0) { + continue; + } + const LoDTensor& root_tensor = root_scope_->FindVar(name)->Get(); + int64_t numel = root_tensor.numel(); + if (flag == 1) { + total_param_len += numel; + // grad_params->insert(std::make_pair(name + "@GRAD", numel)); + } else { + total_moment_len += numel; + } + // VLOG(0) << "param name:" << name; + } + *pad_len = 0; + int64_t all_sync_param_len = total_param_len + total_moment_len; + if (sync_mode_ == DenseKStepNode || (node_size_ > 1 && !one_ring_)) { + if ((all_sync_param_len % device_num_) != 0) { + *pad_len = (device_num_ - (all_sync_param_len % device_num_)); + all_sync_param_len += *pad_len; + } + } + VLOG(2) << "param length:" << total_param_len + << ", sync length:" << all_sync_param_len + << ", sync mode:" << sync_mode_ << ", node size:" << node_size_ + << ", device num:" << device_num_ << ", one ring:" << one_ring_; + param_sync_.mutable_data({all_sync_param_len, 1}, place_); + return total_param_len; +} void BoxPSWorker::CreateDeviceResource(const ProgramDesc& main_prog) { program_.reset(new ProgramDesc(main_prog)); for (auto& op_desc : program_->Block(0).AllOps()) { ops_.push_back(OpRegistry::CreateOp(*op_desc)); } + + int64_t pad_len = 0; + AllocParamTensor(&pad_len); + auto& block = program_->Block(0); thread_scope_ = &(root_scope_->NewScope()); + + int64_t offset = 0; // init var and copy 
persistable for (auto& var : block.AllVars()) { std::string name = var->Name(); if (!var->Persistable()) { auto* ptr = thread_scope_->Var(name); - // printf("init tensor var name: %s\n", var->Name().c_str()); InitializeVariable(ptr, var->GetType()); } else { const LoDTensor& root_tensor = root_scope_->FindVar(name)->Get(); LoDTensor* gpu_tensor = thread_scope_->Var(name)->GetMutable(); + if (CheckNeedParam(var)) { + auto dim = root_tensor.dims(); + size_t len = root_tensor.numel(); + gpu_tensor->ShareDataWith(param_sync_.Slice(offset, offset + len)) + .Resize(dim); + offset += len; + } TensorCopy(*static_cast(&root_tensor), place_, static_cast(gpu_tensor)); } } + CHECK(offset <= (param_sync_.numel() - pad_len)); } +void BoxPSWorker::SyncParam(void) { + if (sync_mode_ == DenseKStepNode && node_size_ == 1) { + return; + } + auto box_ptr = BoxWrapper::GetInstance(); + box_ptr->DenseNcclTimer(device_id_, false, 0x03); + auto comm = platform::NCCLCommContext::Instance().Get(0, device_id_); + auto stream = static_cast(dev_ctx_)->stream(); + + PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); + box_ptr->DenseNcclTimer(device_id_, true, 0x02); + + int64_t numel = param_sync_.numel(); + float* sendbuff = param_sync_.data(); + + if (sync_mode_ == DenseKStepNode || + (node_size_ > 1 && sync_mode_ == DenseKStepALL && + !one_ring_)) { // KStep Node + int part_param_len = numel / device_num_; + float* recv_ptr = &sendbuff[device_id_ * part_param_len]; + + PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclGroupStart()); + PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclReduceScatter( + sendbuff, recv_ptr, part_param_len, ncclFloat32, ncclSum, comm->comm(), + stream)); + CHECK(box_ptr->SyncDense(stream, part_param_len, recv_ptr, recv_ptr, + device_id_, false)); + PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclGroupEnd()); + PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllGather( + recv_ptr, sendbuff, part_param_len, ncclFloat32, comm->comm(), stream)); + } else if (sync_mode_ == DenseKStepALL) { // KStep ALL + PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllReduce( + sendbuff, sendbuff, numel, ncclFloat32, ncclSum, comm->comm(), stream)); + } else { + } + const float scale = 1.0 / (device_num_ * node_size_); + TensorScaleValue(place_, param_sync_, ¶m_sync_, scale); + PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); + box_ptr->DenseNcclTimer(device_id_, true, 0x01); +} int BoxPSWorker::PackBatchTask(void) { device_reader_->AssignFeedVar(*thread_scope_); return device_reader_->Next(); @@ -301,14 +414,16 @@ inline void AddAucMonitor(const Scope* scope, const platform::Place& place) { void BoxPSWorker::TrainFiles() { VLOG(3) << "begin gpubox_worker TrainFiles"; - AutoSetCPUAffinity(true); + platform::Timer timer; + timer.Start(); int64_t accum_num = 0; int batch_size = 0; if (device_reader_ != nullptr) { device_reader_->Start(); } - + int step = 0; + platform::SetDeviceId(device_id_); while ((batch_size = PackBatchTask()) > 0) { VLOG(3) << "begin running ops, batch size:" << batch_size; if (dense_table_) { @@ -319,14 +434,28 @@ void BoxPSWorker::TrainFiles() { } if (dense_table_) { dense_table_->PushDense(place_, *thread_scope_); + } else if (sync_mode_ == DenseKStepNode || sync_mode_ == DenseKStepALL) { + if (step > param_sync_step_) { + step = 0; + SyncParam(); + } } AddAucMonitor(thread_scope_, place_); accum_num += batch_size; thread_scope_->DropKids(); + ++step; + } + // sync param step + if (sync_mode_ == DenseKStepNode || sync_mode_ == DenseKStepALL) { + 
SyncParam(); } dev_ctx_->Wait(); thread_scope_->DropKids(); + + timer.Pause(); + auto box_ptr = BoxWrapper::GetInstance(); + box_ptr->PrintSyncTimer(device_id_, timer.ElapsedSec()); } /** static @@ -388,7 +517,6 @@ class GPUOpMemStat { */ void BoxPSWorker::TrainFilesWithProfiler() { VLOG(3) << "begin section_worker TrainFiles with profiler"; - AutoSetCPUAffinity(true); int64_t step_cnt = 0; int64_t accum_num = 0; @@ -416,7 +544,7 @@ void BoxPSWorker::TrainFilesWithProfiler() { // print_hbm_mem(device_id_, "BoxPSWorker"); // // GPUOpMemStat op_mem(device_id_); - + platform::SetDeviceId(device_id_); outer_timer.Start(); while (true) { main_timer.Resume(); diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h index 503f1513aad20..9e606acc221da 100644 --- a/paddle/fluid/framework/channel.h +++ b/paddle/fluid/framework/channel.h @@ -157,6 +157,19 @@ class ChannelObject { p.resize(finished); return finished; } + // 这个接口就是只读取一次数据,最多长度为size + size_t ReadOnce(std::vector& p, size_t size) { // NOLINT + if (size == 0) { + return 0; + } + std::unique_lock lock(mutex_); + p.resize(size); + size_t finished = Read(size, &p[0], lock, true); + p.resize(finished); + Notify(); + + return finished; + } size_t ReadAll(std::vector& p) { // NOLINT p.clear(); @@ -241,7 +254,8 @@ class ChannelObject { return !closed_; } - size_t Read(size_t n, T* p, std::unique_lock& lock) { // NOLINT + size_t Read(size_t n, T* p, std::unique_lock& lock, // NOLINT + bool once = false) { // NOLINT size_t finished = 0; CHECK(n <= MaxCapacity() - reading_count_); reading_count_ += n; @@ -252,6 +266,9 @@ class ChannelObject { data_.pop_front(); } reading_count_ -= m; + if (once && m > 0) { + break; + } } reading_count_ -= n - finished; return finished; diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 1eec3b17ae9a4..bde931950af7a 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -98,7 +98,7 @@ class BufferedLineFileReader { x.append(ptr, ret); } } - if (!x.empty()) { + if (!is_error() && !x.empty()) { ++lines; if (lines > skip_lines) { if (!func(x)) { @@ -146,7 +146,7 @@ class BufferedLineFileReader { x.append(ptr, ret); } } - if (!x.empty()) { + if (!is_error() && !x.empty()) { ++lines; if (lines > skip_lines && uniform_distribution_(random_engine_) < sample_rate_) { @@ -3016,8 +3016,12 @@ bool SlotPaddleBoxDataFeed::ParseOneInstance(const std::string& line, } void SlotPaddleBoxDataFeed::UnrollInstance(std::vector& items) { + if (parser_so_path_.empty()) { + return; + } paddle::framework::ISlotParser* parser = global_parser_pool().Get(parser_so_path_, all_slots_info_); + CHECK(parser != nullptr); if (parser->UnrollInstance( items,items.size(), [this](std::vector & release) { diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 00c2c5fcd8e3d..6a9b2ab9dbc3d 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -37,6 +37,7 @@ DECLARE_bool(padbox_dataset_disable_shuffle); DECLARE_bool(padbox_dataset_disable_polling); +DECLARE_bool(padbox_dataset_enable_unrollinstance); namespace paddle { namespace framework { @@ -955,22 +956,11 @@ void MultiSlotDataset::GenerateLocalTablesUnlock(int table_id, int feadim, std::vector threads(read_thread_num); consume_task_pool_.resize(consume_thread_num); for (size_t i = 0; i < consume_task_pool_.size(); i++) { - consume_task_pool_[i].reset(new ::ThreadPool(1)); - } - auto consume_func = [&local_map_tables](int 
shard_id, int feadim, - std::vector& keys) { - for (auto k : keys) { - if (local_map_tables[shard_id].find(k) == - local_map_tables[shard_id].end()) { - local_map_tables[shard_id][k] = std::vector(feadim, 0); - } - } - }; - auto gen_func = [this, &shard_num, &feadim, &local_map_tables, - &consume_func](int i) { + consume_task_pool_[i].reset(new paddle::framework::ThreadPool(1)); + } + auto gen_func = [this, &shard_num, &feadim, &local_map_tables](int i) { std::vector vec_data; std::vector> task_keys(shard_num); - std::vector> task_futures; this->multi_output_channel_[i]->Close(); this->multi_output_channel_[i]->ReadAll(vec_data); for (size_t j = 0; j < vec_data.size(); j++) { @@ -980,9 +970,18 @@ void MultiSlotDataset::GenerateLocalTablesUnlock(int table_id, int feadim, } } + std::vector> task_futures; for (int shard_id = 0; shard_id < shard_num; shard_id++) { - task_futures.emplace_back(consume_task_pool_[shard_id]->enqueue( - consume_func, shard_id, feadim, task_keys[shard_id])); + auto& keys = task_keys[shard_id]; + task_futures.emplace_back(consume_task_pool_[shard_id]->Run( + [this, &local_map_tables, shard_id, feadim, &keys]() { + for (auto k : keys) { + if (local_map_tables[shard_id].find(k) == + local_map_tables[shard_id].end()) { + local_map_tables[shard_id][k] = std::vector(feadim, 0); + } + } + })); } multi_output_channel_[i]->Open(); @@ -1396,33 +1395,52 @@ void MultiSlotDataset::SlotsShuffle( class PadBoxSlotDataConsumer : public boxps::DataConsumer { public: explicit PadBoxSlotDataConsumer(PadBoxSlotDataset* dataset) - : _dataset(dataset) { - BoxWrapper::data_shuffle_->register_handler(this); + : _dataset(dataset), _service_id(-1) { + _service_id = BoxWrapper::data_shuffle_->register_handler(this); + CHECK_GE(_service_id, 0); } virtual ~PadBoxSlotDataConsumer() { - BoxWrapper::data_shuffle_->register_handler(nullptr); + CHECK_GE(BoxWrapper::data_shuffle_->register_handler(this), 0); } virtual void on_receive(const int client_id, const char* buff, int len) { _dataset->ReceiveSuffleData(client_id, buff, len); } + public: + void send_message_callback(const int rank_id, const char* buf, int len, + boxps::ResultCallback* callback) { + int client_id = (_service_id << 16) | rank_id; + BoxWrapper::data_shuffle_->send_message_callback(client_id, buf, len, + callback); + } + private: PadBoxSlotDataset* _dataset; + int _service_id; }; // paddlebox PadBoxSlotDataset::PadBoxSlotDataset() { mpi_size_ = boxps::MPICluster::Ins().size(); mpi_rank_ = boxps::MPICluster::Ins().rank(); SlotRecordPool(); + + auto boxps_ptr = BoxWrapper::GetInstance(); + int thread_num = boxps_ptr->GetFeedpassThreadNum(); + if (thread_num > FLAGS_padbox_dataset_merge_thread_num) { + thread_num = FLAGS_padbox_dataset_merge_thread_num; + } + merge_thread_num_ = thread_num; } PadBoxSlotDataset::~PadBoxSlotDataset() {} // create input channel and output channel void PadBoxSlotDataset::CreateChannel() { if (input_channel_ == nullptr) { input_channel_ = MakeChannel(); + input_channel_->SetBlockSize(10240); } if (shuffle_channel_ == nullptr) { shuffle_channel_ = MakeChannel(); + shuffle_channel_->SetBlockSize(10240); } } // set filelist, file_idx_ will reset to zero. 
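The data_set.cc hunks that follow replace the ad-hoc SetCPUAffinity() helper and the raw std::thread usage with shared paddle::framework::ThreadPool instances whose workers are pinned to the cores reported by boxps::get_readins_cores() via SetCPUAffinity(cores, false). Below is a minimal standalone sketch of that binding pattern; it assumes plain std::thread plus pthread_setaffinity_np instead of Paddle's ThreadPool, and the core list is a hypothetical placeholder, not values from boxps.

#include <pthread.h>
#include <sched.h>

#include <thread>
#include <vector>

// Pin the calling thread to the given CPU cores; with the full core list this
// mirrors the old SetCPUAffinity(tid, /*one_by_one=*/false) behaviour, where
// every read-ins worker may run on any of the configured cores.
static void BindToCores(const std::vector<int>& cores) {
  if (cores.empty()) return;  // no binding configured, keep default affinity
  cpu_set_t mask;
  CPU_ZERO(&mask);
  for (int core : cores) {
    CPU_SET(core, &mask);
  }
  pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask);
}

int main() {
  std::vector<int> read_ins_cores = {0, 1, 2, 3};  // placeholder core list
  std::vector<std::thread> workers;
  for (int tid = 0; tid < 4; ++tid) {
    workers.emplace_back([&read_ins_cores]() {
      BindToCores(read_ins_cores);
      // ... LoadIntoMemory()-style reading work would run here ...
    });
  }
  for (auto& t : workers) {
    t.join();
  }
  return 0;
}
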
@@ -1439,50 +1457,145 @@ void PadBoxSlotDataset::SetFileList(const std::vector& filelist) { } file_idx_ = 0; } +inline paddle::framework::ThreadPool* GetThreadPool(int thread_num) { + static std::shared_ptr thread_pool = nullptr; + if (thread_pool == nullptr) { + thread_pool.reset(new paddle::framework::ThreadPool(thread_num)); + } + return thread_pool.get(); +} +inline paddle::framework::ThreadPool* GetMergePool(int thread_num) { + static std::shared_ptr thread_pool = nullptr; + if (thread_pool == nullptr) { + thread_pool.reset(new paddle::framework::ThreadPool(thread_num)); + } + return thread_pool.get(); +} +inline paddle::framework::ThreadPool* GetShufflePool(int thread_num) { + static std::shared_ptr thread_pool = nullptr; + if (thread_pool == nullptr) { + thread_pool.reset(new paddle::framework::ThreadPool(thread_num)); + } + return thread_pool.get(); +} +int PadBoxSlotDataset::GetMaxShuffleThreadId(void) { + double rate = static_cast(shuffle_thread_num_) / + static_cast(thread_num_); + int thread_num = static_cast(rate * read_ins_ref_); + int half_num = static_cast(shuffle_thread_num_ >> 1); + if (thread_num < half_num) { + return half_num; + } + return thread_num; +} +int PadBoxSlotDataset::GetMaxMergeThreadId(void) { + double rate = + static_cast(merge_thread_num_) / static_cast(thread_num_); + int half_num = static_cast(merge_thread_num_ >> 1); + int thread_num = static_cast(rate * read_ins_ref_); + if (thread_num < half_num) { + return half_num; + } + return thread_num; +} +void PadBoxSlotDataset::CheckThreadPool(void) { + wait_futures_.clear(); + if (thread_pool_ != nullptr && merge_pool_ != nullptr) { + return; + } + used_fea_index_.clear(); + auto feed_obj = reinterpret_cast(readers_[0].get()); + feed_obj->GetUsedSlotIndex(&used_fea_index_); + + // read ins thread + thread_pool_ = GetThreadPool(thread_num_); + // merge thread + merge_pool_ = GetMergePool(merge_thread_num_); + // shuffle thread + if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { + shuffle_pool_ = GetShufflePool(shuffle_thread_num_); + } -static void SetCPUAffinity(int tid, bool one_by_one = false) { std::vector& cores = boxps::get_readins_cores(); if (cores.empty()) { - VLOG(0) << "not found binding read ins thread cores"; return; } - - size_t core_num = cores.size(); - cpu_set_t mask; - CPU_ZERO(&mask); - if (one_by_one) { - CPU_SET(cores[tid % core_num], &mask); - } else { - for (size_t i = 0; i < core_num; ++i) { - CPU_SET(cores[i], &mask); - } + thread_pool_->SetCPUAffinity(cores, false); + merge_pool_->SetCPUAffinity(cores, false); + if (shuffle_pool_ != nullptr) { + shuffle_pool_->SetCPUAffinity(cores, false); } - pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask); - // VLOG(0) << "binding read ins thread_id = " << tid << ", cpunum = " << - // core_num; } -// load all data into memory -void PadBoxSlotDataset::LoadIntoMemory() { - VLOG(3) << "DatasetImpl::LoadIntoMemory() begin"; - platform::Timer timeline; - timeline.Start(); - std::vector load_threads; - std::vector shuffle_threads; +// pre load +void PadBoxSlotDataset::PreLoadIntoMemory() { + CheckThreadPool(); + LoadIndexIntoMemory(); + read_ins_ref_ = thread_num_; + for (int64_t i = 0; i < thread_num_; ++i) { + wait_futures_.emplace_back(thread_pool_->Run([this, i]() { + platform::Timer timer; + timer.Start(); + readers_[i]->LoadIntoMemory(); + timer.Pause(); + double span = timer.ElapsedSec(); + if (max_read_ins_span_ < span) { + max_read_ins_span_ = span; + } + if (min_read_ins_span_ == 0 || min_read_ins_span_ > span) { + 
min_read_ins_span_ = span; + } + if (--read_ins_ref_ == 0) { + input_channel_->Close(); + other_timer_.Start(); + VLOG(0) << "read ins thread end, max:" << max_read_ins_span_ + << ", min:" << min_read_ins_span_; + } + })); + } + // dualbox global data shuffle if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { finished_counter_ = mpi_size_; mpi_flags_.assign(mpi_size_, 1); VLOG(3) << "RegisterClientToClientMsgHandler"; data_consumer_ = reinterpret_cast(new PadBoxSlotDataConsumer(this)); VLOG(3) << "RegisterClientToClientMsgHandler done"; + + ShuffleData(shuffle_thread_num_); + MergeInsKeys(shuffle_channel_); + } else { + MergeInsKeys(input_channel_); + } +} +void PadBoxSlotDataset::WaitPreLoadDone() { + for (auto& f : wait_futures_) { + f.wait(); + } + if (data_consumer_ != nullptr) { + delete reinterpret_cast(data_consumer_); + data_consumer_ = nullptr; + } + if (FLAGS_padbox_dataset_enable_unrollinstance) { + UnrollInstance(); } + VLOG(1) << "PadBoxSlotDataset::WaitPreLoadDone() end" + << ", memory data size=" << input_records_.size() + << ", cost time=" << max_read_ins_span_ << " seconds"; +} +// load all data into memory +void PadBoxSlotDataset::LoadIntoMemory() { + VLOG(3) << "DatasetImpl::LoadIntoMemory() begin"; + CheckThreadPool(); + LoadIndexIntoMemory(); + + platform::Timer timeline; + timeline.Start(); - std::atomic ref(thread_num_); + read_ins_ref_ = thread_num_; for (int64_t i = 0; i < thread_num_; ++i) { - load_threads.push_back(std::thread([this, i, &ref]() { - SetCPUAffinity(i, false); + wait_futures_.emplace_back(thread_pool_->Run([this, i]() { readers_[i]->LoadIntoMemory(); - if (--ref == 0) { + if (--read_ins_ref_ == 0) { input_channel_->Close(); } })); @@ -1490,89 +1603,93 @@ void PadBoxSlotDataset::LoadIntoMemory() { // dualbox global data shuffle if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { - ShuffleData(&shuffle_threads, shuffle_thread_num_); + finished_counter_ = mpi_size_; + mpi_flags_.assign(mpi_size_, 1); + VLOG(3) << "RegisterClientToClientMsgHandler"; + data_consumer_ = reinterpret_cast(new PadBoxSlotDataConsumer(this)); + VLOG(3) << "RegisterClientToClientMsgHandler done"; + + ShuffleData(shuffle_thread_num_); MergeInsKeys(shuffle_channel_); } else { MergeInsKeys(input_channel_); } - - for (std::thread& t : load_threads) { - t.join(); - } - - if (!shuffle_threads.empty()) { - for (std::thread& t : shuffle_threads) { - t.join(); - } + // wait all thread finish + for (auto& f : wait_futures_) { + f.wait(); } if (data_consumer_ != nullptr) { delete reinterpret_cast(data_consumer_); data_consumer_ = nullptr; } - // shuffle_channel_->Clear(); - // input_channel_->Clear(); - UnrollInstance(); + if (FLAGS_padbox_dataset_enable_unrollinstance) { + UnrollInstance(); + } timeline.Pause(); + VLOG(1) << "PadBoxSlotDataset::LoadIntoMemory() end" << ", memory data size=" << input_records_.size() << ", cost time=" << timeline.ElapsedSec() << " seconds"; } // add fea keys void PadBoxSlotDataset::MergeInsKeys(const Channel& in) { - platform::Timer timeline; - timeline.Start(); - - std::vector feed_threads; - auto boxps_ptr = BoxWrapper::GetInstance(); - - std::vector used_fea_index; - auto feed_obj = reinterpret_cast(readers_[0].get()); - feed_obj->GetUsedSlotIndex(&used_fea_index); - + merge_ins_ref_ = merge_thread_num_; input_records_.clear(); - boxps::PSAgentBase* agent = boxps_ptr->GetAgent(); - - int thread_num = boxps_ptr->GetFeedpassThreadNum(); - if (thread_num > FLAGS_padbox_dataset_merge_thread_num) { - thread_num = 
FLAGS_padbox_dataset_merge_thread_num; - } - std::mutex mutex; - for (int tid = 0; tid < thread_num; ++tid) { - feed_threads.push_back(std::thread([this, &in, agent, tid, &mutex, - &used_fea_index, &feed_obj]() { - SetCPUAffinity(tid, false); + CHECK(p_agent_ != nullptr); + for (int tid = 0; tid < merge_thread_num_; ++tid) { + wait_futures_.emplace_back(merge_pool_->Run([this, &in, tid]() { + // VLOG(0) << "merge thread id: " << tid << "start"; + platform::Timer timer; + timer.Start(); + auto feed_obj = + reinterpret_cast(readers_[0].get()); size_t num = 0; std::vector datas; - while (in->Read(datas)) { + while (in->ReadOnce(datas, 10240)) { for (auto& rec : datas) { - for (auto& idx : used_fea_index) { + for (auto& idx : used_fea_index_) { uint64_t* feas = rec->slot_uint64_feasigns_.get_values(idx, &num); if (num > 0) { - agent->AddKeys(feas, num, tid); + p_agent_->AddKeys(feas, num, tid); } } feed_obj->ExpandSlotRecord(&rec); } - mutex.lock(); + merge_mutex_.lock(); for (auto& t : datas) { input_records_.push_back(std::move(t)); } - mutex.unlock(); + merge_mutex_.unlock(); datas.clear(); + if (tid > GetMaxMergeThreadId()) { + break; + } } datas.shrink_to_fit(); - })); - } + timer.Pause(); - for (auto& t : feed_threads) { - t.join(); + double span = timer.ElapsedSec(); + if (max_merge_ins_span_ < span) { + max_merge_ins_span_ = span; + } + if (min_merge_ins_span_ == 0 || min_merge_ins_span_ > span) { + min_merge_ins_span_ = span; + } + // end merge thread + if (--merge_ins_ref_ == 0) { + other_timer_.Pause(); + VLOG(0) << "merge thread id: " << tid << ", span time: " << span + << ", max:" << max_merge_ins_span_ + << ", min:" << min_merge_ins_span_; + } + // else { + // VLOG(0) << "merge thread id: " << tid + // << ", span time: " << span; + // } + })); } - timeline.Pause(); - VLOG(1) << "PadBoxSlotDataset::MergeInsKeys end" - << ", memory data size=" << input_records_.size() - << ", cost time=" << timeline.ElapsedSec() << " seconds"; } // release all memory data void PadBoxSlotDataset::ReleaseMemory() { @@ -1647,23 +1764,20 @@ class ShuffleResultWaitGroup : public boxps::ResultCallback { int counter_ = 0; }; // shuffle data -void PadBoxSlotDataset::ShuffleData(std::vector* shuffle_threads, - int thread_num) { - if (thread_num == -1) { - thread_num = thread_num_; - } +void PadBoxSlotDataset::ShuffleData(int thread_num) { + CHECK_GT(thread_num, 0); VLOG(3) << "start global shuffle threads, num = " << thread_num; shuffle_counter_ = thread_num; for (int tid = 0; tid < thread_num; ++tid) { - shuffle_threads->push_back(std::thread([this, tid]() { - SetCPUAffinity(tid, false); + wait_futures_.emplace_back(shuffle_pool_->Run([this, tid]() { std::vector data; std::vector loc_datas; std::vector releases; std::vector ars(mpi_size_); - + PadBoxSlotDataConsumer* handler = + reinterpret_cast(data_consumer_); ShuffleResultWaitGroup wg; - while (input_channel_->Read(data)) { + while (input_channel_->ReadOnce(data, 10240)) { for (auto& t : data) { int client_id = 0; if (enable_pv_merge_) { // shuffle by pv @@ -1697,14 +1811,16 @@ void PadBoxSlotDataset::ShuffleData(std::vector* shuffle_threads, continue; } ++send_count; - BoxWrapper::data_shuffle_->send_message_callback(i, ar.Buffer(), - ar.Length(), &wg); + handler->send_message_callback(i, ar.Buffer(), ar.Length(), &wg); ar.Clear(); } wg.add(send_count); data.clear(); loc_datas.clear(); + if (tid > GetMaxShuffleThreadId()) { + break; + } } wg.wait(); @@ -1717,7 +1833,7 @@ void PadBoxSlotDataset::ShuffleData(std::vector* shuffle_threads, if (i == 
mpi_rank_) { continue; } - BoxWrapper::data_shuffle_->send_message_callback(i, NULL, 0, &wg); + handler->send_message_callback(i, NULL, 0, &wg); } wg.wait(); // local closed channel @@ -2060,77 +2176,10 @@ void PadBoxSlotDataset::UnrollInstance() { feed_obj->UnrollInstance(input_records_); } -void InputTableDataset::LoadIntoMemory() { - VLOG(3) << "InputTableDataset::LoadIntoMemory() begin"; - - platform::Timer timer; - timer.Start(); - LoadIndexIntoMemory(); - timer.Pause(); - VLOG(1) << "load index into memory cost: " << timer.ElapsedSec(); - - platform::Timer timeline; - timeline.Start(); - std::vector load_threads; - std::vector shuffle_threads; - - if (mpi_size_ > 1) { - finished_counter_ = mpi_size_; - mpi_flags_.assign(mpi_size_, 1); - VLOG(3) << "RegisterClientToClientMsgHandler"; - data_consumer_ = reinterpret_cast(new PadBoxSlotDataConsumer(this)); - VLOG(3) << "RegisterClientToClientMsgHandler done"; - } - - std::atomic ref(thread_num_); - for (int64_t i = 0; i < thread_num_; ++i) { - load_threads.push_back(std::thread([this, i, &ref]() { - SetCPUAffinity(i, false); - readers_[i]->LoadIntoMemory(); - if (--ref == 0) { - input_channel_->Close(); - } - })); - } - - // dualbox global data shuffle - if (mpi_size_ > 1) { - ShuffleData(&shuffle_threads, shuffle_thread_num_); - MergeInsKeys(shuffle_channel_); - } else { - MergeInsKeys(input_channel_); - } - - for (std::thread& t : load_threads) { - t.join(); - } - - if (!shuffle_threads.empty()) { - for (std::thread& t : shuffle_threads) { - t.join(); - } - } - - if (data_consumer_ != nullptr) { - delete reinterpret_cast(data_consumer_); - data_consumer_ = nullptr; - } - // shuffle_channel_->Clear(); - // input_channel_->Clear(); - - timeline.Pause(); - VLOG(1) << "PadBoxSlotDataset::LoadIntoMemory() end" - << ", memory data size=" << input_records_.size() - << ", cost time=" << timeline.ElapsedSec() << " seconds"; - timeline.Start(); - UnrollInstance(); - timeline.Pause(); - VLOG(1) << "PadBoxSlotDataset::UnrollInstance(), cost time= " << timer.ElapsedSec(); - -} - void InputTableDataset::LoadIndexIntoMemory() { VLOG(3) << "LoadIndexIntoMemory()"; + platform::Timer timer; + timer.Start(); std::vector> readers; size_t file_idx = 0; @@ -2145,26 +2194,16 @@ void InputTableDataset::LoadIndexIntoMemory() { readers[i]->SetFileList(index_filelist_); } - std::vector threads; + std::vector> wait_futures; for (int i = 0; i < thread_num_; ++i) { - threads.push_back(std::thread([i, &readers]() { - SetCPUAffinity(i, false); - readers[i]->LoadIntoMemory(); - })); + wait_futures.emplace_back( + thread_pool_->Run([i, &readers]() { readers[i]->LoadIntoMemory(); })); } - - for (auto& t : threads) { - t.join(); + for (auto& f : wait_futures) { + f.wait(); } - - VLOG(3) << "end LoadIndexIntoMemory()"; -} - -void InputTableDataset::UnrollInstance() { - VLOG(3) << "DatasetImpl::LoadIntoMemory() begin"; - auto feed_obj = reinterpret_cast(readers_[0].get()); - feed_obj->UnrollInstance(input_records_); - + timer.Pause(); + VLOG(1) << "end LoadIndexIntoMemory() cost: " << timer.ElapsedSec(); } #endif diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 17eee9da229f5..c553f2099747a 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -14,8 +14,6 @@ #pragma once -#include - #include #include #include // NOLINT @@ -27,9 +25,12 @@ #include #include "paddle/fluid/framework/data_feed.h" +#include "paddle/fluid/framework/threadpool.h" DECLARE_int32(padbox_dataset_shuffle_thread_num); 
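Further down, data_set.h declares the new PreLoadIntoMemory()/WaitPreLoadDone() pair together with the thread pools, wait_futures_ and read_ins_ref_ members that back it. The sketch below illustrates that dispatch-then-join pattern under simplified assumptions: std::async stands in for Paddle's ThreadPool, and Read()/CloseInput() are hypothetical placeholders for LoadIntoMemory() and input_channel_->Close(); the last reader to finish performs the close exactly once.

#include <atomic>
#include <future>
#include <vector>

// Minimal sketch of the preload/wait split: readers run asynchronously, the
// last one to decrement the counter closes the input, and the wait call just
// joins the collected futures. Not the Paddle implementation.
class PreLoader {
 public:
  void PreLoad(int thread_num) {
    read_ref_ = thread_num;
    for (int i = 0; i < thread_num; ++i) {
      futures_.emplace_back(std::async(std::launch::async, [this, i]() {
        Read(i);            // placeholder for readers_[i]->LoadIntoMemory()
        if (--read_ref_ == 0) {
          CloseInput();     // placeholder for input_channel_->Close()
        }
      }));
    }
  }
  void WaitDone() {
    for (auto& f : futures_) f.wait();
    futures_.clear();
  }

 private:
  void Read(int /*tid*/) {}
  void CloseInput() {}
  std::atomic<int> read_ref_{0};
  std::vector<std::future<void>> futures_;
};

int main() {
  PreLoader loader;
  loader.PreLoad(4);  // returns immediately; other setup can overlap here
  loader.WaitDone();  // later: block until every reader has finished
  return 0;
}
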
DECLARE_int32(padbox_dataset_merge_thread_num); - +namespace boxps { +class PSAgentBase; +} namespace paddle { namespace framework { @@ -307,7 +308,8 @@ class DatasetImpl : public Dataset { int preload_thread_num_; std::mutex global_index_mutex_; int64_t global_index_ = 0; - std::vector> consume_task_pool_; + std::vector> + consume_task_pool_; std::vector input_records_; // only for paddleboxdatafeed }; @@ -376,16 +378,30 @@ class PadBoxSlotDataset : public DatasetImpl { virtual void UnrollInstance(); + // pre load + virtual void LoadIndexIntoMemory() {} + virtual void PreLoadIntoMemory(); + virtual void WaitPreLoadDone(); + protected: // shuffle data - virtual void ShuffleData(std::vector* shuffle_threads, - int thread_num = -1); + virtual void ShuffleData(int thread_num = -1); public: virtual void ReceiveSuffleData(const int client_id, const char* msg, int len); + public: + void SetPSAgent(boxps::PSAgentBase* agent) { p_agent_ = agent; } + boxps::PSAgentBase* GetPSAgent(void) { return p_agent_; } + double GetReadInsTime(void) { return max_read_ins_span_; } + double GetOtherTime(void) { return other_timer_.ElapsedSec(); } + double GetMergeTime(void) { return max_merge_ins_span_; } + protected: void MergeInsKeys(const Channel& in); + void CheckThreadPool(void); + int GetMaxShuffleThreadId(void); + int GetMaxMergeThreadId(void); protected: Channel shuffle_channel_ = nullptr; @@ -398,18 +414,30 @@ class PadBoxSlotDataset : public DatasetImpl { std::atomic shuffle_counter_{0}; void* data_consumer_ = nullptr; std::atomic receiver_cnt_{0}; + boxps::PSAgentBase* p_agent_ = nullptr; + paddle::framework::ThreadPool* thread_pool_ = nullptr; + std::vector> wait_futures_; + double max_read_ins_span_ = 0; + double min_read_ins_span_ = 0; + platform::Timer other_timer_; + double max_merge_ins_span_ = 0; + double min_merge_ins_span_ = 0; + std::atomic read_ins_ref_{0}; + std::atomic merge_ins_ref_{0}; + std::mutex merge_mutex_; + std::vector used_fea_index_; + int merge_thread_num_ = FLAGS_padbox_dataset_merge_thread_num; + paddle::framework::ThreadPool* merge_pool_ = nullptr; + paddle::framework::ThreadPool* shuffle_pool_ = nullptr; }; class InputTableDataset : public PadBoxSlotDataset { public: - virtual void LoadIntoMemory(); virtual void SetIndexFileList(const std::vector& filelist) { index_filelist_ = filelist; } - virtual void UnrollInstance(); + virtual void LoadIndexIntoMemory(); private: - void LoadIndexIntoMemory(); - std::vector index_filelist_; }; #endif diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index f7dd8d9ae23c2..89adadda349ec 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -451,7 +451,7 @@ class HeterBoxWorker : public HogwildWorker { virtual void CacheProgram(const ProgramDesc& main_program) { new (&program_) ProgramDesc(main_program); } - virtual void ProduceTasks() override; + virtual void ProduceTasks(); virtual void SetStream(const cudaStream_t stream) { copy_stream_ = stream; } virtual void SetEvent(const cudaEvent_t event) { event_ = event; } virtual void TrainFilesWithProfiler() {} @@ -584,34 +584,36 @@ class SectionWorker : public DeviceWorker { #ifdef PADDLE_WITH_BOX_PS class BoxPSAsynDenseTable { - typedef operators::reader::BlockingQueue*> PSBufferQueue; -public: - BoxPSAsynDenseTable(const int device_num); - ~BoxPSAsynDenseTable(); - - void Init(const Scope& root_scope, - const std::vector ¶m_need_sync, - const std::vector &persistable_vars); - void Finalize(void); - - // 
async - void ReShape(const platform::Place& place); - void PullDense(const platform::Place& place, const Scope& scope); - void PushDense(const platform::Place& place, const Scope& scope); - void AsyncUpdate(); - -private: - int device_num_ = 0; - std::vector> device_grads_; - std::vector async_param_list_; - std::vector ps_; - std::vector async_param_size_; - std::shared_ptr ps_buffer_ = nullptr; - Scope *root_scope_ = nullptr; - - RWLock ps_lock_; - std::thread *update_thread_ = nullptr; - float base_lr_ = -1; + typedef operators::reader::BlockingQueue*> + PSBufferQueue; + + public: + explicit BoxPSAsynDenseTable(const int device_num); + ~BoxPSAsynDenseTable(); + + void Init(const Scope& root_scope, + const std::vector& param_need_sync, + const std::vector& persistable_vars); + void Finalize(void); + + // async + void ReShape(const platform::Place& place); + void PullDense(const platform::Place& place, const Scope& scope); + void PushDense(const platform::Place& place, const Scope& scope); + void AsyncUpdate(); + + private: + int device_num_ = 0; + std::vector> device_grads_; + std::vector async_param_list_; + std::vector ps_; + std::vector async_param_size_; + std::shared_ptr ps_buffer_ = nullptr; + Scope* root_scope_ = nullptr; + + RWLock ps_lock_; + std::thread* update_thread_ = nullptr; + float base_lr_ = -1; }; class BoxPSWorker : public DeviceWorker { @@ -634,11 +636,16 @@ class BoxPSWorker : public DeviceWorker { void SetDeviceIndex(int tid) override { device_id_ = tid; } void SetThreadIndex(int thread_id) { thread_id_ = thread_id; } // Async - void SetDenseTable(BoxPSAsynDenseTable *dense); + void SetDenseTable(BoxPSAsynDenseTable* dense); + void SetParamSyncStep(int step) { param_sync_step_ = step; } + void SetDenseSyncMode(int mode) { sync_mode_ = mode; } + void SetOneRing(bool one_ring) { one_ring_ = one_ring; } protected: int PackBatchTask(void); - void AutoSetCPUAffinity(bool reuse); + int CheckNeedParam(VarDesc* var); + int64_t AllocParamTensor(int64_t* pad_len); + void SyncParam(void); protected: int device_id_; @@ -649,7 +656,13 @@ class BoxPSWorker : public DeviceWorker { platform::DeviceContext* dev_ctx_ = nullptr; // dense async table - BoxPSAsynDenseTable *dense_table_ = nullptr; + BoxPSAsynDenseTable* dense_table_ = nullptr; + Tensor param_sync_; + int param_sync_step_ = 0; + int sync_mode_ = 0; + bool one_ring_ = false; + int device_num_ = 0; + int node_size_ = 1; }; #endif diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index 2e15ab31ee659..15fb30ae81811 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -325,6 +325,15 @@ void BasicAucCalculator::compute() { } void BoxWrapper::CheckEmbedSizeIsValid(int embedx_dim, int expand_embed_dim) { + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { + PADDLE_ENFORCE_EQ( + (embedx_dim % boxps::SHARE_EMBEDDING_NUM), 0, + platform::errors::InvalidArgument( + "SetInstance(): invalid embedx_dim. " + "embedx_dim % boxps::SHARE_EMBEDDING_NUM shoule be 0")); + + embedx_dim = embedx_dim / boxps::SHARE_EMBEDDING_NUM; + } PADDLE_ENFORCE_EQ( embedx_dim_, embedx_dim, platform::errors::InvalidArgument("SetInstance(): invalid embedx_dim. " @@ -353,19 +362,26 @@ void BoxWrapper::PullSparse(const paddle::platform::Place& place, } \ } break -#define PULLSPARSE_CASE(i, ...) 
\ - case i: { \ - constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT)) { \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } else { \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } \ +#define PULLSPARSE_CASE(i, ...) \ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + constexpr size_t SingleEmbedxDim = \ + EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT) || \ + feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } else { \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } \ } break CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim); @@ -404,19 +420,26 @@ void BoxWrapper::PushSparseGrad(const paddle::platform::Place& place, } \ } break -#define PUSHSPARSE_CASE(i, ...) \ - case i: { \ - constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ - PushSparseGradCase< \ - boxps::FeaturePushValueGpuPCOC>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } else { \ - PushSparseGradCase>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } \ +#define PUSHSPARSE_CASE(i, ...) 
\ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + constexpr size_t SingleEmbedxDim = \ + EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ + PushSparseGradCase>(place, keys, grad_values, slot_lengths, \ + hidden_size, expand_embed_dim, \ + batch_size); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + PushSparseGradCase< \ + boxps::FeaturePushValueGpuPCOC>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } else { \ + PushSparseGradCase>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } \ } break CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim); @@ -509,14 +532,10 @@ void BoxWrapper::EndFeedPass(boxps::PSAgentBase* agent) { int ret = boxps_ptr_->EndFeedPass(agent); PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( "EndFeedPass failed in BoxPS.")); + RelaseAgent(agent); } void BoxWrapper::BeginPass() { - int gpu_num = platform::GetCUDADeviceCount(); - for (int i = 0; i < gpu_num; ++i) { - DeviceBoxData& dev = device_caches_[i]; - dev.ResetTimer(); - } int ret = boxps_ptr_->BeginPass(); PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( "BeginPass failed in BoxPS.")); @@ -536,18 +555,6 @@ void BoxWrapper::EndPass(bool need_save_delta) { int ret = boxps_ptr_->EndPass(need_save_delta); PADDLE_ENFORCE_EQ( ret, 0, platform::errors::PreconditionNotMet("EndPass failed in BoxPS.")); - int gpu_num = platform::GetCUDADeviceCount(); - for (int i = 0; i < gpu_num; ++i) { - auto& dev = device_caches_[i]; - LOG(WARNING) << "gpu[" << i - << "] sparse pull span: " << dev.all_pull_timer.ElapsedSec() - << ", boxps span: " << dev.boxps_pull_timer.ElapsedSec() - << ", push span: " << dev.all_push_timer.ElapsedSec() - << ", boxps span:" << dev.boxps_push_timer.ElapsedSec() - << ", dense nccl:" << dev.dense_nccl_timer.ElapsedSec() - << ", sync stream:" << dev.dense_sync_timer.ElapsedSec() - << ", wrapper gpu memory:" << dev.GpuMemUsed() << "MB"; - } } void BoxWrapper::RecordReplace(std::vector* records, @@ -701,6 +708,79 @@ void BoxWrapper::AddReplaceFeasign(boxps::PSAgentBase* p_agent, VLOG(0) << "End AddReplaceFeasign: " << timer.ElapsedMS(); } +//===================== box filemgr =============================== +BoxFileMgr::BoxFileMgr() {} +BoxFileMgr::~BoxFileMgr() { destory(); } +bool BoxFileMgr::init(const std::string& fs_name, const std::string& fs_ugi, + const std::string& conf_path) { + if (mgr_ != nullptr) { + mgr_->destory(); + } + mgr_.reset(boxps::PaddleFileMgr::New()); + auto split = fs_ugi.find(","); + std::string user = fs_ugi.substr(0, split); + std::string pwd = fs_ugi.substr(split + 1); + bool ret = mgr_->initialize(fs_name, user, pwd, conf_path); + if (!ret) { + LOG(WARNING) << "init afs api[" << fs_name << "," << fs_ugi << "," + << conf_path << "] failed"; + mgr_ = nullptr; + } + return ret; +} +void BoxFileMgr::destory(void) { + if (mgr_ == nullptr) { + return; + } + mgr_->destory(); + mgr_ = nullptr; +} +std::vector BoxFileMgr::list_dir(const std::string& path) { + std::vector files; + if (!mgr_->list_dir(path, files)) { + LOG(WARNING) << "list dir path:[" << path << "] failed"; + } + return files; +} +bool BoxFileMgr::makedir(const std::string& path) { + return mgr_->makedir(path); +} +bool BoxFileMgr::exists(const std::string& path) { return mgr_->exists(path); } +bool BoxFileMgr::down(const std::string& remote, const std::string& local) { + return 
mgr_->down(remote, local); +} +bool BoxFileMgr::upload(const std::string& local, const std::string& remote) { + return mgr_->upload(local, remote); +} +bool BoxFileMgr::remove(const std::string& path) { return mgr_->remove(path); } +int64_t BoxFileMgr::file_size(const std::string& path) { + return mgr_->file_size(path); +} +std::vector> BoxFileMgr::dus( + const std::string& path) { + std::vector> files; + if (!mgr_->dus(path, files)) { + LOG(WARNING) << "dus dir path:[" << path << "] failed"; + } + return files; +} +bool BoxFileMgr::truncate(const std::string& path, const size_t len) { + return mgr_->truncate(path, len); +} +bool BoxFileMgr::touch(const std::string& path) { return mgr_->touch(path); } +bool BoxFileMgr::rename(const std::string& src, const std::string& dest) { + return mgr_->rename(src, dest); +} +std::vector> BoxFileMgr::list_info( + const std::string& path) { + std::vector> files; + if (!mgr_->list_info(path, files)) { + LOG(WARNING) << "list dir info path:[" << path << "] failed"; + } + return files; +} +int64_t BoxFileMgr::count(const std::string& path) { return mgr_->count(path); } + } // end namespace framework } // end namespace paddle #endif diff --git a/paddle/fluid/framework/fleet/box_wrapper.cu b/paddle/fluid/framework/fleet/box_wrapper.cu index da9be7f340c04..8a04cc9ba1fc3 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cu +++ b/paddle/fluid/framework/fleet/box_wrapper.cu @@ -293,6 +293,34 @@ __global__ void PushCopyExpand(FeaturePushValueGpuType* dest, float** src, } } +template +__global__ void PushCopyBaseShareEmbedding( + FeaturePushValueGpuType* dest, float** src, const int hidden, + const int total_len, const int bs, const int* slot_vector, + const int* total_dims, const int64_t* slot_lens, const int slot_num, + const int* key2slot, const int cvm_offset) { + CUDA_KERNEL_LOOP(i, total_len) { + int x = key2slot[i]; + int y = i - slot_lens[x]; + + auto& dest_val = dest[i]; + dest_val.slot = slot_vector[x]; + // dest_val.show = *(src[x] + y * hidden); + // dest_val.clk = *(src[x] + y * hidden + 1); + // dest_val.embed_g = *(src[x] + y * hidden + 2) * -1. * bs; + float* optr = reinterpret_cast(&dest_val.show); + float* src_val = reinterpret_cast(src[x] + y * hidden); + for (int k = 0; k < cvm_offset; ++k) { + optr[k] = src_val[k]; // support variable length + } + + // for embed_g[SHARE_EMBEDDING_NUM] + for (int e_index = 0; e_index < (cvm_offset - 2); ++e_index) { + dest_val.embed_g[e_index] *= -1. * bs; + } + } +} + //============================== expand nncross =============================== template __global__ void PushCopyBaseNNCross(FeaturePushValueGpuType* dest, float** src, @@ -438,7 +466,8 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place, total_values_gpu), \ hidden_size, expand_embed_dim, total_length, gpu_keys, total_dims, \ slot_lens, slot_num, key2slot, pull_embedx_scale_, cvm_offset_); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT)) { \ + } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT) || \ + feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ PullCopy><<< \ (total_length + 512 - 1) / 512, 512, 0, stream>>>( \ gpu_values, \ @@ -461,7 +490,24 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place, #define EXPAND_EMBED_PULL_CASE2(i, ...) 
\ case i: { \ constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + constexpr size_t SingleEmbedxDim = \ + EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ + typedef boxps::FeaturePullValueGpuShareEmbedding \ + PullShareEmbedding; \ + PullCopyBase<<<(total_length + 512 - 1) / 512, 512, \ + 0, stream>>>( \ + gpu_values, reinterpret_cast(total_values_gpu), \ + hidden_size, total_length, gpu_keys, total_dims, slot_lens, \ + slot_num, key2slot, cvm_offset_); \ + int embedx_total_length = total_length * EmbedxDim; \ + PullCopyExpand<<< \ + (embedx_total_length + 512 - 1) / 512, 512, 0, stream>>>( \ + gpu_values, reinterpret_cast(total_values_gpu), \ + EmbedxDim, embedx_total_length, total_dims, slot_lens, slot_num, \ + key2slot, pull_embedx_scale_, cvm_offset_); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ PullCopyBase><<< \ (total_length + 512 - 1) / 512, 512, 0, stream>>>( \ gpu_values, \ @@ -479,7 +525,8 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place, total_values_gpu), \ EmbedxDim, embedx_total_length, total_dims, slot_lens, slot_num, \ key2slot, pull_embedx_scale_, cvm_offset_); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT)) { \ + } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT) || \ + feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ PullCopyBase><<< \ (total_length + 512 - 1) / 512, 512, 0, stream>>>( \ gpu_values, \ @@ -540,7 +587,8 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place, (EmbedxDim + ExpandDim), EmbedxDim, ExpandDim, embedx_total_length, \ total_dims, slot_lens, slot_num, key2slot, pull_embedx_scale_, \ cvm_offset_); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT)) { \ + } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT) || \ + feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ PullCopyBaseNNCross><<<(total_length + 512 - 1) / 512, 512, 0, stream>>>( \ @@ -661,7 +709,25 @@ void BoxWrapper::CopyForPush(const paddle::platform::Place& place, #define EXPAND_EMBED_PUSH_CASE2(i, ...) 
\ case i: { \ constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + constexpr size_t SingleEmbedxDim = \ + EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ + typedef boxps::FeaturePushValueGpuShareEmbedding \ + PushShareEmbedding; \ + PushCopyBaseShareEmbedding<<< \ + (total_length + 512 - 1) / 512, 512, 0, stream>>>( \ + reinterpret_cast(total_grad_values_gpu), \ + grad_values, hidden_size, total_length, batch_size, d_slot_vector, \ + total_dims, slot_lens, slot_num, key2slot, cvm_offset_); \ + int embedx_total_length = total_length * EmbedxDim; \ + PushCopyExpand<<< \ + (embedx_total_length + 512 - 1) / 512, 512, 0, stream>>>( \ + reinterpret_cast(total_grad_values_gpu), \ + grad_values, EmbedxDim, embedx_total_length, batch_size, \ + d_slot_vector, total_dims, slot_lens, slot_num, key2slot, \ + cvm_offset_); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ PushCopyBase><<< \ (total_length + 512 - 1) / 512, 512, 0, stream>>>( \ reinterpret_cast< \ diff --git a/paddle/fluid/framework/fleet/box_wrapper.h b/paddle/fluid/framework/fleet/box_wrapper.h index fd7bd1a8978ce..a4b5eb7156088 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.h +++ b/paddle/fluid/framework/fleet/box_wrapper.h @@ -56,7 +56,7 @@ namespace paddle { namespace framework { #ifdef PADDLE_WITH_BOX_PS -#define MAX_GPU_NUM 16 +#define MAX_GPU_NUM 16 class BasicAucCalculator { public: explicit BasicAucCalculator(bool mode_collect_in_gpu = false) @@ -245,9 +245,9 @@ class BoxWrapper { struct DeviceBoxData { LoDTensor keys_tensor; LoDTensor dims_tensor; - std::shared_ptr pull_push_buf = nullptr; - std::shared_ptr gpu_keys_ptr = nullptr; - std::shared_ptr gpu_values_ptr = nullptr; + LoDTensor pull_push_tensor; + LoDTensor keys_ptr_tensor; + LoDTensor values_ptr_tensor; LoDTensor slot_lens; LoDTensor d_slot_vector; @@ -275,15 +275,9 @@ class BoxWrapper { size_t total = 0; total += keys_tensor.memory_size(); total += dims_tensor.memory_size(); - if (pull_push_buf != nullptr) { - total += pull_push_buf->size(); - } - if (gpu_keys_ptr != nullptr) { - total += gpu_keys_ptr->size(); - } - if (gpu_values_ptr != nullptr) { - total += gpu_values_ptr->size(); - } + total += pull_push_tensor.memory_size(); + total += keys_ptr_tensor.memory_size(); + total += values_ptr_tensor.memory_size(); total += slot_lens.memory_size(); total += d_slot_vector.memory_size(); total += keys2slot.memory_size(); @@ -358,7 +352,22 @@ class BoxWrapper { void CheckEmbedSizeIsValid(int embedx_dim, int expand_embed_dim); - boxps::PSAgentBase* GetAgent() { return p_agent_; } + boxps::PSAgentBase* GetAgent() { + boxps::PSAgentBase* p_agent = nullptr; + std::lock_guard lock(mutex_); + if (psagents_.empty()) { + p_agent = boxps::PSAgentBase::GetIns(feedpass_thread_num_); + p_agent->Init(); + } else { + p_agent = psagents_.front(); + psagents_.pop_front(); + } + return p_agent; + } + void RelaseAgent(boxps::PSAgentBase* agent) { + std::lock_guard lock(mutex_); + psagents_.push_back(agent); + } void InitializeGPUAndLoadModel( const char* conf_file, const std::vector& slot_vector, const std::vector& slot_omit_in_feedpass, @@ -368,8 +377,8 @@ class BoxWrapper { VLOG(3) << "Begin InitializeGPU"; std::vector stream_list; int gpu_num = platform::GetCUDADeviceCount(); - CHECK(gpu_num <= MAX_GPU_NUM) << "gpu card num: " - << gpu_num << ", more than max num: " << MAX_GPU_NUM; + CHECK(gpu_num <= MAX_GPU_NUM) << "gpu card num: " << gpu_num + << ", 
more than max num: " << MAX_GPU_NUM; for (int i = 0; i < gpu_num; ++i) { VLOG(3) << "before get context i[" << i << "]"; platform::CUDADeviceContext* context = @@ -383,8 +392,6 @@ class BoxWrapper { // the second parameter is useless boxps_ptr_->InitializeGPUAndLoadModel(conf_file, -1, stream_list, slot_vector, model_path); - p_agent_ = boxps::PSAgentBase::GetIns(feedpass_thread_num_); - p_agent_->Init(); for (const auto& slot_name : slot_omit_in_feedpass) { slot_name_omited_in_feedpass_.insert(slot_name); } @@ -421,9 +428,11 @@ class BoxWrapper { boxps_ptr_->Finalize(); boxps_ptr_ = nullptr; } - if (p_agent_ != nullptr) { - delete p_agent_; - p_agent_ = nullptr; + if (!psagents_.empty()) { + for (auto agent : psagents_) { + delete agent; + } + psagents_.clear(); } if (device_caches_ != nullptr) { delete device_caches_; @@ -505,7 +514,11 @@ class BoxWrapper { s_instance_->feature_type_ = feature_type; s_instance_->pull_embedx_scale_ = pull_embedx_scale; // ToDo: feature gpu value param set diffent value - if (s_instance_->feature_type_ == static_cast(boxps::FEATURE_PCOC)) { + if (s_instance_->feature_type_ == + static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { + s_instance_->cvm_offset_ = boxps::SHARE_EMBEDDING_NUM + 2; + } else if (s_instance_->feature_type_ == + static_cast(boxps::FEATURE_PCOC)) { s_instance_->cvm_offset_ = 8; } else { s_instance_->cvm_offset_ = 3; @@ -852,12 +865,12 @@ class BoxWrapper { class CmatchRankMaskMetricMsg : public MetricMsg { public: CmatchRankMaskMetricMsg(const std::string& label_varname, - const std::string& pred_varname, int metric_phase, - const std::string& cmatch_rank_group, - const std::string& cmatch_rank_varname, - bool ignore_rank = false, - const std::string& mask_varname = "", - int bucket_size = 1000000) { + const std::string& pred_varname, int metric_phase, + const std::string& cmatch_rank_group, + const std::string& cmatch_rank_varname, + bool ignore_rank = false, + const std::string& mask_varname = "", + int bucket_size = 1000000) { label_varname_ = label_varname; pred_varname_ = pred_varname; cmatch_rank_varname_ = cmatch_rank_varname; @@ -1037,6 +1050,19 @@ class BoxWrapper { } // pcoc qvalue tensor LoDTensor& GetQTensor(int device) { return device_caches_[device].qvalue; } + void PrintSyncTimer(int device, double train_span) { + auto& dev = device_caches_[device]; + LOG(WARNING) << "gpu: " << device << ", phase: " << phase_ + << ", train dnn: " << train_span + << ", sparse pull span: " << dev.all_pull_timer.ElapsedSec() + << ", boxps span: " << dev.boxps_pull_timer.ElapsedSec() + << ", push span: " << dev.all_push_timer.ElapsedSec() + << ", boxps span:" << dev.boxps_push_timer.ElapsedSec() + << ", dense nccl:" << dev.dense_nccl_timer.ElapsedSec() + << ", sync stream:" << dev.dense_sync_timer.ElapsedSec() + << ", wrapper gpu memory:" << dev.GpuMemUsed() << "MB"; + dev.ResetTimer(); + } private: static cudaStream_t stream_list_[MAX_GPU_NUM]; @@ -1044,7 +1070,8 @@ class BoxWrapper { std::shared_ptr boxps_ptr_ = nullptr; private: - boxps::PSAgentBase* p_agent_ = nullptr; + std::mutex mutex_; + std::deque psagents_; // TODO(hutuxian): magic number, will add a config to specify const int feedpass_thread_num_ = 30; // magic number std::unordered_set slot_name_omited_in_feedpass_; @@ -1167,6 +1194,34 @@ class BoxWrapper { std::vector random_ins_pool_list; std::mutex mutex4random_pool_; }; +/** + * @brief file mgr + */ +class BoxFileMgr { + public: + BoxFileMgr(); + ~BoxFileMgr(); + bool init(const std::string& fs_name, const std::string& fs_ugi, + 
const std::string& conf_path); + void destory(void); + std::vector list_dir(const std::string& path); + bool makedir(const std::string& path); + bool exists(const std::string& path); + bool down(const std::string& remote, const std::string& local); + bool upload(const std::string& local, const std::string& remote); + bool remove(const std::string& path); + int64_t file_size(const std::string& path); + std::vector> dus(const std::string& path); + bool truncate(const std::string& path, const size_t len); + bool touch(const std::string& path); + bool rename(const std::string& src, const std::string& dest); + std::vector> list_info( + const std::string& path); + int64_t count(const std::string& path); + + private: + std::shared_ptr mgr_ = nullptr; +}; #endif class BoxHelper { @@ -1219,6 +1274,9 @@ class BoxHelper { feed_pass_span = timer.ElapsedSec(); + PadBoxSlotDataset* dataset = dynamic_cast(dataset_); + dataset->SetPSAgent(agent); + timer.Start(); // add 0 key agent->AddKey(0ul, 0); @@ -1230,9 +1288,6 @@ class BoxHelper { // auc runner if (box_ptr->Mode() == 1) { box_ptr->AddReplaceFeasign(agent, box_ptr->GetFeedpassThreadNum()); - - PadBoxSlotDataset* dataset = dynamic_cast(dataset_); - CHECK(dataset); auto& records = dataset->GetInputRecord(); box_ptr->PushAucRunnerResource(records.size()); box_ptr->GetRandomReplace(&records); @@ -1261,14 +1316,55 @@ class BoxHelper { VLOG(3) << "End LoadIntoMemory(), dataset[" << dataset_ << "]"; } void PreLoadIntoMemory() { +#ifdef PADDLE_WITH_BOX_PS + struct std::tm b; + b.tm_year = year_ - 1900; + b.tm_mon = month_ - 1; + b.tm_mday = day_; + b.tm_min = b.tm_hour = b.tm_sec = 0; + std::time_t x = std::mktime(&b); + + auto box_ptr = BoxWrapper::GetInstance(); + boxps::PSAgentBase* agent = box_ptr->GetAgent(); + VLOG(3) << "Begin PreLoadIntoMemory BeginFeedPass in BoxPS"; + box_ptr->BeginFeedPass(x / 86400, &agent); + PadBoxSlotDataset* dataset = dynamic_cast(dataset_); + dataset->SetPSAgent(agent); + // add 0 key + agent->AddKey(0ul, 0); dataset_->PreLoadIntoMemory(); - feed_data_thread_.reset(new std::thread([&]() { - dataset_->WaitPreLoadDone(); - FeedPass(); - })); - VLOG(3) << "After PreLoadIntoMemory()"; +#endif + } + void WaitFeedPassDone() { +#ifdef PADDLE_WITH_BOX_PS + platform::Timer timer; + timer.Start(); + dataset_->WaitPreLoadDone(); + timer.Pause(); + + double wait_done_span = timer.ElapsedSec(); + + timer.Start(); + PadBoxSlotDataset* dataset = dynamic_cast(dataset_); + boxps::PSAgentBase* agent = dataset->GetPSAgent(); + auto box_ptr = BoxWrapper::GetInstance(); + // auc runner + if (box_ptr->Mode() == 1) { + box_ptr->AddReplaceFeasign(agent, box_ptr->GetFeedpassThreadNum()); + auto& records = dataset->GetInputRecord(); + box_ptr->PushAucRunnerResource(records.size()); + box_ptr->GetRandomReplace(&records); + } + box_ptr->EndFeedPass(agent); + timer.Pause(); + + VLOG(0) << "WaitFeedPassDone cost: " << wait_done_span + << "s, read ins cost: " << dataset->GetReadInsTime() + << "s, merge cost: " << dataset->GetMergeTime() + << "s, other cost: " << dataset->GetOtherTime() + << "s, end feedpass:" << timer.ElapsedSec() << "s"; +#endif } - void WaitFeedPassDone() { feed_data_thread_->join(); } void SlotsShuffle(const std::set& slots_to_replace) { #ifdef PADDLE_WITH_BOX_PS @@ -1379,7 +1475,6 @@ class BoxHelper { private: Dataset* dataset_; - std::shared_ptr feed_data_thread_; int year_; int month_; int day_; diff --git a/paddle/fluid/framework/fleet/box_wrapper_impl.h b/paddle/fluid/framework/fleet/box_wrapper_impl.h index 
f0e8c5dfa8ff1..6cbc6b12e826e 100644 --- a/paddle/fluid/framework/fleet/box_wrapper_impl.h +++ b/paddle/fluid/framework/fleet/box_wrapper_impl.h @@ -47,23 +47,12 @@ void BoxWrapper::PullSparseCase(const paddle::platform::Place& place, total_length += slot_lengths[i]; slot_lengths_lod.push_back(total_length); } - size_t total_bytes = - reinterpret_cast(total_length * sizeof(FEATURE_VALUE_GPU_TYPE)); -#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) dev.total_key_length = total_length; - auto& pull_buf = dev.pull_push_buf; - if (pull_buf == nullptr) { - pull_buf = memory::AllocShared(place, total_bytes); - } else if (total_bytes > pull_buf->size()) { - auto buf = memory::AllocShared(place, total_bytes); - pull_buf.swap(buf); - buf = nullptr; - } -#else - auto pull_buf = memory::AllocShared(place, total_bytes); -#endif + + int64_t total_bytes = total_length * sizeof(FEATURE_VALUE_GPU_TYPE); FEATURE_VALUE_GPU_TYPE* total_values_gpu = - reinterpret_cast(pull_buf->ptr()); + reinterpret_cast( + dev.pull_push_tensor.mutable_data({total_bytes, 1}, place)); if (platform::is_cpu_place(place)) { PADDLE_THROW(platform::errors::Unimplemented( @@ -78,14 +67,11 @@ void BoxWrapper::PullSparseCase(const paddle::platform::Place& place, dev.keys_tensor.mutable_data({total_length, 1}, place)); int* total_dims = reinterpret_cast( dev.dims_tensor.mutable_data({total_length, 1}, place)); - if (dev.gpu_keys_ptr == nullptr) { - dev.gpu_keys_ptr = - memory::AllocShared(place, keys.size() * sizeof(uint64_t*)); - } - int* key2slot = reinterpret_cast( dev.keys2slot.mutable_data({total_length, 1}, place)); - uint64_t** gpu_keys = reinterpret_cast(dev.gpu_keys_ptr->ptr()); + uint64_t** gpu_keys = + reinterpret_cast(dev.keys_ptr_tensor.mutable_data( + {static_cast(slot_num * sizeof(uint64_t*)), 1}, place)); int64_t* slot_lens = reinterpret_cast( dev.slot_lens.mutable_data({(slot_num + 1), 1}, place)); cudaMemcpyAsync(gpu_keys, keys.data(), keys.size() * sizeof(uint64_t*), @@ -104,11 +90,9 @@ void BoxWrapper::PullSparseCase(const paddle::platform::Place& place, "PullSparseGPU failed in BoxPS.")); pull_boxps_timer.Pause(); - if (dev.gpu_values_ptr == nullptr) { - dev.gpu_values_ptr = - memory::AllocShared(place, values.size() * sizeof(float*)); - } - float** gpu_values = reinterpret_cast(dev.gpu_values_ptr->ptr()); + float** gpu_values = + reinterpret_cast(dev.values_ptr_tensor.mutable_data( + {static_cast(slot_num * sizeof(float*)), 1}, place)); cudaMemcpyAsync(gpu_values, values.data(), values.size() * sizeof(float*), cudaMemcpyHostToDevice, stream); @@ -145,24 +129,11 @@ void BoxWrapper::PushSparseGradCase( #endif all_timer.Resume(); -#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) int64_t total_length = dev.total_key_length; - // std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL); - size_t total_bytes = - reinterpret_cast(total_length * sizeof(FeaturePushValueGpuType)); - auto& push_buf = dev.pull_push_buf; - if (push_buf == nullptr) { - push_buf = memory::AllocShared(place, total_bytes); - } else if (total_bytes > push_buf->size()) { - auto buf = memory::AllocShared(place, total_bytes); - push_buf.swap(buf); - buf = nullptr; - } -#else - auto push_buf = memory::AllocShared(place, total_bytes); -#endif + int64_t total_bytes = total_length * sizeof(FeaturePushValueGpuType); FeaturePushValueGpuType* total_grad_values_gpu = - reinterpret_cast(push_buf->ptr()); + reinterpret_cast( + dev.pull_push_tensor.mutable_data({total_bytes, 1}, place)); if (platform::is_cpu_place(place)) { 
PADDLE_THROW(platform::errors::Unimplemented( "Warning:: CPUPlace is not supported in PaddleBox now.")); @@ -187,7 +158,8 @@ void BoxWrapper::PushSparseGradCase( reinterpret_cast(dev.slot_lens.data()); const int* d_slot_vector = dev.d_slot_vector.data(); const int* key2slot = reinterpret_cast(dev.keys2slot.data()); - float** gpu_values = reinterpret_cast(dev.gpu_values_ptr->ptr()); + float** gpu_values = + reinterpret_cast(dev.values_ptr_tensor.data()); cudaMemcpyAsync(gpu_values, grad_values.data(), grad_values.size() * sizeof(float*), cudaMemcpyHostToDevice, stream); diff --git a/paddle/fluid/framework/tensor_util.cc b/paddle/fluid/framework/tensor_util.cc index 6bc656851da82..fbd0e5c6c9b64 100644 --- a/paddle/fluid/framework/tensor_util.cc +++ b/paddle/fluid/framework/tensor_util.cc @@ -1059,5 +1059,27 @@ std::ostream& operator<<(std::ostream& os, const Tensor& t) { return os; } +#define CUDA_KERNEL_LOOP(i, n) \ + for (int64_t i = blockIdx.x * blockDim.x + threadIdx.x; i < (n); \ + i += blockDim.x * gridDim.x) + +__global__ void kernel_scale_value(const int64_t len, const float* in, + float* out, const float scale) { + CUDA_KERNEL_LOOP(i, len) { out[i] = in[i] * scale; } +} +void TensorScaleValue(const platform::Place& place, + const framework::Tensor& tensor, framework::Tensor* out, + const float scale) { + auto stream = dynamic_cast( + platform::DeviceContextPool::Instance().Get(place)) + ->stream(); + const float* src = tensor.data(); + float* dst = out->data(); + int64_t len = tensor.numel(); + const int BLOCK_SIZE_ = 256; + kernel_scale_value<<<(len + BLOCK_SIZE_ - 1) / BLOCK_SIZE_, BLOCK_SIZE_, 0, + stream>>>(len, src, dst, scale); +} + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/tensor_util.h b/paddle/fluid/framework/tensor_util.h index 50644370bc6b6..3540921792dd5 100644 --- a/paddle/fluid/framework/tensor_util.h +++ b/paddle/fluid/framework/tensor_util.h @@ -217,5 +217,9 @@ void TensorToVector(const Tensor& src, std::vector* dst) { } std::ostream& operator<<(std::ostream& os, const Tensor& t); +void TensorScaleValue(const platform::Place& place, + const framework::Tensor& tensor, framework::Tensor* out, + const float scale); + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h index 7fecf07475b14..0151f24832910 100644 --- a/paddle/fluid/framework/threadpool.h +++ b/paddle/fluid/framework/threadpool.h @@ -100,6 +100,31 @@ class ThreadPool { scheduled_.notify_one(); return f; } + // binding cpu cores + void SetCPUAffinity(const std::vector& cores, bool one_by_one = false) { + if (cores.empty()) { + return; + } + size_t core_num = cores.size(); + cpu_set_t mask; + CPU_ZERO(&mask); + if (one_by_one) { + for (size_t i = 0; i < threads_.size(); ++i) { + CPU_SET(cores[i % core_num], &mask); + pthread_setaffinity_np(threads_[i]->native_handle(), sizeof(mask), + &mask); + } + } else { + for (size_t i = 0; i < core_num; ++i) { + CPU_SET(cores[i], &mask); + } + for (size_t i = 0; i < threads_.size(); ++i) { + pthread_setaffinity_np(threads_[i]->native_handle(), sizeof(mask), + &mask); + } + } + // VLOG(0) << "binding read ins thread_id = " << tid << ", cpunum = " << + } private: DISABLE_COPY_AND_ASSIGN(ThreadPool); diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index 572f2ddfd53f3..982c7ca225254 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -335,7 +335,8 @@ class BoxPSTrainer : public 
TrainerBase { BoxPSWorkerParameter param_config_; std::vector> workers_; - std::vector worker_threads_; + // std::vector worker_threads_; + std::vector> wait_futures_; std::vector readers_; std::shared_ptr> param_need_sync_; @@ -343,7 +344,6 @@ class BoxPSTrainer : public TrainerBase { bool async_mode_ = false; std::shared_ptr dense_table_ = nullptr; - }; #endif } // namespace framework diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index cfeb75b23b072..2eb5db0bd4ff4 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -68,7 +68,7 @@ message TrainerDesc { optional PullDenseWorkerParameter pull_dense_param = 102; optional SectionWorkerParameter section_param = 104; optional BoxPSWorkerParameter boxps_param = 105; - + // datafeed desc optional DataFeedDesc data_desc = 201; } @@ -102,6 +102,9 @@ message BoxPSWorkerParameter { optional bool async_mode = 2; optional int32 dump_thread_num = 3; optional proto.ProgramDesc program_desc = 4; + optional int32 sync_dense_mode = 5; + optional int32 sync_weight_step = 6; + optional bool sync_one_ring = 7; } message SectionConfig { diff --git a/paddle/fluid/operators/collective/c_mixallgather_op.cc b/paddle/fluid/operators/collective/c_mixallgather_op.cc index 7a1ceffe2e967..1ccd92127d17b 100644 --- a/paddle/fluid/operators/collective/c_mixallgather_op.cc +++ b/paddle/fluid/operators/collective/c_mixallgather_op.cc @@ -139,14 +139,14 @@ class CMixAllGatherOpCUDAKernel : public framework::OpKernel { box_ptr->DenseNcclTimer(device_id, false, 0x03); - size_t numel = 0; + int64_t numel = 0; auto dtype = static_cast(in_tensors[0]->type()); GetTensorMemSize(in_tensors, &numel); int64_t offset = 0; - size_t recv_len = 0; - size_t pad_len = 0; + int64_t recv_len = 0; + int64_t pad_len = 0; T *recvbuff = nullptr; T *sendbuff = nullptr; @@ -188,37 +188,29 @@ class CMixAllGatherOpCUDAKernel : public framework::OpKernel { sendbuff = &recvbuff[offset]; } } else { // allreduce - if (nranks > 1 && ((numel % device_num) != 0)) { + if (nranks > 1 && comm_rank_num == device_num && + ((numel % device_num) != 0)) { pad_len = device_num - (numel % device_num); numel = numel + pad_len; } - recvbuff = fused_tensor->mutable_data({static_cast(numel), 1}, - place); + recvbuff = fused_tensor->mutable_data({numel, 1}, place); sendbuff = recvbuff; recv_len = numel; } - CHECK(static_cast(recv_len) == fused_tensor->numel()); auto dev_ctx = paddle::platform::DeviceContextPool::Instance().Get(place); + CHECK(static_cast(recv_len) == fused_tensor->numel()); // copy input datas for (size_t i = 0; i < in_tensors.size(); ++i) { - size_t len = static_cast(in_tensors[i]->numel()); - auto sub_tensor = fused_tensor->Slice(static_cast(offset), - static_cast(offset + len)); + int64_t len = in_tensors[i]->numel(); + auto sub_tensor = fused_tensor->Slice(offset, offset + len); framework::TensorCopy(*in_tensors[i], place, *dev_ctx, &sub_tensor); offset += len; } - cudaStream_t stream = nullptr; - if (ctx.Attr("use_calc_stream")) { - stream = static_cast(dev_ctx)->stream(); - } else { - stream = static_cast(dev_ctx)->stream(); - PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); - stream = comm->stream(); - } + cudaStream_t stream = + static_cast(dev_ctx)->stream(); PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); - box_ptr->DenseNcclTimer(device_id, true, 0x02); ncclDataType_t nccl_dtype = platform::ToNCCLDataType(dtype); @@ -226,7 +218,7 @@ class CMixAllGatherOpCUDAKernel : 
public framework::OpKernel<T> {
     if (multi_nccl) {  // multi node nccl more than two network card
       if (nccl_mode == NCCL_ALLREDUCE) {  // allreduce
         // [inner reducescatter->node allreduce->allgather]
-        int part_param_len = numel / device_num;
+        int64_t part_param_len = numel / device_num;
         T *recv_ptr = &recvbuff[device_id * part_param_len];
         PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclGroupStart());
         PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclReduceScatter(
@@ -248,7 +240,7 @@ class CMixAllGatherOpCUDAKernel : public framework::OpKernel<T> {
             nccl_dtype, comm->comm(), stream));
       } else {  // mixallgather
         // [inner reducescatter->node allgather->inner allgather]
-        int part_param_len = numel / device_num;
+        int64_t part_param_len = numel / device_num;
         T *recv_ptr = &recvbuff[device_id * part_param_len];
         PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclGroupStart());
         PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclReduceScatter(
@@ -344,7 +336,7 @@ class CMixAllGatherOpCUDAKernel : public framework::OpKernel<T> {
  protected:
   void GetTensorMemSize(
       const std::vector<const framework::LoDTensor *> &lod_tensors,
-      size_t *numel) const {
+      int64_t *numel) const {
     *numel = 0;
     for (size_t i = 0; i < lod_tensors.size(); ++i) {
       CHECK(lod_tensors[i]->IsInitialized());
diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc
index 57f5c257cd054..afc5666a585b4 100644
--- a/paddle/fluid/platform/flags.cc
+++ b/paddle/fluid/platform/flags.cc
@@ -490,8 +490,10 @@ DEFINE_bool(padbox_dataset_disable_shuffle, false,
             "if true ,will disable data shuffle");
 DEFINE_int32(padbox_slotrecord_extend_dim, 0, "paddlebox pcoc extend dim");
 DEFINE_bool(padbox_auc_runner_mode, false, "auc runner mode");
-DEFINE_bool(padbox_dataset_disable_polling, false, "if true ,will disable input file list polling");
-
+DEFINE_bool(padbox_dataset_disable_polling, false,
+            "if true ,will disable input file list polling");
+DEFINE_bool(padbox_dataset_enable_unrollinstance, false,
+            "if true ,will enable unrollinstance");
 /**
  * MKLDNN related FLAG
  * Name: use_mkldnn
@@ -576,3 +578,10 @@ DEFINE_string(tracer_mkldnn_ops_on, "",
  */
 DEFINE_string(tracer_mkldnn_ops_off, "",
               "List of OneDNN operation types to be turned off");
+/**
+ * enable train dnn binding cpu
+ */
+DEFINE_bool(enable_binding_train_cpu, true,
+            "enable train binding cpu, default true");
+DEFINE_bool(enable_sync_dense_moment, false,
+            "enable sync dense moment, default false");
diff --git a/paddle/fluid/pybind/box_helper_py.cc b/paddle/fluid/pybind/box_helper_py.cc
index 86d11f0b3cbe5..334df5d133036 100644
--- a/paddle/fluid/pybind/box_helper_py.cc
+++ b/paddle/fluid/pybind/box_helper_py.cc
@@ -103,6 +103,41 @@ void BindBoxWrapper(py::module* m) {
       .def("set_input_table_dim", &framework::BoxWrapper::SetInputTableDim,
            py::call_guard<py::gil_scoped_release>());
 }  // end BoxWrapper
+void BindBoxFileMgr(py::module* m) {
+  py::class_<framework::BoxFileMgr, std::shared_ptr<framework::BoxFileMgr>>(
+      *m, "BoxFileMgr")
+      .def(py::init([]() { return std::make_shared<framework::BoxFileMgr>(); }))
+      .def("init", &framework::BoxFileMgr::init,
+           py::call_guard<py::gil_scoped_release>())
+      .def("list_dir", &framework::BoxFileMgr::list_dir,
+           py::call_guard<py::gil_scoped_release>())
+      .def("makedir", &framework::BoxFileMgr::makedir,
+           py::call_guard<py::gil_scoped_release>())
+      .def("exists", &framework::BoxFileMgr::exists,
+           py::call_guard<py::gil_scoped_release>())
+      .def("download", &framework::BoxFileMgr::down,
+           py::call_guard<py::gil_scoped_release>())
+      .def("upload", &framework::BoxFileMgr::upload,
+           py::call_guard<py::gil_scoped_release>())
+      .def("remove", &framework::BoxFileMgr::remove,
+           py::call_guard<py::gil_scoped_release>())
+      .def("file_size", &framework::BoxFileMgr::file_size,
+           py::call_guard<py::gil_scoped_release>())
+      .def("dus", &framework::BoxFileMgr::dus,
+           py::call_guard<py::gil_scoped_release>())
+      .def("truncate", &framework::BoxFileMgr::truncate,
+           py::call_guard<py::gil_scoped_release>())
+      .def("touch", &framework::BoxFileMgr::touch,
+           py::call_guard<py::gil_scoped_release>())
+      .def("rename", &framework::BoxFileMgr::rename,
+           py::call_guard<py::gil_scoped_release>())
+      .def("list_info", &framework::BoxFileMgr::list_info,
+           py::call_guard<py::gil_scoped_release>())
+      .def("count", &framework::BoxFileMgr::count,
+           py::call_guard<py::gil_scoped_release>())
+      .def("finalize", &framework::BoxFileMgr::destory,
+           py::call_guard<py::gil_scoped_release>());
+}  // end BoxFileMgr
 #endif
 }  // end namespace pybind
diff --git a/paddle/fluid/pybind/box_helper_py.h b/paddle/fluid/pybind/box_helper_py.h
index 7bc36516c6580..319614fb86e41 100644
--- a/paddle/fluid/pybind/box_helper_py.h
+++ b/paddle/fluid/pybind/box_helper_py.h
@@ -25,6 +25,7 @@ namespace pybind {
 void BindBoxHelper(py::module* m);
 #ifdef PADDLE_WITH_BOX_PS
 void BindBoxWrapper(py::module* m);
+void BindBoxFileMgr(py::module* m);
 #endif
 }  // namespace pybind
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 9930acff00ad7..fe8a645fc7161 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -2805,6 +2805,7 @@ All parameter, weight, gradient are variables in Paddle.
   BindBoxHelper(&m);
 #ifdef PADDLE_WITH_BOX_PS
   BindBoxWrapper(&m);
+  BindBoxFileMgr(&m);
 #endif
 #ifdef PADDLE_WITH_NCCL
   BindNCCLWrapper(&m);
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index 9e1be082c0fe7..d0b3e20251a27 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -259,6 +259,8 @@ def __bootstrap__():
             'padbox_dataset_disable_shuffle',
             'padbox_slotrecord_extend_dim',
             'padbox_auc_runner_mode',
+            'padbox_dataset_enable_unrollinstance',
+            'enable_binding_train_cpu',
         ]
     core.init_gflags(["--tryfromenv=" + ",".join(read_env_flags)])
     core.init_glog(sys.argv[0])
diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py
index b49d7f85d2864..5ddcad67f906d 100644
--- a/python/paddle/fluid/device_worker.py
+++ b/python/paddle/fluid/device_worker.py
@@ -16,7 +16,8 @@
 from __future__ import print_function
 
 __all__ = [
-    'DeviceWorker', 'Hogwild', 'DownpourSGD', 'Section', 'DownpourSGDOPT', 'BoxPSWorker'
+    'DeviceWorker', 'Hogwild', 'DownpourSGD', 'Section', 'DownpourSGDOPT',
+    'BoxPSWorker'
 ]
 
 
@@ -424,10 +425,11 @@ def _gen_worker_desc(self, trainer_desc):
         assert isinstance(place, core.CUDAPlace)
         cfg.place = cfg.CUDAPlace
         cfg.place_id = place_id
-
+
 
 class BoxPSWorker(DeviceWorker):
     """
     BoxPSWorker
     """
+
     def __init__(self):
         """Init."""
         super(DeviceWorker, self).__init__()
@@ -444,14 +446,18 @@ def _gen_worker_desc(self, trainer_desc):
         pipeline_opt = self._program._pipeline_opt
         boxps_param = trainer_desc.boxps_param
         boxps_param.async_mode = pipeline_opt.get("async_mode", False)
+        boxps_param.sync_dense_mode = pipeline_opt.get("sync_dense_mode", 0)
+        boxps_param.sync_weight_step = pipeline_opt.get("sync_weight_step", 0)
+        boxps_param.sync_one_ring = pipeline_opt.get("sync_one_ring", False)
         for e in pipeline_opt["param_need_sync"]:
             boxps_param.param_need_sync.append(e)
         boxps_param.dump_thread_num = pipeline_opt.get("dump_thread_num", 1)
-
+
         program_list = pipeline_opt.get("section_program_list", [])
         if len(program_list) > 0:
-            boxps_param.program_desc.ParseFromString(
-                program_list[0]["program"]._get_desc().serialize_to_string())
+            boxps_param.program_desc.ParseFromString(program_list[0][
+                "program"]._get_desc().serialize_to_string())
+
 
 class DeviceWorkerFactory(object):
     def _create_device_worker(self, worker_type):
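
The BindBoxFileMgr binding above exposes the BoxPS file manager to Python. A minimal usage sketch, assuming the class is reachable as paddle.fluid.core.BoxFileMgr and that fs_name/fs_ugi follow the usual Hadoop-client conventions; the cluster address, credentials and paths below are placeholders, not values from this patch:

    # Hypothetical driver for the BoxFileMgr binding; only methods bound in
    # box_helper_py.cc are used. Cluster name, ugi and paths are placeholders.
    from paddle.fluid import core

    mgr = core.BoxFileMgr()
    if mgr.init("afs://example-cluster:9902", "user,passwd", "./client.conf"):
        if not mgr.exists("/user/paddlebox/feed"):
            mgr.makedir("/user/paddlebox/feed")
        mgr.upload("./part-00000", "/user/paddlebox/feed/part-00000")
        print(mgr.list_dir("/user/paddlebox/feed"))
        print(mgr.file_size("/user/paddlebox/feed/part-00000"))
        mgr.finalize()  # bound to BoxFileMgr::destory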
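BoxPSWorker._gen_worker_desc reads the new sync_dense_mode, sync_weight_step and sync_one_ring fields from the program's _pipeline_opt dictionary. A sketch of the dictionary a training script might set; the dictionary keys match the pipeline_opt.get(...) calls above, while the parameter names and values are illustrative only:

    # Keys mirror device_worker.py above; everything else about the BoxPS
    # trainer setup (program, parameter names) is assumed for illustration.
    main_program._pipeline_opt = {
        "trainer": "BoxPSTrainer",
        "device_worker": "BoxPSWorker",
        "param_need_sync": ["fc_0.w_0", "fc_0.b_0"],  # hypothetical dense params
        "async_mode": False,
        "sync_dense_mode": 1,      # -> boxps_param.sync_dense_mode
        "sync_weight_step": 100,   # -> boxps_param.sync_weight_step
        "sync_one_ring": False,    # -> boxps_param.sync_one_ring
        "dump_thread_num": 1,
    }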
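Because padbox_dataset_enable_unrollinstance and enable_binding_train_cpu are appended to read_env_flags, they can be toggled from the environment before paddle.fluid is imported (gflags are read via --tryfromenv). The values shown are examples, not recommendations:

    # FLAGS_<name> environment variables are picked up during __bootstrap__().
    import os
    os.environ["FLAGS_enable_binding_train_cpu"] = "false"             # default: true
    os.environ["FLAGS_padbox_dataset_enable_unrollinstance"] = "true"  # default: false

    import paddle.fluid as fluid  # flags are parsed during import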
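With this patch, BeginFeedPass runs inside BoxHelper::PreLoadIntoMemory and EndFeedPass inside WaitFeedPassDone, instead of a separate feed thread being joined. From Python the preload pattern stays a pair of calls; a sketch assuming the fluid BoxPSDataset wrapper, with dataset settings and feed variables assumed rather than taken from this patch:

    # Sketch of the preload pattern that exercises the reworked
    # PreLoadIntoMemory / WaitFeedPassDone path.
    import paddle.fluid as fluid

    dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
    dataset.set_use_var(feed_vars)                   # feed_vars defined elsewhere
    dataset.set_filelist(["./train_data/part-00000"])
    dataset.begin_pass()
    dataset.preload_into_memory()    # BeginFeedPass + asynchronous ins reading
    dataset.wait_preload_done()      # waits for readers, then EndFeedPass
    # ... run the training pass, then end the pass as in the existing API ...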