【GPUPS】Adam accessor #43919

Merged Jul 20, 2022 · 33 commits

Commits
5f126e5
add adam/sharedadam optimizer for gpups;edit optimizer struct;test=de…
danleifeng Jun 14, 2022
509768a
remove useless code;test=develop
danleifeng Jun 15, 2022
724381c
remove useless code;test=develop
danleifeng Jun 15, 2022
15dd7a1
remove useless code;test=develop
danleifeng Jun 15, 2022
4f91222
remove useless code;test=develop
danleifeng Jun 16, 2022
ecc76cd
remove useless code;test=develop
danleifeng Jun 16, 2022
1cffb3e
fix adam; test=develop
danleifeng Jun 20, 2022
9a6ff5d
fix adam; test=develop
danleifeng Jun 21, 2022
e3f9c28
fix adam; test=develop
danleifeng Jun 23, 2022
ef8a713
remove useless code;test=develop
danleifeng Jun 23, 2022
dda7284
fix adam; test=develop
danleifeng Jun 27, 2022
811d61f
remove useless code;test=develop
danleifeng Jun 27, 2022
106c2ad
remove useless code;test=develop
danleifeng Jun 28, 2022
7814b04
[gpups]refine adam accessor;test=develop
danleifeng Jul 1, 2022
2b87eff
template;test=develop
danleifeng Jul 1, 2022
13aac52
fix adam accessor:template;test=develop
danleifeng Jul 6, 2022
7aa109f
fix; test=develop
danleifeng Jul 6, 2022
4076440
fix; test=develop
danleifeng Jul 6, 2022
996c619
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
danleifeng Jul 6, 2022
1792f91
add feature_value.cu;test=develop
danleifeng Jul 6, 2022
9f0ea75
format; test=develop
danleifeng Jul 7, 2022
680a028
format; test=develop
danleifeng Jul 7, 2022
4bf0be2
format; test=develop
danleifeng Jul 7, 2022
e0610b0
format; test=develop
danleifeng Jul 7, 2022
0719957
format; test=develop
danleifeng Jul 7, 2022
85fab75
format; test=develop
danleifeng Jul 7, 2022
1e99bbe
add ut; test=develop
danleifeng Jul 7, 2022
f6eb220
add ut; test=develop
danleifeng Jul 19, 2022
e49041c
add ut; test=develop
danleifeng Jul 19, 2022
f90ea15
change cmakelist; test=develop
danleifeng Jul 19, 2022
62ec4cd
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
danleifeng Jul 19, 2022
be6a31e
change cmakelist; test=develop
danleifeng Jul 19, 2022
b82f4c8
change cmakelist; test=develop
danleifeng Jul 19, 2022
28 changes: 13 additions & 15 deletions paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc
@@ -31,6 +31,7 @@ int CtrDymfAccessor::Initialize() {
_embedx_sgd_rule = CREATE_PSCORE_CLASS(SparseValueSGDRule, name);
_embedx_sgd_rule->LoadConfig(_config.embedx_sgd_param(),
_config.embedx_dim());
common_feature_value.optimizer_name = name;

common_feature_value.embed_sgd_dim = _embed_sgd_rule->Dim();
common_feature_value.embedx_dim = _config.embedx_dim();
@@ -42,7 +43,10 @@ int CtrDymfAccessor::Initialize() {
if (_config.ctr_accessor_param().show_scale()) {
_show_scale = true;
}
VLOG(0) << " INTO CtrDymfAccessor::Initialize()";
VLOG(0) << " INTO CtrDymfAccessor::Initialize(); embed_sgd_dim:"
<< common_feature_value.embed_sgd_dim
<< " embedx_dim:" << common_feature_value.embedx_dim
<< " embedx_sgd_dim:" << common_feature_value.embedx_sgd_dim;
InitAccessorInfo();
return 0;
}
@@ -53,9 +57,9 @@ void CtrDymfAccessor::InitAccessorInfo() {

auto embedx_dim = _config.embedx_dim();
VLOG(0) << "InitAccessorInfo embedx_dim:" << embedx_dim;
_accessor_info.select_dim = 3 + embedx_dim;
_accessor_info.select_dim = 4 + embedx_dim;
_accessor_info.select_size = _accessor_info.select_dim * sizeof(float);
_accessor_info.update_dim = 4 + embedx_dim;
_accessor_info.update_dim = 5 + embedx_dim;
_accessor_info.update_size = _accessor_info.update_dim * sizeof(float);
_accessor_info.mf_size =
(embedx_dim + common_feature_value.embedx_sgd_dim) * sizeof(float);
@@ -179,8 +183,10 @@ int32_t CtrDymfAccessor::Create(float** values, size_t num) {
value[common_feature_value.ClickIndex()] = 0;
value[common_feature_value.SlotIndex()] = -1;
value[common_feature_value.MfDimIndex()] = -1;
_embed_sgd_rule->InitValue(value + common_feature_value.EmbedWIndex(),
value + common_feature_value.EmbedG2SumIndex());
_embed_sgd_rule->InitValue(
value + common_feature_value.EmbedWIndex(),
value + common_feature_value.EmbedG2SumIndex(),
false); // adam embed init not zero, adagrad embed init zero
_embedx_sgd_rule->InitValue(value + common_feature_value.EmbedxWIndex(),
value + common_feature_value.EmbedxG2SumIndex(),
false);
@@ -293,22 +299,14 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
i++) {
os << " " << v[i];
}
// os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " "
// << common_feature_value.MfDim(const_cast<float*>(v));
auto show = common_feature_value.Show(const_cast<float*>(v));
auto click = common_feature_value.Click(const_cast<float*>(v));
auto score = ShowClickScore(show, click);
auto mf_dim = int(common_feature_value.MfDim(const_cast<float*>(v)));
if (score >= _config.embedx_threshold() &&
param > common_feature_value.EmbedxG2SumIndex()) {
// VLOG(1) << "common_feature_value.EmbedxG2SumIndex():"
// << common_feature_value.EmbedxG2SumIndex();
// VLOG(1) << "common_feature_value.EmbedxWIndex():"
// << common_feature_value.EmbedxWIndex();
// VLOG(1) << "common_feature_value.MfDim():"
// << common_feature_value.MfDim(const_cast<float*>(v));
for (auto i = common_feature_value.EmbedxG2SumIndex();
i < common_feature_value.EmbedxWIndex() +
common_feature_value.MfDim(const_cast<float*>(v));
i < common_feature_value.Dim(mf_dim);
++i) {
os << " " << v[i];
}
19 changes: 17 additions & 2 deletions paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h
@@ -54,10 +54,24 @@ class CtrDymfAccessor : public ValueAccessor {
int ClickIndex() { return ShowIndex() + 1; }
int EmbedWIndex() { return ClickIndex() + 1; }
int EmbedG2SumIndex() { return EmbedWIndex() + 1; }
int SlotIndex() { return EmbedG2SumIndex() + 1; }
int SlotIndex() { return EmbedG2SumIndex() + embed_sgd_dim; }
int MfDimIndex() { return SlotIndex() + 1; }
int EmbedxG2SumIndex() { return MfDimIndex() + 1; }
int EmbedxWIndex() { return EmbedxG2SumIndex() + 1; }
int EmbedxWIndex() { return EmbedxG2SumIndex() + embedx_sgd_dim; }

// Total value length computed from mf_dim
int Dim(int& mf_dim) {
int tmp_embedx_sgd_dim = 1;
if (optimizer_name == "SparseAdamSGDRule") { // adam
tmp_embedx_sgd_dim = mf_dim * 2 + 2;
} else if (optimizer_name == "SparseSharedAdamSGDRule") { // shared_adam
tmp_embedx_sgd_dim = 4;
}
return 7 + embed_sgd_dim + tmp_embedx_sgd_dim + mf_dim;
}

// Total size in bytes computed from mf_dim
int Size(int& mf_dim) { return (Dim(mf_dim)) * sizeof(float); }

float& UnseenDays(float* val) { return val[UnseenDaysIndex()]; }
float& DeltaScore(float* val) { return val[DeltaScoreIndex()]; }
@@ -73,6 +87,7 @@ class CtrDymfAccessor : public ValueAccessor {
int embed_sgd_dim;
int embedx_dim;
int embedx_sgd_dim;
std::string optimizer_name;
};

struct CtrDymfPushValue {
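To make the new length bookkeeping concrete, here is a minimal standalone sketch of what Dim() above computes; the breakdown into 7 fixed slots (unseen_days, delta_score, show, click, embed_w, slot, mf_dim) is inferred from the accessor's index helpers, and the example numbers are illustrative only.

```cpp
// Standalone sketch (illustrative, not the PR's code) of the Dim() logic
// above: 7 fixed slots, then the embed SGD state, the embedx SGD state,
// and finally the mf_dim embedding weights.
#include <cstdio>
#include <string>

int FeatureValueDim(const std::string& optimizer_name, int embed_sgd_dim,
                    int mf_dim) {
  int embedx_sgd_dim = 1;  // naive/adagrad rules keep a single g2sum
  if (optimizer_name == "SparseAdamSGDRule") {
    embedx_sgd_dim = mf_dim * 2 + 2;  // per-dim gsum + g2sum, plus beta1/beta2 pow
  } else if (optimizer_name == "SparseSharedAdamSGDRule") {
    embedx_sgd_dim = 4;  // shared gsum, g2sum, beta1_pow, beta2_pow
  }
  return 7 + embed_sgd_dim + embedx_sgd_dim + mf_dim;
}

int main() {
  // Assuming embed_sgd_dim = 4 (Adam state for the 1-d embed slot) and mf_dim = 8:
  std::printf("%d\n", FeatureValueDim("SparseAdamSGDRule", 4, 8));        // 37
  std::printf("%d\n", FeatureValueDim("SparseSharedAdamSGDRule", 4, 8));  // 23
}
```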
84 changes: 83 additions & 1 deletion paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc
@@ -213,7 +213,6 @@ void SparseAdamSGDRule::UpdateValueWork(float* w,
float beta1_pow_ = *beta1_pow;
float beta2_pow_ = *beta2_pow;

// lr not change in one update
lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_);
for (size_t i = 0; i < _embedding_dim; i++) {
// Calculation
@@ -252,5 +251,88 @@ void SparseAdamSGDRule::InitValueWork(float* value,
*(sgd + Beta1PowIndex()) = _beta1_decay_rate;
*(sgd + Beta2PowIndex()) = _beta2_decay_rate;
}

void SparseSharedAdamSGDRule::LoadConfig(
const SparseCommonSGDRuleParameter& param, size_t emb_dim) {
_embedding_dim = emb_dim;
auto adam_param = param.adam();
learning_rate_ = adam_param.learning_rate();
_initial_range = adam_param.initial_range();
_beta1_decay_rate = adam_param.beta1_decay_rate();
_beta2_decay_rate = adam_param.beta2_decay_rate();
_ada_epsilon = adam_param.ada_epsilon();
if (adam_param.weight_bounds_size() == 0) {
_min_bound = -std::numeric_limits<float>::max();
_max_bound = std::numeric_limits<float>::max();
} else {
CHECK(adam_param.weight_bounds_size() >= 2)
<< "invalid repeated size for weight_bounds:"
<< adam_param.weight_bounds_size();
_min_bound = adam_param.weight_bounds(0);
_max_bound = adam_param.weight_bounds(1);
}
}

void SparseSharedAdamSGDRule::UpdateValueWork(float* w,
float* sgd,
const float* grad,
float scale) {
float* gsum = sgd + GSumIndex();
float* g2sum = sgd + G2SumIndex();
float* beta1_pow = sgd + Beta1PowIndex();
float* beta2_pow = sgd + Beta2PowIndex();
const float* g = grad;

float lr = learning_rate_;
float beta1_pow_ = *beta1_pow;
float beta2_pow_ = *beta2_pow;
float gsum_ = *gsum;
float g2sum_ = *g2sum;

lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_);
double sum_gsum = 0.0;
double sum_g2sum = 0.0;
for (int i = 0; i < _embedding_dim; i++) {
// Calculation
double new_gsum =
_beta1_decay_rate * gsum_ + (1 - _beta1_decay_rate) * g[i];
double new_g2sum =
_beta2_decay_rate * g2sum_ + (1 - _beta2_decay_rate) * g[i] * g[i];
w[i] = w[i] - lr * (new_gsum / (sqrt(new_g2sum) + _ada_epsilon));
BoundValue(w[i]);
sum_gsum += new_gsum;
sum_g2sum += new_g2sum;
}
// update beta_pow_decay
(*gsum) = sum_gsum / _embedding_dim;
(*g2sum) = sum_g2sum / _embedding_dim;
(*beta1_pow) *= _beta1_decay_rate;
(*beta2_pow) *= _beta2_decay_rate;
}

void SparseSharedAdamSGDRule::InitValueWork(float* value,
float* sgd,
bool zero_init) {
for (int i = 0; i < _embedding_dim; ++i) {
if (zero_init) {
value[i] = 0.0;
BoundValue(value[i]);
} else {
value[i] =
(local_uniform_real_distribution<double>()(local_random_engine()) *
2 -
1) *
_initial_range;
BoundValue(value[i]);
}
}
// init rule gsum and g2sum
for (int i = GSumIndex(); i < Beta1PowIndex(); i++) {
sgd[i] = 0.0;
}
// init beta1_pow and beta2_pow
*(sgd + Beta1PowIndex()) = _beta1_decay_rate;
*(sgd + Beta2PowIndex()) = _beta2_decay_rate;
}
} // namespace distributed
} // namespace paddle
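The new rule is easier to see in isolation: shared Adam keeps a single gsum/g2sum pair for the whole embedx vector and folds the per-dimension moments back into it after each step, so the optimizer state stays at 4 floats regardless of mf_dim (versus mf_dim * 2 + 2 for the plain Adam rule). Below is a self-contained sketch of that update; the names and sample values are illustrative, not the PR's code.

```cpp
// Self-contained sketch of the shared-Adam update; sgd holds exactly four
// floats: gsum, g2sum, beta1_pow, beta2_pow.
#include <cmath>
#include <cstdio>

void SharedAdamUpdate(float* w, float* sgd, const float* g, int dim, float lr,
                      float beta1, float beta2, float eps) {
  float gsum = sgd[0], g2sum = sgd[1];
  float beta1_pow = sgd[2], beta2_pow = sgd[3];

  float lr_t = lr * std::sqrt(1 - beta2_pow) / (1 - beta1_pow);
  double sum_gsum = 0.0, sum_g2sum = 0.0;
  for (int i = 0; i < dim; ++i) {
    double new_gsum = beta1 * gsum + (1 - beta1) * g[i];
    double new_g2sum = beta2 * g2sum + (1 - beta2) * g[i] * g[i];
    w[i] -= lr_t * (new_gsum / (std::sqrt(new_g2sum) + eps));
    sum_gsum += new_gsum;
    sum_g2sum += new_g2sum;
  }
  sgd[0] = sum_gsum / dim;  // fold per-dim moments back into the shared pair
  sgd[1] = sum_g2sum / dim;
  sgd[2] = beta1_pow * beta1;
  sgd[3] = beta2_pow * beta2;
}

int main() {
  float w[4] = {0.1f, -0.2f, 0.3f, 0.0f};
  float g[4] = {0.5f, -0.5f, 0.25f, 1.0f};
  float sgd[4] = {0.0f, 0.0f, 0.9f, 0.999f};  // matches InitValueWork above
  SharedAdamUpdate(w, sgd, g, 4, 0.001f, 0.9f, 0.999f, 1e-8f);
  for (int i = 0; i < 4; ++i) std::printf("%f ", w[i]);
  std::printf("\n");
}
```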
23 changes: 23 additions & 0 deletions paddle/fluid/distributed/ps/table/sparse_sgd_rule.h
Original file line number Diff line number Diff line change
@@ -144,5 +144,28 @@ class SparseAdamSGDRule : public SparseValueSGDRule {
float _beta2_decay_rate;
float _ada_epsilon;
};

class SparseSharedAdamSGDRule : public SparseValueSGDRule {
public:
virtual void LoadConfig(const SparseCommonSGDRuleParameter& param,
size_t emb_dim);
virtual void UpdateValueWork(float* w,
float* sgd,
const float* push_value,
float scale);
virtual void InitValueWork(float* value, float* sgd, bool zero_init);
virtual size_t Dim() { return 4; }
size_t GSumIndex() { return 0; }
size_t G2SumIndex() { return GSumIndex() + 1; }
size_t Beta1PowIndex() { return G2SumIndex() + 1; }
size_t Beta2PowIndex() { return Beta1PowIndex() + 1; }

protected:
float learning_rate_;
float _beta1_decay_rate;
float _beta2_decay_rate;
float _ada_epsilon;
};

} // namespace distributed
} // namespace paddle
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/table/table.cc
@@ -49,6 +49,7 @@ REGISTER_PSCORE_CLASS(SparseValueSGDRule, StdAdaGradSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseAdamSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseNaiveSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseAdaGradSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseSharedAdamSGDRule);

int32_t TableManager::Initialize() {
static bool initialized = false;
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/wrapper/CMakeLists.txt
@@ -13,6 +13,7 @@ cc_library(
op_registry
fs
shell
ps_gpu_wrapper
${RPC_DEPS})

target_link_libraries(fleet z)
45 changes: 26 additions & 19 deletions paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -18,6 +18,10 @@ limitations under the License. */

#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#include "paddle/fluid/distributed/ps/table/table.h"
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#endif

namespace paddle {
namespace distributed {
@@ -129,6 +133,13 @@ void FleetWrapper::InitWorker(const std::string& dist_desc,
worker_ptr_ = std::shared_ptr<paddle::distributed::PSClient>(
paddle::distributed::PSClientFactory::Create(ps_param));
worker_ptr_->Configure(ps_param, dense_pull_regions, ps_env_, index);
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
VLOG(3) << "FleetWrapper::InitWorker InitializeGPUServer";
auto* accessor = worker_ptr_->GetTableAccessor(0);
auto ps_gpu_wrapper = paddle::framework::PSGPUWrapper::GetInstance();
ps_gpu_wrapper->InitializeGPUServer(ps_param);
ps_gpu_wrapper->SetTableAccessor(accessor);
#endif
}
} else {
VLOG(3) << "Client can be initialized only once";
@@ -525,24 +536,24 @@ void FleetWrapper::PushSparseFromTensorAsync(
int batch_size = -1;
bool batch_size_consist = true;
for (auto* input : *inputs) {
int cur_batch_size =
size_t cur_batch_size =
input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
if (batch_size == -1) {
batch_size = cur_batch_size;
} else if (batch_size != cur_batch_size) {
batch_size = int(cur_batch_size);
} else if (batch_size != int(cur_batch_size)) {
// CHECK(batch_size == cur_batch_size); // NOLINT
batch_size_consist = false;
break;
}
}
CHECK(batch_size > 0); // NOLINT

int show_size =
size_t show_size =
shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
CHECK(show_size == batch_size || show_size == 1);
int clk_size =
CHECK(show_size == size_t(batch_size) || show_size == 1);
size_t clk_size =
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);
CHECK(clk_size == size_t(batch_size) || clk_size == 1);

CHECK(outputs->size() == inputs->size());
std::vector<uint64_t> push_keys;
Expand Down Expand Up @@ -601,12 +612,10 @@ void FleetWrapper::PushSparseFromTensorAsync(
// in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] = (static_cast<int>(i) >= show_size
? 1
: static_cast<float>(show_tensor[i]));
push_values.back()[2] = (static_cast<int>(i) >= clk_size
? 0
: static_cast<float>(clk_tensor[i]));
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
float* data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
}
@@ -630,12 +639,10 @@ void FleetWrapper::PushSparseFromTensorAsync(
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] = (static_cast<int>(i) >= show_size
? 1
: static_cast<float>(show_tensor[i]));
push_values.back()[2] = (static_cast<int>(i) >= clk_size
? 0
: static_cast<float>(clk_tensor[i]));
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
float* data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
}
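For context, the loops above fill one push value per feature in the layout noted in the comments (slot, show, clk, then fea_dim gradients, matching CtrCommonPushValue). A small illustrative helper — not part of the PR — makes that layout explicit.

```cpp
// Illustrative helper showing the per-feature push value layout assembled
// above: [slot, show, clk, grad[0..fea_dim)].
#include <cstring>
#include <vector>

std::vector<float> MakePushValue(float slot, float show, float clk,
                                 const float* grad, int fea_dim) {
  std::vector<float> v(3 + fea_dim);
  v[0] = slot;  // the PR hard-codes 2 here with a TODO for the real slot id
  v[1] = show;
  v[2] = clk;
  std::memcpy(v.data() + 3, grad, sizeof(float) * fea_dim);
  return v;
}

int main() {
  float grad[4] = {0.1f, 0.2f, 0.3f, 0.4f};
  auto v = MakePushValue(2.0f, 1.0f, 0.0f, grad, 4);  // v.size() == 7
  return static_cast<int>(v.size()) == 7 ? 0 : 1;
}
```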
6 changes: 3 additions & 3 deletions paddle/fluid/framework/distributed_strategy.proto
@@ -197,14 +197,14 @@ message TableParameter {

message TableAccessorParameter {
optional string accessor_class = 1;
optional SGDParameter embed_sgd_param = 2;
optional SGDParameter embedx_sgd_param = 3;
optional uint32 fea_dim = 4 [ default = 11 ]; // field size of one value
optional uint32 embedx_dim = 5 [ default = 8 ]; // embedx feature size
optional uint32 embedx_threshold = 6
[ default = 10 ]; // embedx feature create threshold
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SGDParameter embed_sgd_param = 10;
optional SGDParameter embedx_sgd_param = 11;
}

message SGDParameter {
@@ -228,7 +228,7 @@ message
repeated float weight_bounds = 4;
}

message SparseAdamSGDParameter { // SparseAdamSGDRule
message SparseAdamSGDParameter { // SparseAdamSGDRule | SparseSharedAdamSGDRule
optional double learning_rate = 1 [ default = 0.001 ];
optional double initial_range = 2 [ default = 0.0001 ];
optional double beta1_decay_rate = 3 [ default = 0.9 ];
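Since SparseAdamSGDRule and SparseSharedAdamSGDRule now read their hyper-parameters from the same SparseAdamSGDParameter message, a quick self-contained check of the bias-corrected step size they derive from the defaults above may be useful (learning_rate = 0.001, beta1_decay_rate = 0.9; the beta2 default is truncated in this view, so the usual 0.999 is assumed). The loop below is illustrative only.

```cpp
// Illustrative only: beta1_pow / beta2_pow start at the decay rates (see
// InitValueWork) and are decayed after every update, shaping the effective
// learning rate lr * sqrt(1 - beta2_pow) / (1 - beta1_pow) used by both rules.
#include <cmath>
#include <cstdio>

int main() {
  const double lr = 0.001, beta1 = 0.9, beta2 = 0.999;  // beta2 assumed
  double beta1_pow = beta1, beta2_pow = beta2;
  for (int step = 1; step <= 3; ++step) {
    double lr_t = lr * std::sqrt(1 - beta2_pow) / (1 - beta1_pow);
    std::printf("step %d: effective lr = %g\n", step, lr_t);
    beta1_pow *= beta1;
    beta2_pow *= beta2;
  }
  return 0;
}
```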
15 changes: 11 additions & 4 deletions paddle/fluid/framework/fleet/CMakeLists.txt
@@ -25,10 +25,17 @@ endif()

if(WITH_HETERPS)
if(WITH_NCCL AND WITH_GPU)
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ${BRPC_DEPS})
if(WITH_PSCORE)
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ps_framework_proto ${BRPC_DEPS})
else()
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ${BRPC_DEPS})
endif()
add_subdirectory(heter_ps)
elseif(WITH_XPU_KP)
xpu_library(