Skip to content

Commit

Permalink
[Chore](brpc) add gc for abafreelist to avoid eagain and set brpc tim…
Browse files Browse the repository at this point in the history
…eout_ms to 2s (#37888)

## Proposed changes
1. add gc for abafreelist to avoid eagain
```
[CANCELLED]failed to send brpc when exchange, error=Resource temporarily unavailable, error_text=[E11]Resource temporarily unavailable
```
2. set brpc timeout_ms to 2s
  • Loading branch information
BiteTheDDDDt committed Aug 2, 2024
1 parent 5733c9d commit ae212fa
Show file tree
Hide file tree
Showing 2 changed files with 281 additions and 0 deletions.
1 change: 1 addition & 0 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class BrpcClientCache {
options.connection_group = connection_group;
}
options.connect_timeout_ms = 2000;
options.timeout_ms = 2000;
options.max_retry = 10;

std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
From fb53ab9245e8570d44a2eeea2ab536841a7876ec Mon Sep 17 00:00:00 2001
From: BiteTheDDDDt <[email protected]>
Date: Tue, 16 Jul 2024 14:21:21 +0800
Subject: [PATCH] add gc on abalist and adjust gc size

---
src/bthread/bthread.cpp | 3 +-
src/bthread/id.cpp | 1 +
src/bthread/list_of_abafree_id.h | 161 ++++++++++++++++++++++++++++---
3 files changed, 151 insertions(+), 14 deletions(-)

diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 5ac0c3b1..86fd2c90 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -147,7 +147,8 @@ start_from_non_worker(bthread_t* __restrict tid,

struct TidTraits {
static const size_t BLOCK_SIZE = 63;
- static const size_t MAX_ENTRIES = 65536;
+ static const size_t MAX_ENTRIES = 655360;
+ static const size_t INIT_GC_SIZE = 65536;
static const bthread_t ID_INIT;
static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); }
};
diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp
index 41c49a3f..ba77580a 100644
--- a/src/bthread/id.cpp
+++ b/src/bthread/id.cpp
@@ -291,6 +291,7 @@ void id_pool_status(std::ostream &os) {
struct IdTraits {
static const size_t BLOCK_SIZE = 63;
static const size_t MAX_ENTRIES = 100000;
+ static const size_t INIT_GC_SIZE = 4096;
static const bthread_id_t ID_INIT;
static bool exists(bthread_id_t id)
{ return bthread::id_exists_with_true_negatives(id); }
diff --git a/src/bthread/list_of_abafree_id.h b/src/bthread/list_of_abafree_id.h
index ac2b2234..45043acc 100644
--- a/src/bthread/list_of_abafree_id.h
+++ b/src/bthread/list_of_abafree_id.h
@@ -22,8 +22,10 @@
#ifndef BTHREAD_LIST_OF_ABAFREE_ID_H
#define BTHREAD_LIST_OF_ABAFREE_ID_H

-#include <vector>
#include <deque>
+#include <vector>
+
+#include "butil/macros.h"

namespace bthread {

@@ -48,6 +50,9 @@ namespace bthread {
// // Max #entries. Often has close relationship with concurrency, 65536
// // is "huge" for most apps.
// static const size_t MAX_ENTRIES = 65536;
+// // Initial GC length, when the number of blocks reaches this length,
+// // start to initiate list GC operation. It will release useless blocks
+// static const size_t INIT_GC_SIZE = 4096;
//
// // Initial value of id. Id with the value is treated as invalid.
// static const Id ID_INIT = ...;
@@ -64,13 +69,14 @@ class ListOfABAFreeId {
public:
ListOfABAFreeId();
~ListOfABAFreeId();
-
+
// Add an identifier into the list.
int add(Id id);
-
+
// Apply fn(id) to all identifiers.
- template <typename Fn> void apply(const Fn& fn);
-
+ template <typename Fn>
+ void apply(const Fn& fn);
+
// Put #entries of each level into `counts'
// Returns #levels.
size_t get_sizes(size_t* counts, size_t n);
@@ -82,19 +88,31 @@ private:
IdBlock* next;
};
void forward_index();
+
+ struct TempIdBlock {
+ IdBlock* block;
+ uint32_t index;
+ uint32_t nblock;
+ };
+
+ int gc();
+ int add_to_temp_list(TempIdBlock* temp_list, Id id);
+ template <typename Fn>
+ int for_each(const Fn& fn);
+ void free_list(IdBlock* block);
+
IdBlock* _cur_block;
uint32_t _cur_index;
uint32_t _nblock;
IdBlock _head_block;
+ uint32_t _next_gc_size;
};

// [impl.]

template <typename Id, typename IdTraits>
ListOfABAFreeId<Id, IdTraits>::ListOfABAFreeId()
- : _cur_block(&_head_block)
- , _cur_index(0)
- , _nblock(1) {
+ : _cur_block(&_head_block), _cur_index(0), _nblock(1), _next_gc_size(IdTraits::INIT_GC_SIZE) {
for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
_head_block.ids[i] = IdTraits::ID_INIT;
}
@@ -140,6 +158,30 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
}
saved_pos[i] = pos;
}
+ // If we don't expect a GC to occur in abalist, then an error is reported and EAGAIN is returned.
+ if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
+ return EAGAIN;
+ }
+ // If the number of blocks exceeds the minimum GC length, start the GC operation
+ if (_nblock * IdTraits::BLOCK_SIZE > _next_gc_size) {
+ uint32_t before_gc_blocks = _nblock;
+ int rc = gc();
+ // To avoid frequent GC operations, we only let the GC be effective enough to continue the GC.
+ // otherwise we let the next GC occur length * 2.
+ //
+ // Condition for a GC to be sufficiently efficient: the number of blocks
+ // retained after the GC is 1/4 of the previous one.
+ if ((before_gc_blocks - _nblock) * IdTraits::BLOCK_SIZE < (_next_gc_size - (_next_gc_size >> 2))) {
+ _next_gc_size <<= 1;
+ // We want to make sure that GC must occur before MAX_ENTRIES.
+ static_assert(IdTraits::MAX_ENTRIES > IdTraits::BLOCK_SIZE * 2, "MAX_ENTRIES should be greater than 2 * IdTraits::BLOCK_SIZE");
+ if (_next_gc_size >= IdTraits::MAX_ENTRIES) {
+ _next_gc_size = IdTraits::MAX_ENTRIES - IdTraits::BLOCK_SIZE * 2;
+ }
+ }
+
+ return rc;
+ }
// The list is considered to be "crowded", add a new block and scatter
// the conflict identifiers by inserting an empty entry after each of
// them, so that even if the identifiers are still valid when we walk
@@ -152,9 +194,6 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
//
// [..xxxx....] -> [......yyyy] -> [..........]
// block A new block block B
- if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
- return EAGAIN;
- }
IdBlock* new_block = new (std::nothrow) IdBlock;
if (NULL == new_block) {
return ENOMEM;
@@ -188,6 +227,93 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
return 0;
}

