Skip to content

Commit

Permalink
[GpuGraph] cherry pick var slot feature && fix load multi path node (P…
Browse files Browse the repository at this point in the history
…addlePaddle#136)

* optimize mem in  uniq slot feature

* cherry-pick var slot_feature

Co-authored-by: huwei02 <[email protected]>
  • Loading branch information
Thunderbrook and huwei02 authored Oct 18, 2022
1 parent baf678f commit 4e8b290
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 75 deletions.
72 changes: 50 additions & 22 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
std::vector<uint64_t> node_id_array[task_pool_size_];
std::vector<paddle::framework::GpuPsFeaInfo>
node_fea_info_array[task_pool_size_];
slot_feature_num_map_.resize(slot_num);
for (int k = 0; k < slot_num; ++k) {
slot_feature_num_map_[k] = 0;
}

for (size_t i = 0; i < bags.size(); i++) {
if (bags[i].size() > 0) {
tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
Expand All @@ -94,13 +99,17 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
int total_feature_size = 0;
for (int k = 0; k < slot_num; ++k) {
v->get_feature_ids(k, &feature_ids);
total_feature_size += feature_ids.size();
int feature_ids_size = feature_ids.size();
if (slot_feature_num_map_[k] < feature_ids_size) {
slot_feature_num_map_[k] = feature_ids_size;
}
total_feature_size += feature_ids_size;
if (!feature_ids.empty()) {
feature_array[i].insert(feature_array[i].end(),
feature_ids.begin(),
feature_ids.end());
slot_id_array[i].insert(
slot_id_array[i].end(), feature_ids.size(), k);
slot_id_array[i].end(), feature_ids_size, k);
}
}
x.feature_size = total_feature_size;
Expand All @@ -113,6 +122,13 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
}
}
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();

std::stringstream ss;
for (int k = 0; k < slot_num; ++k) {
ss << slot_feature_num_map_[k] << " ";
}
VLOG(0) << "slot_feature_num_map: " << ss.str();

