Skip to content

Commit

Permalink
Fix inference batch num synchronization across nodes in multi-node mode (PaddlePaddle#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
miaoli06 authored Feb 20, 2023
1 parent 4628a15 commit 7ab8d80
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
21 changes: 20 additions & 1 deletion paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,14 @@ class GraphDataGenerator {
uint64_t CopyUniqueNodes();
int GetPathNum() { return total_row_; }
void ResetPathNum() { total_row_ = 0; }
// Returns the batch size currently configured for graph sampling.
int GetGraphBatchsize() { return batch_size_; }
// Recomputes batch_size_ so this generator yields exactly `batch_num`
// batches over its total_row_ instances (ceiling division).
// Only applies during GPU-graph inference without SAGE mode; training and
// SAGE runs keep their configured batch size. A non-positive batch_num is
// ignored to avoid division by zero.
void SetNewBatchsize(int batch_num) {
  if (gpu_graph_training_ || sage_mode_ || batch_num <= 0) {
    return;
  }
  batch_size_ = (total_row_ + batch_num - 1) / batch_num;
}
void ResetEpochFinish() { epoch_finish_ = false; }
void ClearSampleState();
void DumpWalkPath(std::string dump_path, size_t dump_rate);
Expand Down Expand Up @@ -1166,7 +1174,18 @@ class DataFeed {
virtual const std::vector<std::string>& GetInsContentVec() const {
return ins_content_vec_;
}
virtual int GetCurBatchSize() { return batch_size_; }
// Returns the effective batch size of this feed.
// In GPU-graph builds this delegates to the graph data generator, whose
// batch size may have been rewritten by SetNewBatchsize(); otherwise the
// plain configured batch_size_ is returned.
// NOTE(review): in a GPU-graph build this always queries the graph
// generator, even for feeds not running in graph mode — presumably callers
// only use it in graph mode; confirm against call sites.
virtual int GetCurBatchSize() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
return gpu_graph_data_generator_.GetGraphBatchsize();
#else
return batch_size_;
#endif
}
// Forwards the target batch count to the graph data generator so it can
// recompute its batch size (multi-node batch-count alignment).
// No-op in builds without GPU-graph + HeterPS support.
virtual void SetNewBatchsize(int batch_num) {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.SetNewBatchsize(batch_num);
#endif
}
virtual int GetGraphPathNum() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
return gpu_graph_data_generator_.GetPathNum();
Expand Down
37 changes: 37 additions & 0 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1931,8 +1931,45 @@ void SlotRecordDataset::PrepareTrain() {
return;
}

// Aligns the number of inference batches across all trainer nodes in
// multi-node GPU-graph mode. Each node computes the maximum batch count
// over its readers, the nodes agree on the global maximum via a GLOO
// all-reduce ("max"), and every reader's batch size is then rewritten so
// all nodes execute the same number of steps (unequal instance counts
// would otherwise desynchronize collective operations).
void SlotRecordDataset::DynamicAdjustBatchNum() {
  VLOG(3) << "dynamic adjust batch num of graph in multi node";
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
  if (gpu_graph_mode_) {
    int thread_max_batch_num = 0;
    for (size_t i = 0; i < readers_.size(); i++) {
      int batch_size = readers_[i]->GetCurBatchSize();
      int64_t ins_num = readers_[i]->GetGraphPathNum();
      if (batch_size <= 0) {
        // Guard: an unconfigured reader would cause division by zero.
        continue;
      }
      // Ceiling division: batches this reader will produce.
      int batch_num =
          static_cast<int>((ins_num + batch_size - 1) / batch_size);
      if (batch_num > thread_max_batch_num) {
        thread_max_batch_num = batch_num;
      }
      // BUGFIX: log the per-reader batch_num, not the running maximum.
      VLOG(3) << "ins num:" << ins_num << ", batch size:" << batch_size
              << ", batch_num:" << batch_num;
    }
#ifdef PADDLE_WITH_GLOO
    auto gloo_wrapper = paddle::framework::GlooWrapper::GetInstance();
    if (gloo_wrapper->Size() > 1) {
      if (!gloo_wrapper->IsInitialized()) {
        VLOG(0) << "GLOO is not inited";
        gloo_wrapper->Init();
      }
      // All-reduce with "max" so every node adopts the largest batch count.
      std::vector<int> thread_batch_num_vec(1, thread_max_batch_num);
      auto thread_max_batch_num_vec =
          gloo_wrapper->AllReduce(thread_batch_num_vec, "max");
      thread_max_batch_num = thread_max_batch_num_vec[0];
      VLOG(3) << "thread max batch num:" << thread_max_batch_num;
      for (size_t i = 0; i < readers_.size(); i++) {
        readers_[i]->SetNewBatchsize(thread_max_batch_num);
      }
    }
#endif
  }
#endif
}

void SlotRecordDataset::DynamicAdjustReadersNum(int thread_num) {
if (thread_num_ == thread_num) {
DynamicAdjustBatchNum();
VLOG(3) << "DatasetImpl<T>::DynamicAdjustReadersNum thread_num_="
<< thread_num_ << ", thread_num_=thread_num, no need to adjust";
return;
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/data_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ class SlotRecordDataset : public DatasetImpl<SlotRecord> {
bool discard_remaining_ins);
virtual void PrepareTrain();
virtual void DynamicAdjustReadersNum(int thread_num);
void DynamicAdjustBatchNum();

protected:
bool enable_heterps_ = true;
Expand Down

0 comments on commit 7ab8d80

Please sign in to comment.