+template <typename Id, typename IdTraits>
+int ListOfABAFreeId<Id, IdTraits>::gc() {
+ IdBlock* new_block = new (std::nothrow) IdBlock;
+ if (NULL == new_block) {
+ return ENOMEM;
+ }
+ // reset head block
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ new_block->ids[i] = IdTraits::ID_INIT;
+ }
+ new_block->next = NULL;
+
+ TempIdBlock tmp_id_block;
+ tmp_id_block.block = new_block;
+ tmp_id_block.nblock = 1;
+ tmp_id_block.index = 0;
+
+ // Add each element of the old list to the new list
+ int rc = for_each([&](Id id) {
+ int rc;
+ rc = add_to_temp_list(&tmp_id_block, id);
+ if (rc != 0) {
+ return rc;
+ }
+ rc = add_to_temp_list(&tmp_id_block, IdTraits::ID_INIT);
+ if (rc != 0) {
+ return rc;
+ }
+ return 0;
+ });
+
+ if (rc != 0) {
+ free_list(new_block);
+ return rc;
+ }
+
+ // reset head block
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ _head_block.ids[i] = IdTraits::ID_INIT;
+ }
+
+ free_list(_head_block.next);
+ _cur_block = tmp_id_block.block;
+ _cur_index = tmp_id_block.index;
+ // nblock and head_block
+ _nblock = tmp_id_block.nblock + 1;
+ _head_block.next = new_block;
+
+ return 0;
+}
+
+template <typename Id, typename IdTraits>
+int ListOfABAFreeId<Id, IdTraits>::add_to_temp_list(TempIdBlock* block_list, Id id) {
+ block_list->block->ids[block_list->index++] = id;
+ // add new list
+ if (block_list->index == IdTraits::BLOCK_SIZE) {
+ block_list->index = 0;
+ block_list->nblock++;
+ block_list->block->next = new (std::nothrow) IdBlock;
+ if (NULL == block_list->block->next) {
+ return ENOMEM;
+ }
+ block_list->block = block_list->block->next;
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ block_list->block->ids[i] = IdTraits::ID_INIT;
+ }
+ block_list->block->next = NULL;
+ }
+ return 0;
+}
+
+template <typename Id, typename IdTraits>
+template <typename Fn>
+int ListOfABAFreeId<Id, IdTraits>::for_each(const Fn& fn) {
+ for (IdBlock* p = &_head_block; p != NULL; p = p->next) {
+ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+ if (p->ids[i] != IdTraits::ID_INIT && IdTraits::exists(p->ids[i])) {
+ int rc = fn(p->ids[i]);
+ if (rc != 0) {
+ return rc;
+ }
+ }
+ }
+ }
+ return 0;
+}
+
template <typename Id, typename IdTraits>
template <typename Fn>
void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
@@ -200,6 +326,15 @@ void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
}
}

+template <typename Id, typename IdTraits>
+void ListOfABAFreeId<Id, IdTraits>::free_list(IdBlock* p) {
+ for (; p != NULL;) {
+ IdBlock* saved_next = p->next;
+ delete p;
+ p = saved_next;
+ }
+}
+
template <typename Id, typename IdTraits>
size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* cnts, size_t n) {
if (n == 0) {
@@ -210,6 +345,6 @@ size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* cnts, size_t n) {
return 1;
}

-} // namespace bthread
+} // namespace bthread

-#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
+#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
--
2.31.1

0 comments on commit ae212fa

Please sign in to comment.