paddle::framework::GpuPsCommGraphFea res;
uint64_t tot_len = 0;
for (int i = 0; i < task_pool_size_; i++) {
Expand Down Expand Up @@ -1210,23 +1226,27 @@ int32_t GraphTable::parse_node_and_load(std::string ntype2files,
VLOG(0) << "parse node type and nodedir failed!";
return -1;
}

std::string delim = ";";
std::string npath = node_to_nodedir[ntypes[0]];
auto npath_list = paddle::framework::localfs_list(npath);
std::string npath_str;
if (part_num > 0 && part_num < (int)npath_list.size()) {
std::vector<std::string> sub_npath_list(
npath_list.begin(), npath_list.begin() + part_num);
npath_str = paddle::string::join_strings(sub_npath_list, delim);
} else {
npath_str = paddle::string::join_strings(npath_list, delim);
}
if (ntypes.size() == 0) {
VLOG(0) << "node_type not specified, nothing will be loaded ";
return 0;
}

std::string delim = ";";
std::vector<std::string> type_npath_strs;
for (size_t i = 0; i <ntypes.size(); i++) {
std::string npath = node_to_nodedir[ntypes[i]];
auto npath_list = paddle::framework::localfs_list(npath);
std::string type_npath_str;
if (part_num > 0 && part_num < (int)npath_list.size()) {
std::vector<std::string> sub_npath_list(
npath_list.begin(), npath_list.begin() + part_num);
type_npath_str = paddle::string::join_strings(sub_npath_list, delim);
} else {
type_npath_str = paddle::string::join_strings(npath_list, delim);
}
type_npath_strs.push_back(type_npath_str);
}
std::string npath_str = paddle::string::join_strings(type_npath_strs, delim);
if (FLAGS_graph_load_in_parallel) {
this->load_nodes(npath_str, "");
} else {
Expand Down Expand Up @@ -1303,7 +1323,6 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype2files,
VLOG(0) << "node_type not specified, nothing will be loaded ";
return 0;
}

if (FLAGS_graph_load_in_parallel) {
this->load_nodes(npath_str, "");
} else {
Expand Down Expand Up @@ -1985,9 +2004,14 @@ int GraphTable::parse_feature(int idx,
// "")
thread_local std::vector<paddle::string::str_ptr> fields;
fields.clear();
const char c = feature_separator_.at(0);
char c = slot_feature_separator_.at(0);
paddle::string::split_string_ptr(feat_str, len, c, &fields);

thread_local std::vector<paddle::string::str_ptr> fea_fields;
fea_fields.clear();
c = feature_separator_.at(0);
paddle::string::split_string_ptr(fields[1].ptr, fields[1].len, c, &fea_fields);

std::string name = fields[0].to_string();
auto it = feat_id_map[idx].find(name);
if (it != feat_id_map[idx].end()) {
Expand All @@ -1998,26 +2022,26 @@ int GraphTable::parse_feature(int idx,
// string_vector_2_string(fields.begin() + 1, fields.end(), ' ',
// fea_ptr);
FeatureNode::parse_value_to_bytes<uint64_t>(
fields.begin() + 1, fields.end(), fea_ptr);
fea_fields.begin(), fea_fields.end(), fea_ptr);
return 0;
} else if (dtype == "string") {
string_vector_2_string(fields.begin() + 1, fields.end(), ' ', fea_ptr);
string_vector_2_string(fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
return 0;
} else if (dtype == "float32") {
FeatureNode::parse_value_to_bytes<float>(
fields.begin() + 1, fields.end(), fea_ptr);
fea_fields.begin(), fea_fields.end(), fea_ptr);
return 0;
} else if (dtype == "float64") {
FeatureNode::parse_value_to_bytes<double>(
fields.begin() + 1, fields.end(), fea_ptr);
fea_fields.begin(), fea_fields.end(), fea_ptr);
return 0;
} else if (dtype == "int32") {
FeatureNode::parse_value_to_bytes<int32_t>(
fields.begin() + 1, fields.end(), fea_ptr);
fea_fields.begin(), fea_fields.end(), fea_ptr);
return 0;
} else if (dtype == "int64") {
FeatureNode::parse_value_to_bytes<uint64_t>(
fields.begin() + 1, fields.end(), fea_ptr);
fea_fields.begin(), fea_fields.end(), fea_ptr);
return 0;
}
} else {
Expand Down Expand Up @@ -2254,6 +2278,10 @@ void GraphTable::set_feature_separator(const std::string &ch) {
feature_separator_ = ch;
}

void GraphTable::set_slot_feature_separator(const std::string &ch) {
slot_feature_separator_ = ch;
}

int32_t GraphTable::get_server_index_by_id(uint64_t id) {
return id % shard_num / shard_num_per_server;
}
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ class GraphTable : public Table {
int32_t make_complementary_graph(int idx, int64_t byte_size);
int32_t dump_edges_to_ssd(int idx);
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
std::vector<int> slot_feature_num_map() const { return slot_feature_num_map_; }
std::vector<uint64_t> get_partition(int idx, int index) {
if (idx >= (int)partitions.size() || index >= (int)partitions[idx].size())
return std::vector<uint64_t>();
Expand All @@ -705,6 +706,7 @@ class GraphTable : public Table {
#endif
virtual int32_t add_comm_edge(int idx, uint64_t src_id, uint64_t dst_id);
virtual int32_t build_sampler(int idx, std::string sample_type = "random");
void set_slot_feature_separator(const std::string &ch);
void set_feature_separator(const std::string &ch);

void build_graph_total_keys();
Expand Down Expand Up @@ -751,7 +753,9 @@ class GraphTable : public Table {
// std::shared_ptr<GraphSampler> graph_sampler;
// REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
#endif
std::string slot_feature_separator_ = std::string(" ");
std::string feature_separator_ = std::string(" ");
std::vector<int> slot_feature_num_map_;
};

/*
Expand Down
92 changes: 67 additions & 25 deletions paddle/fluid/framework/data_feed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -325,22 +325,29 @@ __global__ void GraphFillSlotKernel(uint64_t *id_tensor,
uint64_t *feature_buf,
int len,
int total_ins,
int slot_num) {
int slot_num,
int* slot_feature_num_map,
int fea_num_per_node,
int* actual_slot_id_map,
int* fea_offset_map) {
CUDA_KERNEL_LOOP(idx, len) {
int slot_idx = idx / total_ins;
int fea_idx = idx / total_ins;
int ins_idx = idx % total_ins;
((uint64_t *)(id_tensor[slot_idx]))[ins_idx] =
feature_buf[ins_idx * slot_num + slot_idx];
int actual_slot_id = actual_slot_id_map[fea_idx];
int fea_offset = fea_offset_map[fea_idx];
((uint64_t *)(id_tensor[actual_slot_id]))[ins_idx * slot_feature_num_map[actual_slot_id] + fea_offset]
= feature_buf[ins_idx * fea_num_per_node + fea_idx];
}
}

__global__ void GraphFillSlotLodKernelOpt(uint64_t *id_tensor,
int len,
int total_ins) {
int total_ins,
int* slot_feature_num_map) {
CUDA_KERNEL_LOOP(idx, len) {
int slot_idx = idx / total_ins;
int ins_idx = idx % total_ins;
((uint64_t *)(id_tensor[slot_idx]))[ins_idx] = ins_idx;
((uint64_t *)(id_tensor[slot_idx]))[ins_idx] = ins_idx * slot_feature_num_map[slot_idx];
}
}

Expand Down Expand Up @@ -396,7 +403,7 @@ int GraphDataGenerator::FillGraphSlotFeature(int total_instance,
int64_t *slot_lod_tensor_ptr_[slot_num_];
for (int i = 0; i < slot_num_; ++i) {
slot_tensor_ptr_[i] = feed_vec_[3 + 2 * i]->mutable_data<int64_t>(
{total_instance, 1}, this->place_);
{total_instance * h_slot_feature_num_map_[i], 1}, this->place_);
slot_lod_tensor_ptr_[i] = feed_vec_[3 + 2 * i + 1]->mutable_data<int64_t>(
{total_instance + 1}, this->place_);
}
Expand All @@ -422,43 +429,48 @@ int GraphDataGenerator::FillGraphSlotFeature(int total_instance,
train_stream_);
uint64_t *feature_buf = reinterpret_cast<uint64_t *>(d_feature_buf_->ptr());
FillFeatureBuf(ins_cursor, feature_buf, total_instance);
GraphFillSlotKernel<<<GET_BLOCKS(total_instance * slot_num_),
GraphFillSlotKernel<<<GET_BLOCKS(total_instance * fea_num_per_node_),
CUDA_NUM_THREADS,
0,
train_stream_>>>((uint64_t *)d_slot_tensor_ptr_->ptr(),
feature_buf,
total_instance * slot_num_,
total_instance * fea_num_per_node_,
total_instance,
slot_num_);
slot_num_,
(int*)d_slot_feature_num_map_->ptr(),
fea_num_per_node_,
(int*)d_actual_slot_id_map_->ptr(),
(int*)d_fea_offset_map_->ptr());
GraphFillSlotLodKernelOpt<<<GET_BLOCKS((total_instance + 1) * slot_num_),
CUDA_NUM_THREADS,
0,
train_stream_>>>(
(uint64_t *)d_slot_lod_tensor_ptr_->ptr(),
(total_instance + 1) * slot_num_,
total_instance + 1);
total_instance + 1,
(int*)d_slot_feature_num_map_->ptr());
if (debug_mode_) {
uint64_t h_walk[total_instance];
cudaMemcpy(h_walk,
ins_cursor,
total_instance * sizeof(uint64_t),
cudaMemcpyDeviceToHost);
uint64_t h_feature[total_instance * slot_num_];
uint64_t h_feature[total_instance * slot_num_ * fea_num_per_node_];
cudaMemcpy(h_feature,
feature_buf,
total_instance * slot_num_ * sizeof(uint64_t),
total_instance * fea_num_per_node_ * slot_num_ * sizeof(uint64_t),
cudaMemcpyDeviceToHost);
for (int i = 0; i < total_instance; ++i) {
std::stringstream ss;
for (int j = 0; j < slot_num_; ++j) {
ss << h_feature[i * slot_num_ + j] << " ";
for (int j = 0; j < fea_num_per_node_; ++j) {
ss << h_feature[i * fea_num_per_node_ + j] << " ";
}
VLOG(2) << "aft FillFeatureBuf, gpu[" << gpuid_ << "] walk[" << i
<< "] = " << (uint64_t)h_walk[i] << " feature[" << i * slot_num_
<< ".." << (i + 1) * slot_num_ << "] = " << ss.str();
<< "] = " << (uint64_t)h_walk[i] << " feature[" << i * fea_num_per_node_
<< ".." << (i + 1) * fea_num_per_node_ << "] = " << ss.str();
}

uint64_t h_slot_tensor[slot_num_][total_instance];
uint64_t h_slot_tensor[fea_num_per_node_][total_instance];
uint64_t h_slot_lod_tensor[slot_num_][total_instance + 1];
for (int i = 0; i < slot_num_; ++i) {
cudaMemcpy(h_slot_tensor[i],
Expand Down Expand Up @@ -593,7 +605,6 @@ int GraphDataGenerator::GenerateBatch() {
cudaStreamSynchronize(train_stream_);
if (!gpu_graph_training_) return 1;
ins_buf_pair_len_ -= total_instance / 2;

return 1;
}

Expand Down Expand Up @@ -833,7 +844,8 @@ int GraphDataGenerator::FillFeatureBuf(uint64_t *d_walk,

auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
int ret = gpu_graph_ptr->get_feature_of_nodes(
gpuid_, d_walk, d_feature, key_num, slot_num_);
gpuid_, d_walk, d_feature, key_num, slot_num_,
(int*)d_slot_feature_num_map_->ptr(), fea_num_per_node_);
return ret;
}

Expand All @@ -847,7 +859,9 @@ int GraphDataGenerator::FillFeatureBuf(
(uint64_t *)d_walk->ptr(),
(uint64_t *)d_feature->ptr(),
buf_size_,
slot_num_);
slot_num_,
(int*)d_slot_feature_num_map_->ptr(),
fea_num_per_node_);
return ret;
}

Expand Down Expand Up @@ -1283,7 +1297,7 @@ int GraphDataGenerator::FillWalkBuf() {
size_t batch = 0;
d_feature_list_ = memory::AllocShared(
place_,
once_sample_startid_len_ * slot_num_ * sizeof(uint64_t),
once_sample_startid_len_ * fea_num_per_node_ * sizeof(uint64_t),
phi::Stream(reinterpret_cast<phi::StreamId>(sample_stream_)));
uint64_t *d_feature_list_ptr =
reinterpret_cast<uint64_t *>(d_feature_list_->ptr());
Expand All @@ -1296,9 +1310,11 @@ int GraphDataGenerator::FillWalkBuf() {
d_uniq_node_ptr + cursor,
d_feature_list_ptr,
batch,
slot_num_);
slot_num_,
(int*)d_slot_feature_num_map_->ptr(),
fea_num_per_node_);
if (InsertTable(
d_feature_list_ptr, slot_num_ * batch, d_uniq_fea_num)) {
d_feature_list_ptr, fea_num_per_node_ * batch, d_uniq_fea_num)) {
CopyFeaFromTable(d_uniq_fea_num);
table_->clear(sample_stream_);
cudaMemsetAsync(
Expand Down Expand Up @@ -1368,6 +1384,32 @@ void GraphDataGenerator::AllocResource(int thread_id,
h_device_keys_len_.push_back(h_graph_all_type_keys_len[i][thread_id]);
}
VLOG(2) << "h_device_keys size: " << h_device_keys_len_.size();

h_slot_feature_num_map_ = gpu_graph_ptr->slot_feature_num_map();
fea_num_per_node_ = 0;
for (int i = 0; i < slot_num_; ++i) {
fea_num_per_node_ += h_slot_feature_num_map_[i];
}
std::vector<int> h_actual_slot_id_map, h_fea_offset_map;
h_actual_slot_id_map.resize(fea_num_per_node_);
h_fea_offset_map.resize(fea_num_per_node_);
for (int slot_id = 0, fea_idx = 0; slot_id < slot_num_; ++slot_id) {
for (int j = 0; j < h_slot_feature_num_map_[slot_id]; ++j, ++fea_idx) {
h_actual_slot_id_map[fea_idx] = slot_id;
h_fea_offset_map[fea_idx] = j;
}
}

d_slot_feature_num_map_ = memory::Alloc(place_, slot_num_ * sizeof(int));
cudaMemcpy(d_slot_feature_num_map_->ptr(), h_slot_feature_num_map_.data(),
sizeof(int) * slot_num_, cudaMemcpyHostToDevice);
d_actual_slot_id_map_ = memory::Alloc(place_, fea_num_per_node_ * sizeof(int));
cudaMemcpy(d_actual_slot_id_map_->ptr(), h_actual_slot_id_map.data(),
sizeof(int) * fea_num_per_node_, cudaMemcpyHostToDevice);
d_fea_offset_map_ = memory::Alloc(place_, fea_num_per_node_ * sizeof(int));
cudaMemcpy(d_fea_offset_map_->ptr(), h_fea_offset_map.data(),
sizeof(int) * fea_num_per_node_, cudaMemcpyHostToDevice);

size_t once_max_sample_keynum = walk_degree_ * once_sample_startid_len_;
d_prefix_sum_ = memory::AllocShared(
place_,
Expand Down Expand Up @@ -1429,7 +1471,7 @@ void GraphDataGenerator::AllocResource(int thread_id,
memory::AllocShared(place_, (batch_size_ * 2 * 2) * sizeof(uint64_t));
if (slot_num_ > 0) {
d_feature_buf_ = memory::AllocShared(
place_, (batch_size_ * 2 * 2) * slot_num_ * sizeof(uint64_t));
place_, (batch_size_ * 2 * 2) * fea_num_per_node_ * sizeof(uint64_t));
}
d_pair_num_ = memory::AllocShared(place_, sizeof(int));

Expand Down
Loading

0 comments on commit 4e8b290

Please sign in to comment.