Skip to content

Commit

Permalink
Merge pull request #32 from qingshui/paddlebox
Browse files Browse the repository at this point in the history
Optimize slotpool, fix archive DIO read and write, add gperftools signal handling, add tensor resize by dim
  • Loading branch information
qingshui authored Mar 3, 2022
2 parents 77c533b + e1f606a commit 9ed7577
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 164 deletions.
5 changes: 3 additions & 2 deletions cmake/cuda.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ if (WITH_NV_JETSON)
set(paddle_known_gpu_archs9 "53 62")
set(paddle_known_gpu_archs10 "53 62 72")
else()
set(paddle_known_gpu_archs "30 35 50 52 60 61 70")
set(paddle_known_gpu_archs "30 35 50 52 60 61 70 75 80 86")
set(paddle_known_gpu_archs7 "30 35 50 52")
set(paddle_known_gpu_archs8 "30 35 50 52 60 61")
set(paddle_known_gpu_archs9 "30 35 50 52 60 61 70")
set(paddle_known_gpu_archs10 "30 35 50 52 60 61 70 75")
set(paddle_known_gpu_archs11 "52 60 61 70 75 80")
set(paddle_known_gpu_archs11 "52 60 61 70 75 80 86")
endif()


######################################################################################
# A function for automatic detection of GPUs installed (if autodetection is enabled)
# Usage:
Expand Down
44 changes: 29 additions & 15 deletions paddle/fluid/framework/data_feed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,8 @@ static const int PAGE_BLOCK_SIZE = 4096;
static const int INT_BYTES = sizeof(int);
BinaryArchiveWriter::BinaryArchiveWriter() : fd_(-1) {
capacity_ = MAX_FILE_BUFF + 64 * 1024;
buff_ = reinterpret_cast<char*>(malloc(capacity_));
CHECK_EQ(0, posix_memalign(reinterpret_cast<void**>(&buff_), PAGE_BLOCK_SIZE,
capacity_));
}
BinaryArchiveWriter::~BinaryArchiveWriter() {
close();
Expand All @@ -2052,7 +2053,7 @@ bool BinaryArchiveWriter::open(const std::string& path) {
bool BinaryArchiveWriter::write(const SlotRecord& rec) {
thread_local BinaryArchive ar;
mutex_.lock();
ar.SetWriteBuffer(buff_ + woffset_, capacity_ - woffset_, nullptr);
ar.SetWriteBuffer(&buff_[woffset_], capacity_ - woffset_, nullptr);
ar << rec;
woffset_ += ar.Length();
if (woffset_ < MAX_FILE_BUFF) {
Expand All @@ -2069,9 +2070,9 @@ bool BinaryArchiveWriter::write(const SlotRecord& rec) {
int left = (woffset_ % PAGE_BLOCK_SIZE);
int write_len = (woffset_ - left);
int ret = ::write(fd_, buff_, write_len);
memmove(buff_, buff_ + write_len, left);
memmove(buff_, &buff_[write_len], left);
woffset_ = left + INT_BYTES;
head_ = buff_ + left;
head_ = &buff_[left];
mutex_.unlock();

return (ret == write_len);
Expand Down Expand Up @@ -2108,7 +2109,8 @@ class BinaryArchiveReader {
public:
BinaryArchiveReader() {
capacity_ = MAX_FILE_BUFF + 64 * 1024;
buff_ = reinterpret_cast<char*>(malloc(capacity_));
CHECK_EQ(0, posix_memalign(reinterpret_cast<void**>(&buff_),
PAGE_BLOCK_SIZE, capacity_));
}
~BinaryArchiveReader() {
if (buff_ != nullptr) {
Expand All @@ -2117,7 +2119,8 @@ class BinaryArchiveReader {
}
}
bool open(const std::string& path) {
fd_ = ::open(path.c_str(), O_RDONLY);
// dio read
fd_ = ::open(path.c_str(), O_RDONLY | O_DIRECT);
if (fd_ < 0) {
VLOG(0) << "open [" << path << "] failed";
return false;
Expand All @@ -2131,12 +2134,13 @@ class BinaryArchiveReader {
int body_len = 0;
int left_len = 0;
int need_len = 0;
int buff_off = 0;
char* ptr = buff_;

BinaryArchive ar;
while ((ret = ::read(fd_, ptr, capacity_ - left_len)) > 0) {
while ((ret = ::read(fd_, ptr, (capacity_ - left_len - buff_off))) > 0) {
left_len += ret;
ptr = &buff_[0];
ptr = &buff_[buff_off];
body_len = *(reinterpret_cast<int*>(ptr));
if (body_len <= 0) {
break;
Expand All @@ -2162,10 +2166,19 @@ class BinaryArchiveReader {
need_len = body_len + INT_BYTES;
}
if (left_len > 0) {
memmove(&buff_[0], ptr, left_len);
ptr = &buff_[0] + left_len;
int align_bytes = left_len % PAGE_BLOCK_SIZE;
if (align_bytes == 0) {
memmove(&buff_[0], ptr, left_len);
ptr = &buff_[left_len];
buff_off = 0;
} else {
buff_off = PAGE_BLOCK_SIZE - align_bytes;
memmove(&buff_[buff_off], ptr, left_len);
ptr = &buff_[buff_off + left_len];
}
} else {
ptr = &buff_[0];
buff_off = 0;
}
}

Expand Down Expand Up @@ -2656,19 +2669,20 @@ void SlotPaddleBoxDataFeed::BuildSlotBatchGPU(const int ins_num) {
// h_tensor_ptrs[j] = feed->mutable_data<int64_t>(this->place_);
h_tensor_ptrs[j] = feed->data<int64_t>();
}

if (info.dense) {
if (info.inductive_shape_index != -1) {
info.local_shape[info.inductive_shape_index] =
total_instance / info.total_dims_without_inductive;
}
feed->Resize(framework::make_ddim(info.local_shape));
// feed->Resize(framework::make_ddim(info.local_shape));
feed->Resize(info.local_shape);
} else {
LoD& lod = (*feed->mutable_lod());
lod.resize(1);
lod[0].resize(offset_cols_size);
memcpy(lod[0].MutableData(platform::CPUPlace()), off_start_ptr,
offset_cols_size * sizeof(size_t));
// lod[0].resize(offset_cols_size);
// memcpy(lod[0].MutableData(platform::CPUPlace()), off_start_ptr,
// offset_cols_size * sizeof(size_t));
lod[0].assign(off_start_ptr, off_start_ptr + offset_cols_size);
}
}
data_timer_.Pause();
Expand Down
130 changes: 75 additions & 55 deletions paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ class SlotObjAllocator {
void clear() {
T* tmp = NULL;
while (free_nodes_ != NULL) {
tmp = reinterpret_cast<T*>(reinterpret_cast<void*>(free_nodes_));
tmp = reinterpret_cast<T*>(free_nodes_);
free_nodes_ = free_nodes_->next;
deleter_(tmp);
--capacity_;
Expand All @@ -912,18 +912,38 @@ class SlotObjAllocator {
}
T* acquire(void) {
T* x = NULL;
x = reinterpret_cast<T*>(reinterpret_cast<void*>(free_nodes_));
x = reinterpret_cast<T*>(free_nodes_);
free_nodes_ = free_nodes_->next;
--capacity_;
return x;
}
void release(T* x) {
Node* node = reinterpret_cast<Node*>(reinterpret_cast<void*>(x));
Node* node = reinterpret_cast<Node*>(x);
node->next = free_nodes_;
free_nodes_ = node;
++capacity_;
}
size_t capacity(void) { return capacity_; }
// get more
// Pop up to n cached objects from the free list into data[0..n).
// Returns how many objects were actually taken (may be fewer than n
// when the pool runs dry); the caller allocates the remainder itself.
size_t get(size_t n, T** data) {
  size_t taken = 0;
  for (; taken < n && capacity_ > 0; ++taken) {
    Node* node = free_nodes_;
    free_nodes_ = node->next;
    --capacity_;
    data[taken] = reinterpret_cast<T*>(node);
  }
  return taken;
}
// Push n objects back onto the head of the free list for reuse.
// Returns the pool's new capacity (number of cached objects).
// NOTE(review): assumes the caller holds the owning pool's lock —
// the list itself is not synchronized; confirm against call sites.
size_t put(size_t n, T** data) {
  for (size_t idx = 0; idx < n; ++idx) {
    Node* returned = reinterpret_cast<Node*>(data[idx]);
    returned->next = free_nodes_;
    free_nodes_ = returned;
  }
  capacity_ += n;
  return capacity_;
}

private:
struct alignas(T) Node {
Expand All @@ -940,44 +960,39 @@ static const int OBJPOOL_BLOCK_SIZE = 10000;
class SlotObjPool {
public:
SlotObjPool()
: max_capacity_(FLAGS_padbox_record_pool_max_size),
: inited_(true),
max_capacity_(FLAGS_padbox_record_pool_max_size),
alloc_(free_slotrecord) {
ins_chan_ = MakeChannel<SlotRecord>();
ins_chan_->SetBlockSize(OBJPOOL_BLOCK_SIZE);
for (int i = 0; i < FLAGS_padbox_slotpool_thread_num; ++i) {
threads_.push_back(std::thread([this]() { run(); }));
}
disable_pool_ = false;
count_ = 0;
}
~SlotObjPool() {
ins_chan_->Close();
inited_ = false;
cond_.notify_all();
for (auto& t : threads_) {
t.join();
}
}
void disable_pool(bool disable) { disable_pool_ = disable; }
void set_max_capacity(size_t max_capacity) { max_capacity_ = max_capacity; }
void get(std::vector<SlotRecord>* output, int n) {
void get(std::vector<SlotRecord>* output, size_t n) {
output->resize(n);
return get(&(*output)[0], n);
}
void get(SlotRecord* output, int n) {
int size = 0;
void get(SlotRecord* output, size_t n) {
size_t size = 0;
mutex_.lock();
int left = static_cast<int>(alloc_.capacity());
if (left > 0) {
size = (left >= n) ? n : left;
for (int i = 0; i < size; ++i) {
output[i] = alloc_.acquire();
}
}
mutex_.unlock();
size = alloc_.get(n, output);
count_ += n;
mutex_.unlock();

if (size == n) {
return;
}
for (int i = size; i < n; ++i) {
for (size_t i = size; i < n; ++i) {
output[i] = make_slotrecord();
}
}
Expand All @@ -989,49 +1004,48 @@ class SlotObjPool {
put(&(*input)[0], size);
input->clear();
}
void put(SlotRecord* input, size_t size) {
CHECK(ins_chan_->WriteMove(size, input) == size);
void put(SlotRecord* input, size_t num) {
for (size_t i = 0; i < num; ++i) {
input[i]->reset();
}
// pool empty add to pool
mutex_.lock();
size_t capacity = alloc_.put(num, input);
count_ -= num;
mutex_.unlock();
// disable pool
if (disable_pool_ || capacity > max_capacity_) {
cond_.notify_one();
}
}
void run(void) {
size_t n = 0;
size_t max_size = OBJPOOL_BLOCK_SIZE * 50;
std::vector<SlotRecord> input;
while (ins_chan_->ReadOnce(input, OBJPOOL_BLOCK_SIZE)) {
if (input.empty()) {
continue;
}
// over max capacity
size_t n = input.size();
count_ -= n;
if (disable_pool_ || n + capacity() > max_capacity_) {
for (auto& t : input) {
free_slotrecord(t);
input.resize(max_size);
while (inited_) {
size_t check_capacity = (disable_pool_) ? 0 : max_capacity_;
{
std::unique_lock<std::mutex> lock(mutex_);
if (alloc_.capacity() <= check_capacity) {
cond_.wait(lock);
}
} else {
for (auto& t : input) {
t->reset();
}
mutex_.lock();
for (auto& t : input) {
alloc_.release(t);
}
mutex_.unlock();
n = alloc_.get(max_size, &input[0]);
}
for (size_t i = 0; i < n; ++i) {
free_slotrecord(input[i]);
}
input.clear();
}
}
void clear(void) {
platform::Timer timeline;
timeline.Start();
mutex_.lock();
size_t total = alloc_.capacity();
alloc_.clear();
mutex_.unlock();
// wait release channel data
if (FLAGS_enable_slotpool_wait_release) {
while (!ins_chan_->Empty()) {
sleep(1);
}
}
timeline.Pause();
LOG(WARNING) << "clear slot pool data size=" << count_.load()
LOG(WARNING) << "clear slot pool data size=" << total
<< ", span=" << timeline.ElapsedSec();
}
size_t capacity(void) {
Expand All @@ -1040,26 +1054,32 @@ class SlotObjPool {
mutex_.unlock();
return total;
}
// print pool info
void print_info(const char* name = "pool") {
LOG(INFO) << "[" << name << "]slot alloc object count=" << count_
<< ", pool size=" << alloc_.capacity();
}

private:
bool inited_;
size_t max_capacity_;
Channel<SlotRecord> ins_chan_;
std::vector<std::thread> threads_;
std::mutex mutex_;
SlotObjAllocator<SlotRecordObject> alloc_;
bool disable_pool_;
std::atomic<long> count_; // NOLINT
size_t count_; // NOLINT
std::condition_variable cond_;
};

// Process-wide SlotRecord object pool (Meyers singleton: constructed
// on first use, thread-safe initialization guaranteed since C++11).
inline SlotObjPool& SlotRecordPool() {
  static SlotObjPool instance;
  return instance;
}
// down disk pool
inline SlotObjPool& SlotRecordDownPool() {
static SlotObjPool pool;
return pool;
}
//// down disk pool
// inline SlotObjPool& SlotRecordDownPool() {
// static SlotObjPool pool;
// return pool;
//}

using FeasignValues = SlotValues<uint64_t>;

Expand Down
Loading

0 comments on commit 9ed7577

Please sign in to comment.