add all2all communication, fix gpu transfer (PaddlePaddle#67)
1. add all2all communication
2. fix GPU 2-hop transfer communication on V100/A100
3. detect NVLink and NIC topology automatically
qingshui authored and root committed Nov 28, 2022
1 parent be42e5a commit 0611054
Showing 33 changed files with 3,844 additions and 737 deletions.
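Items 2 and 3 of the commit message concern how GPUs reach each other: on V100/A100 machines without a full NVLink mesh, a transfer between two GPUs that lack a direct link typically has to be staged through an intermediate device — presumably the "2-hop transfer" the commit message refers to — and the commit probes the NVLink/NIC topology at startup instead of hard-coding it. As background only, and not the code added by this commit, here is a minimal sketch of probing pairwise GPU peer access with the CUDA runtime; the helper name build_p2p_matrix is made up for illustration:

// Illustrative sketch: probe which GPU pairs can access each other directly
// (NVLink or PCIe P2P). Not the commit's implementation.
#include <cuda_runtime.h>
#include <vector>

std::vector<std::vector<int>> build_p2p_matrix() {
  int n = 0;
  cudaGetDeviceCount(&n);
  std::vector<std::vector<int>> can(n, std::vector<int>(n, 0));
  for (int i = 0; i < n; ++i) {
    for (int j = 0; j < n; ++j) {
      if (i == j) {
        can[i][j] = 1;
        continue;
      }
      int ok = 0;
      cudaDeviceCanAccessPeer(&ok, i, j);  // 1 if device i can map device j
      can[i][j] = ok;
    }
  }
  return can;
}

Pairs that report 0 in such a matrix are the ones that would need a staged (multi-hop) path during all2all communication.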
85 changes: 85 additions & 0 deletions paddle/fluid/framework/barrier.h
@@ -0,0 +1,85 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <semaphore.h>
#include <pthread.h>
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace framework {
class Barrier {
 public:
  explicit Barrier(int count = 1) {
    CHECK(count >= 1);
    CHECK(0 == pthread_barrier_init(&_barrier, NULL, count));
  }
  ~Barrier() { CHECK(0 == pthread_barrier_destroy(&_barrier)); }
  void reset(int count) {
    CHECK(count >= 1);
    CHECK(0 == pthread_barrier_destroy(&_barrier));
    CHECK(0 == pthread_barrier_init(&_barrier, NULL, count));
  }
  void wait() {
    int err = pthread_barrier_wait(&_barrier);
    CHECK(err == 0 || err == PTHREAD_BARRIER_SERIAL_THREAD);
  }

 private:
  pthread_barrier_t _barrier;
};

// Call func(args...); if the call is interrupted by a signal, retry it.
template <class FUNC, class... ARGS>
auto ignore_signal_call(FUNC&& func, ARGS&&... args) ->
    typename std::result_of<FUNC(ARGS...)>::type {
  for (;;) {
    auto err = func(args...);
    if (err < 0 && errno == EINTR) {
      LOG(INFO) << "Signal is caught. Ignored.";
      continue;
    }
    return err;
  }
}

class Semaphore {
 public:
  Semaphore() { CHECK(0 == sem_init(&_sem, 0, 0)); }
  ~Semaphore() { CHECK(0 == sem_destroy(&_sem)); }
  void post() { CHECK(0 == sem_post(&_sem)); }
  void wait() { CHECK(0 == ignore_signal_call(sem_wait, &_sem)); }
  bool try_wait() {
    int err = 0;
    CHECK((err = ignore_signal_call(sem_trywait, &_sem),
           err == 0 || errno == EAGAIN));
    return err == 0;
  }

 private:
  sem_t _sem;
};

}  // namespace framework
}  // namespace paddle
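The new barrier.h helpers are thin wrappers over pthread barriers and POSIX semaphores. A small usage sketch — hypothetical, not taken from the commit — showing how they might coordinate worker threads:

// Hypothetical usage of the Barrier/Semaphore helpers above.
#include <thread>
#include <vector>
#include "paddle/fluid/framework/barrier.h"

void example_usage() {
  constexpr int kThreads = 4;
  paddle::framework::Barrier barrier(kThreads);
  paddle::framework::Semaphore sem;

  std::vector<std::thread> workers;
  for (int i = 0; i < kThreads; ++i) {
    workers.emplace_back([&barrier, &sem, i]() {
      // ... per-thread setup work ...
      barrier.wait();  // nobody proceeds until all kThreads arrive
      if (i == 0) {
        sem.post();    // thread 0 signals the main thread
      }
    });
  }
  sem.wait();          // blocks until thread 0 posts
  for (auto& t : workers) {
    t.join();
  }
}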
8 changes: 5 additions & 3 deletions paddle/fluid/framework/data_feed.cu
@@ -1412,12 +1412,14 @@ int GraphDataGenerator::FillWalkBuf() {
return total_row_ != 0;
}

void GraphDataGenerator::SetFeedVec(std::vector<LoDTensor *> feed_vec) {
//void GraphDataGenerator::SetFeedVec(std::vector<LoDTensor *> feed_vec) {
void GraphDataGenerator::SetFeedVec(std::vector<phi::DenseTensor *> feed_vec) {
feed_vec_ = feed_vec;
}

void GraphDataGenerator::AllocResource(int thread_id,
std::vector<LoDTensor *> feed_vec) {
std::vector<phi::DenseTensor *> feed_vec) {
//std::vector<LoDTensor *> feed_vec) {
auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
gpuid_ = gpu_graph_ptr->device_id_mapping[thread_id];
thread_id_ = thread_id;
@@ -1566,7 +1568,7 @@ void GraphDataGenerator::SetConfig(
train_table_cap_ = graph_config.train_table_cap();
infer_table_cap_ = graph_config.infer_table_cap();
epoch_finish_ = false;
VLOG(0) << "Confirm GraphConfig, walk_degree : " << walk_degree_
VLOG(1) << "Confirm GraphConfig, walk_degree : " << walk_degree_
<< ", walk_len : " << walk_len_ << ", window : " << window_
<< ", once_sample_startid_len : " << once_sample_startid_len_
<< ", sample_times_one_chunk : " << repeat_time_
7 changes: 5 additions & 2 deletions paddle/fluid/framework/data_feed.h
@@ -900,9 +900,11 @@ class GraphDataGenerator {
GraphDataGenerator() {}
virtual ~GraphDataGenerator() {}
void SetConfig(const paddle::framework::DataFeedDesc& data_feed_desc);
void AllocResource(int thread_id, std::vector<LoDTensor*> feed_vec);
//void AllocResource(int thread_id, std::vector<LoDTensor*> feed_vec);
void AllocResource(int thread_id, std::vector<phi::DenseTensor*> feed_vec);
void AllocTrainResource(int thread_id);
void SetFeedVec(std::vector<LoDTensor*> feed_vec);
//void SetFeedVec(std::vector<LoDTensor*> feed_vec);
void SetFeedVec(std::vector<phi::DenseTensor*> feed_vec);
int AcquireInstance(BufState* state);
int GenerateBatch();
int FillWalkBuf();
@@ -1233,6 +1235,7 @@ class DataFeed {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
GraphDataGenerator gpu_graph_data_generator_;
#endif
bool train_mode_;
};

// PrivateQueueDataFeed is the base virtual class for ohther DataFeeds.
4 changes: 3 additions & 1 deletion paddle/fluid/framework/fleet/gloo_wrapper.cc
@@ -352,7 +352,9 @@ void GlooWrapper::Init() {
}
#endif
is_initialized_ = true;
VLOG(3) << "gloo initialized done.";
VLOG(0) << "gloo initialized done, rank=" << rank_
<< ", size=" << size_
<< ", store_type=" << store_type_;
}

template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
@@ -549,7 +549,6 @@ class concurrent_unordered_map : public managed {
const key_type insert_key = x.first;

bool insert_success = false;

size_type counter = 0;
while (false == insert_success) {
if (counter++ >= hashtbl_size) {
@@ -578,23 +577,20 @@
if (keys_equal(unused_key, old_key) || keys_equal(insert_key, old_key)) {
update_existing_value(existing_value, x, op);
insert_success = true;
if (m_enable_collision_stat) {
atomicAdd(&m_insert_times, 1);
}

if (local_count != NULL && keys_equal(unused_key, old_key)) {
atomicAdd(local_count, 1);
}
break;
}

if (m_enable_collision_stat) {
atomicAdd(&m_insert_collisions, 1);
}
current_index = (current_index + 1) % hashtbl_size;
current_hash_bucket = &(hashtbl_values[current_index]);
}

if (m_enable_collision_stat) {
atomicAdd(&m_insert_times, 1);
atomicAdd(&m_insert_collisions, uint64_t(counter + 1));
}

return iterator(
m_hashtbl_values, m_hashtbl_values + hashtbl_size, current_hash_bucket);
}
@@ -680,15 +676,13 @@ x.second );
begin_ptr = m_hashtbl_values + m_hashtbl_size;
break;
}
if (m_enable_collision_stat) {
atomicAdd(&m_query_collisions, 1);
}
hash_tbl_idx = (hash_tbl_idx + 1) % m_hashtbl_size;
++counter;
}

if (m_enable_collision_stat) {
atomicAdd(&m_query_times, 1);
atomicAdd(&m_query_collisions, (uint64_t)(counter + 1));
}

return const_iterator(
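The hunks above change how the map's collision statistics are collected: rather than an atomicAdd on the shared counters inside every probe iteration, each thread now keeps a local counter and issues a single atomicAdd once the insert or lookup finishes, cutting the per-probe global atomics down to at most two per operation. A generic sketch of that pattern (the names here are illustrative, not the map's actual members):

// Sketch of the "accumulate locally, add once" pattern (illustrative names).
__device__ unsigned long long g_insert_times = 0;
__device__ unsigned long long g_insert_collisions = 0;

__device__ void insert_with_stats(bool enable_stat, int max_probes) {
  unsigned long long probes = 0;
  for (int i = 0; i < max_probes; ++i) {
    // ... try to claim a bucket, break out on success ...
    ++probes;  // thread-local: no global-memory traffic per probe
  }
  if (enable_stat) {
    atomicAdd(&g_insert_times, 1ULL);         // one update per insert
    atomicAdd(&g_insert_collisions, probes);  // one update for all probes
  }
}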
42 changes: 21 additions & 21 deletions paddle/fluid/framework/fleet/heter_ps/feature_value.cu
@@ -71,7 +71,6 @@ __global__ void PullDedupCopy(const size_t N,
int x = key2slot[i];
int y = i - slot_lens[x];

assert(slot_dims[x] == hidden);
float* dest_ptr = dest[x] + y * hidden;
// 0 key fill zero
if (total_keys[i] == 0) {
@@ -92,10 +91,11 @@
*(dest_ptr + off) = src_ptr[accessor.EmbedWIndex()];
break;
default:
if (src_ptr[accessor.MfSizeIndex()] == 0) {
int embedx_id = off - 3;
if (embedx_id >= int(src_ptr[accessor.MfSizeIndex()])) {
*(dest_ptr + off) = 0;
} else {
*(dest_ptr + off) = src_ptr[accessor.EmbedxWIndex() + off - 3];
*(dest_ptr + off) = src_ptr[accessor.EmbedxWIndex() + embedx_id];
}
break;
}
@@ -127,10 +127,9 @@ __global__ void PushCopyWithPool(float* dest,
int y = i - (x ? len[low - 1] : 0);
float* cur = (float*)((char*)dest + i * grad_value_size); // NOLINT

cur[gpu_accessor.common_push_value.SlotIndex()] =
(float)slot_vector[x]; // NOLINT
cur[gpu_accessor.common_push_value.SlotIndex()] = float(slot_vector[x]);
int mf_dim = mf_dim_vector[x];
cur[gpu_accessor.common_push_value.MfDimIndex()] = mf_dim;
cur[gpu_accessor.common_push_value.MfDimIndex()] = float(mf_dim);

cur[gpu_accessor.common_push_value.ShowIndex()] =
*(src[x] + y * (mf_dim + 3));
@@ -176,9 +175,10 @@ __global__ void PushMergeCopyAtomic(const size_t N,
int mf_dim = slot_dims[x] - 3;
switch (off) {
case 0:
cur[accessor.SlotIndex()] = (float)slot_vector[x]; // NOLINT
cur[accessor.MfDimIndex()] = mf_dim;
phi::CudaAtomicAdd(&cur[accessor.ShowIndex()], *(ptr + off));
cur[accessor.SlotIndex()] = float(slot_vector[x]);
cur[accessor.MfDimIndex()] = float(mf_dim);
paddle::platform::CudaAtomicAdd(&cur[accessor.ShowIndex()],
*(ptr + off));
break;
case 1:
phi::CudaAtomicAdd(&cur[accessor.ClickIndex()], *(ptr + off));
@@ -189,11 +189,11 @@
break;
default:
int embedx_idx = off - 3;
if (mf_dim < embedx_idx) {
return;
if (embedx_idx < mf_dim) {
paddle::platform::CudaAtomicAdd(
&cur[accessor.EmbedxGIndex() + embedx_idx],
*(ptr + off) * -1. * bs);
}
phi::CudaAtomicAdd(&cur[accessor.EmbedxGIndex() + embedx_idx],
*(ptr + off) * -1. * bs);
break;
}
}
@@ -232,8 +232,8 @@ __global__ void PushMergeCopy(const size_t N,
if (total_keys[i] == 0) {
switch (off) {
case 0:
cur[accessor.SlotIndex()] = 0;
cur[accessor.MfDimIndex()] = 0;
cur[accessor.SlotIndex()] = float(0);
cur[accessor.MfDimIndex()] = float(0);
cur[accessor.ShowIndex()] = 0.0;
break;
case 1:
@@ -261,8 +261,8 @@

switch (off) {
case 0:
cur[accessor.SlotIndex()] = (float)slot_vector[x]; // NOLINT
cur[accessor.MfDimIndex()] = mf_dim;
cur[accessor.SlotIndex()] = float(slot_vector[x]);
cur[accessor.MfDimIndex()] = float(mf_dim);
SUM_GRAD_VALUE
cur[accessor.ShowIndex()] = val;
break;
@@ -276,12 +276,12 @@
break;
default:
int embedx_idx = off - 3;
if (mf_dim < embedx_idx) {
if (embedx_idx < mf_dim) {
SUM_GRAD_VALUE
cur[accessor.EmbedxGIndex() + embedx_idx] = val * -1. * bs;
} else {
cur[accessor.EmbedxGIndex() + embedx_idx] = 0.0;
return;
}
SUM_GRAD_VALUE
cur[accessor.EmbedxGIndex() + embedx_idx] = val * -1. * bs;
break;
}
}
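Across these kernels the fix is the same: the per-slot embedx offset is now checked against the value's real mf size (or mf_dim) before reading or accumulating, and out-of-range entries are written as zero. The pull path previously only special-cased mf_size == 0, and the push paths let embedx_idx == mf_dim slip through. A condensed sketch of the corrected pull-side fill, with assumed names rather than the exact kernel code:

// Condensed sketch of the corrected embedx fill (assumed names).
__device__ void fill_embedx(float* dest, const float* src_val,
                            int embedx_w_index, int mf_size, int hidden) {
  for (int off = 3; off < hidden; ++off) {
    int embedx_id = off - 3;
    dest[off] = (embedx_id < mf_size) ? src_val[embedx_w_index + embedx_id]
                                      : 0.0f;
  }
}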
24 changes: 16 additions & 8 deletions paddle/fluid/framework/fleet/heter_ps/feature_value.h
@@ -166,12 +166,10 @@ class CommonFeatureValueAccessor {
float mf_size
std::vector<float> embedx_w;
*/
__host__ __device__ static int Dim(int embedx_dim) {
return 4 + embedx_dim;
}
__host__ __device__ int Dim(int embedx_dim) { return 4 + embedx_dim; }
__host__ __device__ int DimSize(size_t dim) { return sizeof(float); }
__host__ __device__ int Size(int embedx_dim) {
return TYPEALIGN(8, Dim(embedx_dim) * sizeof(float));
return Dim(embedx_dim) * sizeof(float);
}
__host__ __device__ int ShowIndex() { return 0; }
__host__ __device__ int ClickIndex() { return 1; }
@@ -198,7 +196,7 @@
return sizeof(float);
}
__host__ __device__ int Size(int embedx_dim) {
return TYPEALIGN(8, Dim(embedx_dim) * sizeof(float));
return Dim(embedx_dim) * sizeof(float);
}
__host__ __device__ int SlotIndex() { return 0; }
__host__ __device__ int ShowIndex() {
@@ -420,13 +418,23 @@ class CommonFeatureValueAccessor {
// set pull value real dim size
int mf_dim = int(src_val[common_feature_value.MfDimIndex()]);
dest_val[common_pull_value.MfSizeIndex()] = mf_dim;
// check
PADDLE_ENFORCE(
mf_dim <= mf_size, "mf_dim[%d] <= mf_size[%d]", mf_dim, mf_size);

int embedx_off = common_pull_value.EmbedxWIndex();
int value_off = common_feature_value.EmbedxWIndex();
for (int k = 0; k < mf_dim; ++k) {
dest_val[embedx_off + k] = src_val[value_off + k];
}
}
// set zero value by infer
__host__ __device__ void PullZeroValue(float* dest_val) {
dest_val[common_pull_value.ShowIndex()] = 0.0;
dest_val[common_pull_value.ClickIndex()] = 0.0;
dest_val[common_pull_value.EmbedWIndex()] = 0.0;
dest_val[common_pull_value.MfSizeIndex()] = 0;
}

// GPU kernel for the dy_mf_fill_shard_grads_kernel / update_one stage:
// copies values from src_val into dest_val
@@ -507,10 +515,10 @@ class CommonFeatureValueAccessor {
*(dest_val + common_pull_value.EmbedWIndex()) =
src_val[common_feature_value.EmbedWIndex()];
}

if (src_val[common_feature_value.MfSizeIndex()] == 0 || *key == 0) {
int mf_size = int(src_val[common_feature_value.MfSizeIndex()]);
if (mf_size == 0 || *key == 0) {
for (int j = 0; j < mf_dim; j++) {
*(dest_val + common_pull_value.EmbedxWIndex() + j) = 0;
*(dest_val + 3 + j) = 0;
}
} else {
for (int j = 0; j < mf_dim; j++) {
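For concreteness, the pull-value layout suggested by the truncated comment and the Show/Click index accessors above (show, click, embed_w, mf_size, then embedx_w) makes the size arithmetic easy to check by hand. The numbers below are a worked example of the formula, not values taken from the commit:

// Worked example of the pull-value sizing after dropping TYPEALIGN(8, ...):
//   Dim(embedx_dim)  = 4 + embedx_dim                    // floats
//   Size(embedx_dim) = Dim(embedx_dim) * sizeof(float)   // bytes, unaligned
//   embedx_dim = 8  -> Dim = 12, Size = 48 bytes (unchanged: 48 % 8 == 0)
//   embedx_dim = 9  -> Dim = 13, Size = 52 bytes (previously padded to 56)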
8 changes: 8 additions & 0 deletions paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h
@@ -23,6 +23,8 @@
#include <stdio.h>
#include "paddle/fluid/platform/enforce.h"

DECLARE_bool(gpugraph_debug_gpu_memory);

namespace paddle {
namespace framework {

@@ -75,6 +77,9 @@ class CudaDeviceRestorer {
};

inline void debug_gpu_memory_info(int gpu_id, const char* desc) {
if (!FLAGS_gpugraph_debug_gpu_memory) {
return;
}
CudaDeviceRestorer r;

size_t avail{0};
@@ -93,6 +98,9 @@ inline void debug_gpu_memory_info(int gpu_id, const char* desc) {
}

inline void debug_gpu_memory_info(const char* desc) {
if (!FLAGS_gpugraph_debug_gpu_memory) {
return;
}
CudaDeviceRestorer r;

int device_num = 0;
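The memory dump is now opt-in at runtime: both debug_gpu_memory_info overloads return immediately unless the gpugraph_debug_gpu_memory gflag is set. The DECLARE_bool above pairs with a DEFINE_bool elsewhere in the tree; a minimal sketch of that pairing follows, where the default value and help text are assumptions and Paddle may define the flag through its own exported-flag macros:

// Sketch of the flag definition assumed by DECLARE_bool(gpugraph_debug_gpu_memory).
#include "gflags/gflags.h"

DEFINE_bool(gpugraph_debug_gpu_memory, false,
            "dump per-GPU free/total memory in debug_gpu_memory_info()");

// Enabled at runtime without recompiling, e.g.:
//   ./your_train_binary --gpugraph_debug_gpu_memory=true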
