From 4acc17ba9fdc42d711549b7b2ca64fdaec0bfa80 Mon Sep 17 00:00:00 2001
From: lihui
Date: Mon, 30 Oct 2023 19:42:45 +0800
Subject: [PATCH 1/2] sharding mode, erase other device param

---
 paddle/fluid/framework/boxps_trainer.cc    | 43 ++++++++++++++++
 paddle/fluid/framework/boxps_worker.cc     | 58 +++++++++++-----------
 paddle/fluid/framework/fleet/box_wrapper.h |  3 +-
 paddle/fluid/framework/trainer.h           |  2 +
 python/paddle/fluid/io.py                  | 22 +++++---
 5 files changed, 92 insertions(+), 36 deletions(-)

diff --git a/paddle/fluid/framework/boxps_trainer.cc b/paddle/fluid/framework/boxps_trainer.cc
index f0f1a06541f38..fda178e3018ba 100644
--- a/paddle/fluid/framework/boxps_trainer.cc
+++ b/paddle/fluid/framework/boxps_trainer.cc
@@ -231,10 +231,53 @@ void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program,
       this_worker->CreateDeviceResource(main_program);
     }));
   }
+  RemoveOtherDeviceVars(main_program, root_scope_);
   for (auto& th : wait_futures_) {
     th.get();
   }
+  VLOG(0) << "InitTrainerEnv done!";
 }
+
+void BoxPSTrainer::RemoveOtherDeviceVars(const ProgramDesc& main_program,
+                                         Scope* root_scope) {
+  std::vector<std::string> remove_vars;
+  std::unordered_set<std::string> unpersist_var_names;
+  auto& block = main_program.Block(0);
+  auto all_desc = block.AllOps();
+  auto box_wrapper = BoxWrapper::GetInstance();
+  int rank_id = box_wrapper->GetMpiRank();
+  int gpu_num = box_wrapper->GetGpuNum();
+  // 1. Get other device's Param
+  for (auto& op_desc : all_desc) {
+    // broadcast op
+    if (op_desc->Type() != "c_broadcast") {
+      continue;
+    }
+    int root_id = op_desc->GetAttrIfExists<int>("root");
+    if ((root_id / gpu_num) == rank_id) {
+      continue;
+    }
+    for (auto& o : op_desc->Inputs()) {
+      for (auto& name : o.second) {
+        unpersist_var_names.insert(name);
+      }
+    }
+  }
+  VLOG(0) << "root scope remove_params size = " << unpersist_var_names.size();
+  // 2. Get moment param
+  for (auto& unpersist_var_name : unpersist_var_names) {
+    for (auto& var : block.AllVars()) {
+      std::string name = var->Name();
+      if (var->Persistable() && name.find(unpersist_var_name) == 0) {
+        remove_vars.push_back(name);
+      }
+    }
+  }
+  if (remove_vars.empty()) return;
+  VLOG(0) << "root scope remove_vars's size = " << remove_vars.size();
+  root_scope->EraseVars(remove_vars);
+}
+
 void BoxPSTrainer::Run() {
   VLOG(3) << "Going to run";
   auto pool = GetThreadPool(thread_num_);
diff --git a/paddle/fluid/framework/boxps_worker.cc b/paddle/fluid/framework/boxps_worker.cc
index 3277b6654d2d1..00987aee1a599 100644
--- a/paddle/fluid/framework/boxps_worker.cc
+++ b/paddle/fluid/framework/boxps_worker.cc
@@ -1015,6 +1015,20 @@ void BoxPSWorker::CreateThreadScopeForSharding(const ProgramDesc& program) {
     thread_vars_.push_back(name);
     ++param_total;
     if (var->Persistable()) {
+      if (unpersist_vars_.find(name) != unpersist_vars_.end()) {
+        // unpersist vars(include other thread var and other device var)
+        auto* ptr = thread_scope_->Var(name);
+        InitializeVariable(ptr, var->GetType());
+        // set dims
+        auto dims = phi::make_ddim(var->GetShape());
+        auto var_dtype =
+            paddle::framework::TransToPhiDataType(var->GetDataType());
+        ptr->GetMutable<phi::DenseTensor>()->Resize(dims).set_type(var_dtype);
+        ++unpersist_num;
+        ++persistable_num;
+        total_persistable_len += ptr->GetMutable<phi::DenseTensor>()->numel();
+        continue;
+      }
       Variable* root_var = root_scope_->FindVar(name);
       if (!root_var) {
         VLOG(0) << "not found var name=" << name;
@@ -1027,10 +1041,10 @@ void BoxPSWorker::CreateThreadScopeForSharding(const ProgramDesc& program) {
       size_t len = root_tensor->numel();
       ++persistable_num;
       total_persistable_len += len;
+      real_persist_len += len;
+      ++real_persist_num;
       // convert one device to other device c_broadcast param
       if (persist_param_vars_.find(name) != persist_param_vars_.end()) {
-        real_persist_len += len;
-        ++real_persist_num;
         // same device
         if (place_ == root_tensor->place()) {
           ++share_var_num;
@@ -1057,34 +1071,20 @@ void BoxPSWorker::CreateThreadScopeForSharding(const ProgramDesc& program) {
           skip_vars_.push_back(name);
         }
       }
-      // unpersist vars
-      if (unpersist_vars_.find(name) != unpersist_vars_.end()) {
-        auto* ptr = thread_scope_->Var(name);
-        InitializeVariable(ptr, var->GetType());
-        // set dims
-        auto dims = phi::make_ddim(var->GetShape());
-        auto var_dtype =
-            paddle::framework::TransToPhiDataType(var->GetDataType());
-        ptr->GetMutable<phi::DenseTensor>()->Resize(dims).set_type(var_dtype);
-        ++unpersist_num;
+      // data norm copy and learning rate
+      if (!gpu_tensor->initialized() && place_ == root_tensor->place()) {
+        auto dim = root_tensor->dims();
+        gpu_tensor->ShareDataWith(*root_tensor).Resize(dim);
+        ++share_var_num;
+        share_persistable_len += len;
      } else {
-        real_persist_len += len;
-        ++real_persist_num;
-        // data norm copy and learning rate
-        if (!gpu_tensor->initialized() && place_ == root_tensor->place()) {
-          auto dim = root_tensor->dims();
-          gpu_tensor->ShareDataWith(*root_tensor).Resize(dim);
-          ++share_var_num;
-          share_persistable_len += len;
-        } else {
-          TensorCopy(*static_cast<const phi::DenseTensor*>(root_tensor),
-                     place_,
-                     static_cast<phi::DenseTensor*>(gpu_tensor));
-          ++copy_persist_num;
-          // device 0 need sync datanorm and learning rate to root scope
-          if (device_id_ == 0) {
-            need_copy_vars_.push_back(name);
-          }
+        TensorCopy(*static_cast<const phi::DenseTensor*>(root_tensor),
+                   place_,
+                   static_cast<phi::DenseTensor*>(gpu_tensor));
+        ++copy_persist_num;
+        // device 0 need sync datanorm and learning rate to root scope
+        if (device_id_ == 0) {
+          need_copy_vars_.push_back(name);
        }
      }
    } else {
diff --git a/paddle/fluid/framework/fleet/box_wrapper.h b/paddle/fluid/framework/fleet/box_wrapper.h
index 38079a8888b02..d094729e25bfb 100644
--- a/paddle/fluid/framework/fleet/box_wrapper.h
+++ b/paddle/fluid/framework/fleet/box_wrapper.h
@@ -422,11 +422,12 @@ class BoxWrapper {
     fprintf(stdout, "init box wrapper\n");
     boxps::MPICluster::Ins();
   }
-  int GetMpiSize() { return boxps::MPICluster::Ins().size(); }
+  int GetMpiSize() { return boxps::MPICluster::Ins().size(); }
   int GetMpiRank() { return boxps::MPICluster::Ins().rank(); }
   int GetNCCLRankId(const int &device_id) {
     return (GetMpiRank() * gpu_num_ + device_id);
   }
+  int GetGpuNum() { return gpu_num_; }
   void SetDatasetName(const std::string& name) {}
   void SetInputTableDim(size_t dim) { input_table_dim_ = dim; }
   void FeedPass(int date, const std::vector<uint64_t>& feasgin_to_box);
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index cae0c9c5521da..c515646c3ef84 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -400,6 +400,8 @@ class BoxPSTrainer : public TrainerBase {
   virtual std::string GetDumpPath(int tid);
   virtual void DumpWork(int tid);
   virtual void FinalizeDumpEnv();
+  void RemoveOtherDeviceVars(const ProgramDesc& main_program,
+                             Scope* root_scope);
 
 protected:
   int thread_num_;
diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py
index 252f7000bfd14..15cb246e61cfb 100644
--- a/python/paddle/fluid/io.py
+++ b/python/paddle/fluid/io.py
@@ -290,7 +290,8 @@ def save_vars(executor,
               main_program=None,
               vars=None,
               predicate=None,
-              filename=None):
+              filename=None,
+              filter_func=None):
     """
     :api_attr: Static Graph
 
@@ -374,7 +375,8 @@ def name_has_fc(var):
             main_program=main_program,
            dirname=dirname,
            vars=list(filter(predicate, main_program.list_vars())),
-            filename=filename)
+            filename=filename,
+            filter_func=filter_func)
     else:
         params_var_name = "saved_params"
         # give warning when there is no var in model
@@ -389,6 +391,8 @@ def name_has_fc(var):
         save_var_map = {}
         for each_var in vars:
+            if filter_func is not None and filter_func(each_var.name):
+                continue
             # NOTE: don't save the variable which type is RAW
             if each_var.type == core.VarDesc.VarType.RAW:
                 continue
@@ -668,7 +672,11 @@ def is_valid(var):
 
 
 @dygraph_not_support
-def save_persistables(executor, dirname, main_program=None, filename=None):
+def save_persistables(executor,
+                      dirname,
+                      main_program=None,
+                      filename=None,
+                      filter_func=None):
     """
     :api_attr: Static Graph
 
@@ -737,7 +745,8 @@ def save_persistables(executor, dirname, main_program=None, filename=None):
            main_program=main_program,
            vars=None,
            predicate=is_persistable,
-            filename=filename)
+            filename=filename,
+            filter_func=filter_func)
 
 
 def load_vars(executor,
@@ -1245,7 +1254,8 @@ def save_inference_model(dirname,
                          params_filename=None,
                          export_for_deployment=True,
                          program_only=False,
-                         clip_extra=False):
+                         clip_extra=False,
+                         filter_func=None):
     """
     :api_attr: Static Graph
 
@@ -1453,7 +1463,7 @@ def save_inference_model(dirname,
     if params_filename is not None:
         params_filename = os.path.basename(params_filename)
 
-    save_persistables(executor, save_dirname, main_program, params_filename)
+    save_persistables(executor, save_dirname, main_program, params_filename, filter_func)
 
     return target_var_name_list


From 0c464391a3f23291fade3de052381ed09c4fc9b1 Mon Sep 17 00:00:00 2001
From: lihui
Date: Mon, 6 Nov 2023 11:15:09 +0800
Subject: [PATCH 2/2] sharding mode support avg_weight

---
 paddle/fluid/framework/boxps_worker.cc | 55 +++++++++++++++++---------
 1 file changed, 37 insertions(+), 18 deletions(-)
diff --git a/paddle/fluid/framework/boxps_worker.cc b/paddle/fluid/framework/boxps_worker.cc
index 00987aee1a599..d4432cc162a77 100644
--- a/paddle/fluid/framework/boxps_worker.cc
+++ b/paddle/fluid/framework/boxps_worker.cc
@@ -551,6 +551,34 @@ int BoxPSWorker::IsParameter(const std::string& name, bool full_match) {
     return -1;
   }
 }
+
+static bool FindVarInMap(const VariableNameMap& op_var_map,
+                         const std::multiset<std::string>& var_set) {
+  for (auto& o : op_var_map) {
+    for (auto& name : o.second) {
+      if (var_set.find(name) != var_set.end()) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
+static bool IsAvgOp(OpDesc* op_desc) {
+  if (op_desc->Type() != "elementwise_add" &&
+      op_desc->Type() != "elementwise_mul") {
+    return false;
+  }
+  for (auto& o : op_desc->Outputs()) {
+    for (auto& name : o.second) {
+      if (name.find("avg_weight") != std::string::npos ||
+          name.find("@avg") != std::string::npos) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
 void BoxPSWorker::BuildShardingDepends(const ProgramDesc& program) {
   nccl_rank_id_ = place_.GetDeviceId();
 #if defined(PADDLE_WITH_CUDA)
@@ -615,33 +643,24 @@ void BoxPSWorker::BuildShardingDepends(const ProgramDesc& program) {
        unpersist_vars_.insert(name);
      }
    } else {
-      // adam ubmq1_h2_param.b_0_moment1_0
+      // adam ubmq1_h2_param.b_0_moment1_0, avg_weight @avg @w_backup
      remove_vars_.insert(name);
    }
  }
 
  std::multiset<std::string> all_remove_inputs;
  for (auto& op_desc : all_desc) {
-    bool find = false;
-    for (auto& o : op_desc->Inputs()) {
-      for (auto& name : o.second) {
-        if (remove_vars_.find(name) == remove_vars_.end()) {
-          continue;
-        }
-        find = true;
-        break;
-      }
-      if (find) {
-        break;
-      }
-    }
-    if (find) {
+    if (FindVarInMap(op_desc->Inputs(), remove_vars_)) {
      for (auto& o : op_desc->Inputs()) {
        for (auto& name : o.second) {
          all_remove_inputs.insert(name);
        }
      }
      remove_ops_.insert(op_desc);
+    } else if (IsAvgOp(op_desc) &&
+               (FindVarInMap(op_desc->Outputs(), remove_vars_) ||
+                FindVarInMap(op_desc->Inputs(), unpersist_vars_))) {
+      remove_ops_.insert(op_desc);
    }
  }
 
@@ -1026,7 +1045,7 @@ void BoxPSWorker::CreateThreadScopeForSharding(const ProgramDesc& program) {
        ptr->GetMutable<phi::DenseTensor>()->Resize(dims).set_type(var_dtype);
        ++unpersist_num;
        ++persistable_num;
-        total_persistable_len += ptr->GetMutable<phi::DenseTensor>()->numel();
+        total_persistable_len += ptr->GetMutable<phi::DenseTensor>()->numel();
        continue;
      }
      Variable* root_var = root_scope_->FindVar(name);
@@ -1079,8 +1098,8 @@ void BoxPSWorker::CreateThreadScopeForSharding(const ProgramDesc& program) {
        share_persistable_len += len;
      } else {
        TensorCopy(*static_cast<const phi::DenseTensor*>(root_tensor),
-                   place_,
-                   static_cast<phi::DenseTensor*>(gpu_tensor));
+                   place_,
+                   static_cast<phi::DenseTensor*>(gpu_tensor));
        ++copy_persist_num;
        // device 0 need sync datanorm and learning rate to root scope
        if (device_id_ == 0) {
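
Note (not part of either patch): a minimal usage sketch of the filter_func argument that these patches thread through save_vars()/save_persistables()/save_inference_model(). The predicate receives a variable name and returns True to skip saving that variable. The helper name skip_other_device_param, the name patterns it checks, and main_program below are illustrative assumptions, not values defined by this change.

    import paddle.fluid as fluid

    def skip_other_device_param(var_name):
        # Return True to skip this variable; save_vars() calls
        # filter_func(each_var.name) and skips the variable on a truthy result.
        # The patterns here are examples only; a real sharding job would build
        # them from its own device-to-parameter mapping.
        return var_name.endswith("_moment1_0") or "@avg" in var_name

    exe = fluid.Executor(fluid.CPUPlace())
    # main_program: an already-built and initialized fluid.Program (assumed).
    fluid.io.save_persistables(
        exe,
        dirname="./sharded_params",
        main_program=main_program,
        filter_func=skip_other_device_param)

Because save_inference_model() forwards the same predicate into save_persistables(), one filter can keep per-device optimizer and averaging slots out of both checkpoints and inference exports.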