Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chore](brpc) add gc for abafreelist to avoid eagain and set brpc tim… #38177

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class BrpcClientCache {
options.connection_group = connection_group;
}
options.connect_timeout_ms = 2000;
options.timeout_ms = 2000;
options.max_retry = 10;

std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
From fb53ab9245e8570d44a2eeea2ab536841a7876ec Mon Sep 17 00:00:00 2001
From: BiteTheDDDDt <[email protected]>
Date: Tue, 16 Jul 2024 14:21:21 +0800
Subject: [PATCH] add gc on abalist and adjust gc size

---
src/bthread/bthread.cpp | 3 +-
src/bthread/id.cpp | 1 +
src/bthread/list_of_abafree_id.h | 161 ++++++++++++++++++++++++++++---
3 files changed, 151 insertions(+), 14 deletions(-)

diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 5ac0c3b1..86fd2c90 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -147,7 +147,8 @@ start_from_non_worker(bthread_t* __restrict tid,

struct TidTraits {
static const size_t BLOCK_SIZE = 63;
- static const size_t MAX_ENTRIES = 65536;
+ static const size_t MAX_ENTRIES = 655360;
+ static const size_t INIT_GC_SIZE = 65536;
static const bthread_t ID_INIT;
static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); }
};
diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp
index 41c49a3f..ba77580a 100644
--- a/src/bthread/id.cpp
+++ b/src/bthread/id.cpp
@@ -291,6 +291,7 @@ void id_pool_status(std::ostream &os) {
struct IdTraits {
static const size_t BLOCK_SIZE = 63;
static const size_t MAX_ENTRIES = 100000;
+ static const size_t INIT_GC_SIZE = 4096;
static const bthread_id_t ID_INIT;
static bool exists(bthread_id_t id)
{ return bthread::id_exists_with_true_negatives(id); }
diff --git a/src/bthread/list_of_abafree_id.h b/src/bthread/list_of_abafree_id.h
index ac2b2234..45043acc 100644
--- a/src/bthread/list_of_abafree_id.h
+++ b/src/bthread/list_of_abafree_id.h
@@ -22,8 +22,10 @@
#ifndef BTHREAD_LIST_OF_ABAFREE_ID_H
#define BTHREAD_LIST_OF_ABAFREE_ID_H

-#include <vector>
#include <deque>
+#include <vector>
+
+#include "butil/macros.h"

namespace bthread {

@@ -48,6 +50,9 @@ namespace bthread {
// // Max #entries. Often has close relationship with concurrency, 65536
// // is "huge" for most apps.
// static const size_t MAX_ENTRIES = 65536;
+// // Initial GC length, when the number of blocks reaches this length,
+// // start to initiate list GC operation. It will release useless blocks
+// static const size_t INIT_GC_SIZE = 4096;
//
// // Initial value of id. Id with the value is treated as invalid.
// static const Id ID_INIT = ...;
@@ -64,13 +69,14 @@ class ListOfABAFreeId {
public:
ListOfABAFreeId();
~ListOfABAFreeId();
-
+
// Add an identifier into the list.
int add(Id id);
-
+
// Apply fn(id) to all identifiers.
- template <typename Fn> void apply(const Fn& fn);
-
+ template <typename Fn>
+ void apply(const Fn& fn);
+
// Put #entries of each level into `counts'
// Returns #levels.
size_t get_sizes(size_t* counts, size_t n);
@@ -82,19 +88,31 @@ private:
IdBlock* next;
};
void forward_index();
+
+ struct TempIdBlock {
+ IdBlock* block;
+ uint32_t index;
+ uint32_t nblock;
+ };
+
+ int gc();
+ int add_to_temp_list(TempIdBlock* temp_list, Id id);
+ template <typename Fn>
+ int for_each(const Fn& fn);
+ void free_list(IdBlock* block);
+
IdBlock* _cur_block;
uint32_t _cur_index;
uint32_t _nblock;
IdBlock _head_block;
+ uint32_t _next_gc_size;
};

// [impl.]

template <typename Id, typename IdTraits>
ListOfABAFreeId<Id, IdTraits>::ListOfABAFreeId()
- : _cur_block(&_head_block)
- , _cur_index(0)
- , _nblock(1) {
+ : _cur_block(&_head_block), _cur_index(0), _nblock(1), _next_gc_size(IdTraits::INIT_GC_SIZE) {
for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
_head_block.ids[i] = IdTraits::ID_INIT;
}
@@ -140,6 +158,30 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
}
saved_pos[i] = pos;
}
+ // If we don't expect a GC to occur in abalist, then an error is reported and EAGAIN is returned.
+ if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
+ return EAGAIN;
+ }
+ // If the number of blocks exceeds the minimum GC length, start the GC operation
+ if (_nblock * IdTraits::BLOCK_SIZE > _next_gc_size) {
+ uint32_t before_gc_blocks = _nblock;
+ int rc = gc();
+ // To avoid frequent GC operations, we only let the GC be effective enough to continue the GC.
+ // otherwise we let the next GC occur length * 2.
+ //
+ // Condition for a GC to be sufficiently efficient: the number of blocks
+ // retained after the GC is 1/4 of the previous one.
+ if ((before_gc_blocks - _nblock) * IdTraits::BLOCK_SIZE < (_next_gc_size - (_next_gc_size >> 2))) {
+ _next_gc_size <<= 1;
+ // We want to make sure that GC must occur before MAX_ENTRIES.
+ static_assert(IdTraits::MAX_ENTRIES > IdTraits::BLOCK_SIZE * 2, "MAX_ENTRIES should be greater than 2 * IdTraits::BLOCK_SIZE");
+ if (_next_gc_size >= IdTraits::MAX_ENTRIES) {
+ _next_gc_size = IdTraits::MAX_ENTRIES - IdTraits::BLOCK_SIZE * 2;
+ }
+ }
+
+ return rc;
+ }
// The list is considered to be "crowded", add a new block and scatter
// the conflict identifiers by inserting an empty entry after each of
// them, so that even if the identifiers are still valid when we walk
@@ -152,9 +194,6 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
//
// [..xxxx....] -> [......yyyy] -> [..........]
// block A new block block B
- if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
- return EAGAIN;
- }
IdBlock* new_block = new (std::nothrow) IdBlock;
if (NULL == new_block) {
return ENOMEM;
@@ -188,6 +227,93 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
return 0;
}

