【GPUPS】Adam accessor (PaddlePaddle#43919)
* add adam/sharedadam optimizer for gpups; edit optimizer struct; test=develop
danleifeng authored and Aurelius84 committed Jul 29, 2022
1 parent 20c06dd commit b50ef42
Showing 36 changed files with 2,714 additions and 1,282 deletions.
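For orientation before the per-file diffs: the change lets the GPUPS CtrDymfAccessor carry Adam-style optimizer state inside each sparse feature value instead of a single AdaGrad-style g2sum slot. A rough back-of-the-envelope comparison of the per-feature optimizer state each rule keeps (an editorial sketch, not part of the commit; it assumes an embedx table with mf_dim = 8 and reads the sizes off the SGD rules in the diffs below):

// Illustration only; the real sizes come from SparseValueSGDRule::Dim() of the
// configured rule (see sparse_sgd_rule.h below).
constexpr int mf_dim = 8;                   // assumed embedx width
constexpr int adagrad_state = 1;            // single g2sum (previous default, assumed)
constexpr int adam_state = 2 * mf_dim + 2;  // gsum[8] + g2sum[8] + beta1_pow + beta2_pow = 18
constexpr int shared_adam_state = 4;        // one gsum + one g2sum + beta1_pow + beta2_pow
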
28 changes: 13 additions & 15 deletions paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc
@@ -31,6 +31,7 @@ int CtrDymfAccessor::Initialize() {
_embedx_sgd_rule = CREATE_PSCORE_CLASS(SparseValueSGDRule, name);
_embedx_sgd_rule->LoadConfig(_config.embedx_sgd_param(),
_config.embedx_dim());
common_feature_value.optimizer_name = name;

common_feature_value.embed_sgd_dim = _embed_sgd_rule->Dim();
common_feature_value.embedx_dim = _config.embedx_dim();
@@ -42,7 +43,10 @@ int CtrDymfAccessor::Initialize() {
if (_config.ctr_accessor_param().show_scale()) {
_show_scale = true;
}
VLOG(0) << " INTO CtrDymfAccessor::Initialize()";
VLOG(0) << " INTO CtrDymfAccessor::Initialize(); embed_sgd_dim:"
<< common_feature_value.embed_sgd_dim
<< " embedx_dim:" << common_feature_value.embedx_dim
<< " embedx_sgd_dim:" << common_feature_value.embedx_sgd_dim;
InitAccessorInfo();
return 0;
}
@@ -53,9 +57,9 @@ void CtrDymfAccessor::InitAccessorInfo() {

auto embedx_dim = _config.embedx_dim();
VLOG(0) << "InitAccessorInfo embedx_dim:" << embedx_dim;
_accessor_info.select_dim = 3 + embedx_dim;
_accessor_info.select_dim = 4 + embedx_dim;
_accessor_info.select_size = _accessor_info.select_dim * sizeof(float);
_accessor_info.update_dim = 4 + embedx_dim;
_accessor_info.update_dim = 5 + embedx_dim;
_accessor_info.update_size = _accessor_info.update_dim * sizeof(float);
_accessor_info.mf_size =
(embedx_dim + common_feature_value.embedx_sgd_dim) * sizeof(float);
@@ -179,8 +183,10 @@ int32_t CtrDymfAccessor::Create(float** values, size_t num) {
value[common_feature_value.ClickIndex()] = 0;
value[common_feature_value.SlotIndex()] = -1;
value[common_feature_value.MfDimIndex()] = -1;
_embed_sgd_rule->InitValue(value + common_feature_value.EmbedWIndex(),
value + common_feature_value.EmbedG2SumIndex());
_embed_sgd_rule->InitValue(
value + common_feature_value.EmbedWIndex(),
value + common_feature_value.EmbedG2SumIndex(),
false); // adam embed init not zero, adagrad embed init zero
_embedx_sgd_rule->InitValue(value + common_feature_value.EmbedxWIndex(),
value + common_feature_value.EmbedxG2SumIndex(),
false);
@@ -293,22 +299,14 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
i++) {
os << " " << v[i];
}
// os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " "
// << common_feature_value.MfDim(const_cast<float*>(v));
auto show = common_feature_value.Show(const_cast<float*>(v));
auto click = common_feature_value.Click(const_cast<float*>(v));
auto score = ShowClickScore(show, click);
auto mf_dim = int(common_feature_value.MfDim(const_cast<float*>(v)));
if (score >= _config.embedx_threshold() &&
param > common_feature_value.EmbedxG2SumIndex()) {
// VLOG(1) << "common_feature_value.EmbedxG2SumIndex():"
// << common_feature_value.EmbedxG2SumIndex();
// VLOG(1) << "common_feature_value.EmbedxWIndex():"
// << common_feature_value.EmbedxWIndex();
// VLOG(1) << "common_feature_value.MfDim():"
// << common_feature_value.MfDim(const_cast<float*>(v));
for (auto i = common_feature_value.EmbedxG2SumIndex();
i < common_feature_value.EmbedxWIndex() +
common_feature_value.MfDim(const_cast<float*>(v));
i < common_feature_value.Dim(mf_dim);
++i) {
os << " " << v[i];
}
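A quick worked check of the InitAccessorInfo() change above (editorial sketch, assuming the proto default embedx_dim = 8; not part of the commit): both the pull (select) and push (update) records grow by one float.

constexpr int embedx_dim = 8;                            // proto default, assumed
constexpr int select_dim = 4 + embedx_dim;               // was 3 + embedx_dim -> 12 floats
constexpr int update_dim = 5 + embedx_dim;               // was 4 + embedx_dim -> 13 floats
constexpr int select_size = select_dim * sizeof(float);  // 48 bytes
constexpr int update_size = update_dim * sizeof(float);  // 52 bytes
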
19 changes: 17 additions & 2 deletions paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h
@@ -54,10 +54,24 @@ class CtrDymfAccessor : public ValueAccessor {
int ClickIndex() { return ShowIndex() + 1; }
int EmbedWIndex() { return ClickIndex() + 1; }
int EmbedG2SumIndex() { return EmbedWIndex() + 1; }
int SlotIndex() { return EmbedG2SumIndex() + 1; }
int SlotIndex() { return EmbedG2SumIndex() + embed_sgd_dim; }
int MfDimIndex() { return SlotIndex() + 1; }
int EmbedxG2SumIndex() { return MfDimIndex() + 1; }
int EmbedxWIndex() { return EmbedxG2SumIndex() + 1; }
int EmbedxWIndex() { return EmbedxG2SumIndex() + embedx_sgd_dim; }

// Total value length (number of floats) computed from mf_dim
int Dim(int& mf_dim) {
int tmp_embedx_sgd_dim = 1;
if (optimizer_name == "SparseAdamSGDRule") { // adam
tmp_embedx_sgd_dim = mf_dim * 2 + 2;
} else if (optimizer_name == "SparseSharedAdamSGDRule") { // shared_adam
tmp_embedx_sgd_dim = 4;
}
return 7 + embed_sgd_dim + tmp_embedx_sgd_dim + mf_dim;
}

// Total value size in bytes computed from mf_dim
int Size(int& mf_dim) { return (Dim(mf_dim)) * sizeof(float); }

float& UnseenDays(float* val) { return val[UnseenDaysIndex()]; }
float& DeltaScore(float* val) { return val[DeltaScoreIndex()]; }
@@ -73,6 +87,7 @@ class CtrDymfAccessor : public ValueAccessor {
int embed_sgd_dim;
int embedx_dim;
int embedx_sgd_dim;
std::string optimizer_name;
};

struct CtrDymfPushValue {
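To make the new index helpers above concrete, here is how one feature value lays out when both the embed and embedx rules are SparseAdamSGDRule (editorial sketch, not from the commit; it assumes mf_dim = 8, the usual unseen_days/delta_score/show/click prefix at offsets 0-3, and embed_sgd_dim = 4 for the one-dimensional embed weight):

// Float offsets inside one CtrDymf feature value, following the helpers above:
//   0      unseen_days
//   1      delta_score
//   2      show
//   3      click
//   4      embed_w
//   5..8   embed Adam state   (embed_sgd_dim = 4)
//   9      slot
//  10      mf_dim
//  11..28  embedx Adam state  (embedx_sgd_dim = 2 * mf_dim + 2 = 18)
//  29..36  embedx_w           (mf_dim = 8)
// Total: Dim(8) = 7 + 4 + 18 + 8 = 37 floats, Size(8) = 148 bytes.
// With SparseSharedAdamSGDRule for embedx instead: Dim(8) = 7 + 4 + 4 + 8 = 23 floats.
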
84 changes: 83 additions & 1 deletion paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc
@@ -213,7 +213,6 @@ void SparseAdamSGDRule::UpdateValueWork(float* w,
float beta1_pow_ = *beta1_pow;
float beta2_pow_ = *beta2_pow;

// lr not change in one update
lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_);
for (size_t i = 0; i < _embedding_dim; i++) {
// Calculation
@@ -252,5 +251,88 @@ void SparseAdamSGDRule::InitValueWork(float* value,
*(sgd + Beta1PowIndex()) = _beta1_decay_rate;
*(sgd + Beta2PowIndex()) = _beta2_decay_rate;
}

void SparseSharedAdamSGDRule::LoadConfig(
const SparseCommonSGDRuleParameter& param, size_t emb_dim) {
_embedding_dim = emb_dim;
auto adam_param = param.adam();
learning_rate_ = adam_param.learning_rate();
_initial_range = adam_param.initial_range();
_beta1_decay_rate = adam_param.beta1_decay_rate();
_beta2_decay_rate = adam_param.beta2_decay_rate();
_ada_epsilon = adam_param.ada_epsilon();
if (adam_param.weight_bounds_size() == 0) {
_min_bound = -std::numeric_limits<float>::max();
_max_bound = std::numeric_limits<float>::max();
} else {
CHECK(adam_param.weight_bounds_size() >= 2)
<< "invalid repeated size for weight_bounds:"
<< adam_param.weight_bounds_size();
_min_bound = adam_param.weight_bounds(0);
_max_bound = adam_param.weight_bounds(1);
}
}

void SparseSharedAdamSGDRule::UpdateValueWork(float* w,
float* sgd,
const float* grad,
float scale) {
float* gsum = sgd + GSumIndex();
float* g2sum = sgd + G2SumIndex();
float* beta1_pow = sgd + Beta1PowIndex();
float* beta2_pow = sgd + Beta2PowIndex();
const float* g = grad;

float lr = learning_rate_;
float beta1_pow_ = *beta1_pow;
float beta2_pow_ = *beta2_pow;
float gsum_ = *gsum;
float g2sum_ = *g2sum;

lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_);
double sum_gsum = 0.0;
double sum_g2sum = 0.0;
for (int i = 0; i < _embedding_dim; i++) {
// Calculation
double new_gsum =
_beta1_decay_rate * gsum_ + (1 - _beta1_decay_rate) * g[i];
double new_g2sum =
_beta2_decay_rate * g2sum_ + (1 - _beta2_decay_rate) * g[i] * g[i];
w[i] = w[i] - lr * (new_gsum / (sqrt(new_g2sum) + _ada_epsilon));
BoundValue(w[i]);
sum_gsum += new_gsum;
sum_g2sum += new_g2sum;
}
// update beta_pow_decay
(*gsum) = sum_gsum / _embedding_dim;
(*g2sum) = sum_g2sum / _embedding_dim;
(*beta1_pow) *= _beta1_decay_rate;
(*beta2_pow) *= _beta2_decay_rate;
}

void SparseSharedAdamSGDRule::InitValueWork(float* value,
float* sgd,
bool zero_init) {
for (int i = 0; i < _embedding_dim; ++i) {
if (zero_init) {
value[i] = 0.0;
BoundValue(value[i]);
} else {
value[i] =
(local_uniform_real_distribution<double>()(local_random_engine()) *
2 -
1) *
_initial_range;
BoundValue(value[i]);
}
}
// init rule gsum and g2sum
for (int i = GSumIndex(); i < Beta1PowIndex(); i++) {
sgd[i] = 0.0;
}
// init beta1_pow and beta2_pow
*(sgd + Beta1PowIndex()) = _beta1_decay_rate;
*(sgd + Beta2PowIndex()) = _beta2_decay_rate;
}
} // namespace distributed
} // namespace paddle
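
The shared variant above keeps a single first/second-moment pair for the whole embedding rather than one per dimension, and averages the freshly computed per-dimension moments back into that pair after each step. A minimal, self-contained sketch of that update (illustration only, with made-up gradients and the usual Adam constants; it mirrors UpdateValueWork above but is not the production code):

#include <cmath>
#include <cstdio>

int main() {
  const int dim = 2;  // toy embedding width
  const float beta1 = 0.9f, beta2 = 0.999f, eps = 1e-8f, base_lr = 0.001f;
  float w[dim] = {0.1f, -0.2f};
  float g[dim] = {0.05f, 0.01f};
  // Shared optimizer state: one gsum/g2sum pair for the whole embedding.
  float gsum = 0.f, g2sum = 0.f, beta1_pow = beta1, beta2_pow = beta2;

  float lr = base_lr * std::sqrt(1 - beta2_pow) / (1 - beta1_pow);
  double sum_gsum = 0.0, sum_g2sum = 0.0;
  for (int i = 0; i < dim; ++i) {
    double new_gsum = beta1 * gsum + (1 - beta1) * g[i];
    double new_g2sum = beta2 * g2sum + (1 - beta2) * g[i] * g[i];
    w[i] -= lr * (new_gsum / (std::sqrt(new_g2sum) + eps));
    sum_gsum += new_gsum;
    sum_g2sum += new_g2sum;
  }
  gsum = sum_gsum / dim;    // the per-dimension moments are averaged back
  g2sum = sum_g2sum / dim;  // into the single shared slot
  beta1_pow *= beta1;
  beta2_pow *= beta2;
  std::printf("w = [%f, %f], gsum = %f, g2sum = %f\n", w[0], w[1], gsum, g2sum);
  return 0;
}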
23 changes: 23 additions & 0 deletions paddle/fluid/distributed/ps/table/sparse_sgd_rule.h
@@ -144,5 +144,28 @@ class SparseAdamSGDRule : public SparseValueSGDRule {
float _beta2_decay_rate;
float _ada_epsilon;
};

class SparseSharedAdamSGDRule : public SparseValueSGDRule {
public:
virtual void LoadConfig(const SparseCommonSGDRuleParameter& param,
size_t emb_dim);
virtual void UpdateValueWork(float* w,
float* sgd,
const float* push_value,
float scale);
virtual void InitValueWork(float* value, float* sgd, bool zero_init);
virtual size_t Dim() { return 4; }
size_t GSumIndex() { return 0; }
size_t G2SumIndex() { return GSumIndex() + 1; }
size_t Beta1PowIndex() { return G2SumIndex() + 1; }
size_t Beta2PowIndex() { return Beta1PowIndex() + 1; }

protected:
float learning_rate_;
float _beta1_decay_rate;
float _beta2_decay_rate;
float _ada_epsilon;
};

} // namespace distributed
} // namespace paddle
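
The four state slots declared above live contiguously in the per-feature sgd block; after InitValueWork (see sparse_sgd_rule.cc above) they hold, in order (editorial illustration; the 0.9/0.999 decay rates are the usual Adam defaults and are assumptions here, not read from this diff):

// sgd[GSumIndex()]     == 0.0f    // shared first moment
// sgd[G2SumIndex()]    == 0.0f    // shared second moment
// sgd[Beta1PowIndex()] == 0.9f    // _beta1_decay_rate
// sgd[Beta2PowIndex()] == 0.999f  // _beta2_decay_rate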
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/table/table.cc
@@ -49,6 +49,7 @@ REGISTER_PSCORE_CLASS(SparseValueSGDRule, StdAdaGradSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseAdamSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseNaiveSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseAdaGradSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseSharedAdamSGDRule);

int32_t TableManager::Initialize() {
static bool initialized = false;
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/wrapper/CMakeLists.txt
@@ -13,6 +13,7 @@ cc_library(
op_registry
fs
shell
ps_gpu_wrapper
${RPC_DEPS})

target_link_libraries(fleet z)
45 changes: 26 additions & 19 deletions paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -18,6 +18,10 @@ limitations under the License. */

#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#include "paddle/fluid/distributed/ps/table/table.h"
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#endif

namespace paddle {
namespace distributed {
@@ -129,6 +133,13 @@ void FleetWrapper::InitWorker(const std::string& dist_desc,
worker_ptr_ = std::shared_ptr<paddle::distributed::PSClient>(
paddle::distributed::PSClientFactory::Create(ps_param));
worker_ptr_->Configure(ps_param, dense_pull_regions, ps_env_, index);
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
VLOG(3) << "FleetWrapper::InitWorker InitializeGPUServer";
auto* accessor = worker_ptr_->GetTableAccessor(0);
auto ps_gpu_wrapper = paddle::framework::PSGPUWrapper::GetInstance();
ps_gpu_wrapper->InitializeGPUServer(ps_param);
ps_gpu_wrapper->SetTableAccessor(accessor);
#endif
}
} else {
VLOG(3) << "Client can be initialized only once";
@@ -525,24 +536,24 @@ void FleetWrapper::PushSparseFromTensorAsync(
int batch_size = -1;
bool batch_size_consist = true;
for (auto* input : *inputs) {
int cur_batch_size =
size_t cur_batch_size =
input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
if (batch_size == -1) {
batch_size = cur_batch_size;
} else if (batch_size != cur_batch_size) {
batch_size = int(cur_batch_size);
} else if (batch_size != int(cur_batch_size)) {
// CHECK(batch_size == cur_batch_size); // NOLINT
batch_size_consist = false;
break;
}
}
CHECK(batch_size > 0); // NOLINT

int show_size =
size_t show_size =
shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
CHECK(show_size == batch_size || show_size == 1);
int clk_size =
CHECK(show_size == size_t(batch_size) || show_size == 1);
size_t clk_size =
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);
CHECK(clk_size == size_t(batch_size) || clk_size == 1);

CHECK(outputs->size() == inputs->size());
std::vector<uint64_t> push_keys;
Expand Down Expand Up @@ -601,12 +612,10 @@ void FleetWrapper::PushSparseFromTensorAsync(
// in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] = (static_cast<int>(i) >= show_size
? 1
: static_cast<float>(show_tensor[i]));
push_values.back()[2] = (static_cast<int>(i) >= clk_size
? 0
: static_cast<float>(clk_tensor[i]));
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
float* data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
}
@@ -630,12 +639,10 @@ void FleetWrapper::PushSparseFromTensorAsync(
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] = (static_cast<int>(i) >= show_size
? 1
: static_cast<float>(show_tensor[i]));
push_values.back()[2] = (static_cast<int>(i) >= clk_size
? 0
: static_cast<float>(clk_tensor[i]));
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
float* data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
}
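For reference, the rows pushed by PushSparseFromTensorAsync above follow the "slot show clk grad..." layout its comments mention. A small self-contained sketch of one such row (fea_dim and the values are assumptions for illustration, not taken from the commit):

#include <cstring>
#include <vector>

int main() {
  const int fea_dim = 8;                    // assumed embedding/gradient width
  std::vector<float> grad(fea_dim, 0.01f);  // fake gradient for one feature id
  std::vector<float> row(3 + fea_dim);
  row[0] = 2;     // slot (placeholder, mirrors the TODO in the code above)
  row[1] = 1.0f;  // show
  row[2] = 0.0f;  // click
  std::memcpy(row.data() + 3, grad.data(), sizeof(float) * fea_dim);
  return 0;
}
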
6 changes: 3 additions & 3 deletions paddle/fluid/framework/distributed_strategy.proto
@@ -197,14 +197,14 @@ message TableParameter {

message TableAccessorParameter {
optional string accessor_class = 1;
optional SGDParameter embed_sgd_param = 2;
optional SGDParameter embedx_sgd_param = 3;
optional uint32 fea_dim = 4 [ default = 11 ]; // field size of one value
optional uint32 embedx_dim = 5 [ default = 8 ]; // embedx feature size
optional uint32 embedx_threshold = 6
[ default = 10 ]; // embedx feature create threshold
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SGDParameter embed_sgd_param = 10;
optional SGDParameter embedx_sgd_param = 11;
}

message SGDParameter {
@@ -228,7 +228,7 @@ message
repeated float weight_bounds = 4;
}

message SparseAdamSGDParameter { // SparseAdamSGDRule
message SparseAdamSGDParameter { // SparseAdamSGDRule | SparseSharedAdamSGDRule
optional double learning_rate = 1 [ default = 0.001 ];
optional double initial_range = 2 [ default = 0.0001 ];
optional double beta1_decay_rate = 3 [ default = 0.9 ];
15 changes: 11 additions & 4 deletions paddle/fluid/framework/fleet/CMakeLists.txt
@@ -25,10 +25,17 @@ endif()

if(WITH_HETERPS)
if(WITH_NCCL AND WITH_GPU)
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ${BRPC_DEPS})
if(WITH_PSCORE)
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ps_framework_proto ${BRPC_DEPS})
else()
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ${BRPC_DEPS})
endif()
add_subdirectory(heter_ps)
elseif(WITH_XPU_KP)
xpu_library(