+template <typename Id, typename IdTraits>
+int ListOfABAFreeId<Id, IdTraits>::gc() {
+ IdBlock* new_block = new (std::nothrow) IdBlock;
+ if (NULL == new_block) {
+ return ENOMEM;
+ }
+ // reset head block
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ new_block->ids[i] = IdTraits::ID_INIT;
+ }
+ new_block->next = NULL;
+
+ TempIdBlock tmp_id_block;
+ tmp_id_block.block = new_block;
+ tmp_id_block.nblock = 1;
+ tmp_id_block.index = 0;
+
+ // Add each element of the old list to the new list
+ int rc = for_each([&](Id id) {
+ int rc;
+ rc = add_to_temp_list(&tmp_id_block, id);
+ if (rc != 0) {
+ return rc;
+ }
+ rc = add_to_temp_list(&tmp_id_block, IdTraits::ID_INIT);
+ if (rc != 0) {
+ return rc;
+ }
+ return 0;
+ });
+
+ if (rc != 0) {
+ free_list(new_block);
+ return rc;
+ }
+
+ // reset head block
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ _head_block.ids[i] = IdTraits::ID_INIT;
+ }
+
+ free_list(_head_block.next);
+ _cur_block = tmp_id_block.block;
+ _cur_index = tmp_id_block.index;
+ // nblock and head_block
+ _nblock = tmp_id_block.nblock + 1;
+ _head_block.next = new_block;
+
+ return 0;
+}
+
+template <typename Id, typename IdTraits>
+int ListOfABAFreeId<Id, IdTraits>::add_to_temp_list(TempIdBlock* block_list, Id id) {
+ block_list->block->ids[block_list->index++] = id;
+ // add new list
+ if (block_list->index == IdTraits::BLOCK_SIZE) {
+ block_list->index = 0;
+ block_list->nblock++;
+ block_list->block->next = new (std::nothrow) IdBlock;
+ if (NULL == block_list->block->next) {
+ return ENOMEM;
+ }
+ block_list->block = block_list->block->next;
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ block_list->block->ids[i] = IdTraits::ID_INIT;
+ }
+ block_list->block->next = NULL;
+ }
+ return 0;
+}
+
+template <typename Id, typename IdTraits>
+template <typename Fn>
+int ListOfABAFreeId<Id, IdTraits>::for_each(const Fn& fn) {
+ for (IdBlock* p = &_head_block; p != NULL; p = p->next) {
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ if (p->ids[i] != IdTraits::ID_INIT && IdTraits::exists(p->ids[i])) {
+ int rc = fn(p->ids[i]);
+ if (rc != 0) {
+ return rc;
+ }
+ }
+ }
+ }
+ return 0;
+}
+
template <typename Id, typename IdTraits>
template <typename Fn>
void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
@@ -200,6 +326,15 @@ void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
}
}

+template <typename Id, typename IdTraits>
+void ListOfABAFreeId<Id, IdTraits>::free_list(IdBlock* p) {
+ for (; p != NULL;) {
+ IdBlock* saved_next = p->next;
+ delete p;
+ p = saved_next;
+ }
+}
+
template <typename Id, typename IdTraits>
size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* cnts, size_t n) {
if (n == 0) {
@@ -210,6 +345,6 @@ size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* cnts, size_t n) {
return 1;
}

-} // namespace bthread
+} // namespace bthread

-#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
+#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
--
2.31.1

Loading