Skip to content

Commit

Permalink
[#23173] DocDB: Allow large bytes to be passed to RateLimiter
Browse files Browse the repository at this point in the history
Summary:
RateLimiter has a debug assert that you cannot `Request` more than `GetSingleBurstBytes`. In release mode we do not perform this check, and any such call gets stuck forever. This change allows requests larger than `GetSingleBurstBytes` to be made on RateLimiter by breaking them into multiple smaller requests.

This change is a temporary fix to allow xCluster to operate without any issues. The RocksDB RateLimiter has received multiple enhancements over the years that would help avoid this and other starvation issues. Ex: facebook/rocksdb@cb2476a. We should consider pulling in those changes.

Fixes #23173
Jira: DB-12112

Test Plan: RateLimiterTest.LargeRequests

Reviewers: slingam

Reviewed By: slingam

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D36703
  • Loading branch information
hari90 committed Jul 19, 2024
1 parent 78b317c commit ac9164b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
12 changes: 12 additions & 0 deletions src/yb/rocksdb/util/rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
leader_(nullptr) {
CHECK_GT(refill_bytes_per_period_, 0)
<< "rate_bytes_per_sec * refill_period_us should be > 1000000";

for (size_t q = 0; q < yb::kElementsInIOPriority; ++q) {
total_requests_[q] = 0;
total_bytes_through_[q] = 0;
Expand Down Expand Up @@ -108,6 +111,15 @@ void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
}

// Acquires tokens for `bytes` at the given priority. The request is served in chunks of at most
// GetSingleBurstBytes() each, so a request larger than a single burst is issued as several
// consecutive RequestInternal() calls instead of one oversized call.
void GenericRateLimiter::Request(int64_t bytes, const yb::IOPriority priority) {
  for (int64_t remaining = bytes; remaining > 0;) {
    // The burst size is re-read for every chunk rather than cached once up front.
    const int64_t chunk = std::min(GetSingleBurstBytes(), remaining);
    assert(chunk > 0);
    RequestInternal(chunk, priority);
    remaining -= chunk;
  }
}

void GenericRateLimiter::RequestInternal(int64_t bytes, const yb::IOPriority priority) {
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));

const auto pri = yb::to_underlying(priority);
Expand Down
5 changes: 3 additions & 2 deletions src/yb/rocksdb/util/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ class GenericRateLimiter : public RateLimiter {
void SetBytesPerSecond(int64_t bytes_per_second) override;

// Request for token to write bytes. If this request can not be satisfied,
// the call is blocked. If the request is bigger than GetSingleBurstBytes() then the call is
// broken up into multiple requests of the same priority.
void Request(const int64_t bytes, const yb::IOPriority pri) override;
// Issues a single request for `bytes` tokens at priority `pri`. Expects
// bytes <= GetSingleBurstBytes() (checked with an assert); Request() splits larger requests
// into chunks before calling this.
void RequestInternal(const int64_t bytes, const yb::IOPriority pri);

int64_t GetSingleBurstBytes() const override {
return refill_bytes_per_period_.load(std::memory_order_relaxed);
Expand Down
27 changes: 26 additions & 1 deletion src/yb/rocksdb/util/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,32 @@ namespace rocksdb {
class RateLimiterTest : public RocksDBTest {};

// Verifies that constructing a GenericRateLimiter whose parameters yield a non-positive
// refill_bytes_per_period_ aborts with the expected CHECK failure, and that a limiter with valid
// parameters can be constructed and destroyed.
TEST_F(RateLimiterTest, StartStop) {
  // The invalid configuration must only ever be constructed inside the death assertion: building
  // it directly in the test body would trip the CHECK and abort the whole test process.
  ASSERT_DEATH(
      std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(100, 100, 10)),
      "Check failed: refill_bytes_per_period_ > 0");

  // Valid configuration: construction followed by destruction at end of scope.
  std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(1000, 1000, 10));
}

// Verifies that a request larger than the single-burst size completes (instead of blocking
// forever) and is still throttled to roughly the configured rate.
TEST_F(RateLimiterTest, LargeRequests) {
  // Allow 1000 bytes per second with a 1000us refill period. That refills 1 byte per period,
  // i.e. 1 byte every millisecond, so a request of 1000 bytes is served as 1000 one-byte
  // requests and should take about 1s overall.
  constexpr int kRequestBytes = 1000;
  constexpr int kMinWaitMs = 500;
#if defined(OS_MACOSX)
  // MacOS tests are much slower, so use a larger timeout.
  constexpr int kMaxWaitMs = 10000;
#else
  constexpr int kMaxWaitMs = 1500;
#endif

  std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(1000, 1000, 10));

  const auto start = yb::CoarseMonoClock::Now();
  limiter->Request(kRequestBytes, yb::IOPriority::kHigh);
  const auto duration_waited = yb::ToMilliseconds(yb::CoarseMonoClock::Now() - start);

  // The request must actually have been throttled, but must not have stalled.
  ASSERT_GT(duration_waited, kMinWaitMs);
  ASSERT_LT(duration_waited, kMaxWaitMs);

  // Every byte went through, one byte per underlying request.
  ASSERT_EQ(limiter->GetTotalBytesThrough(), kRequestBytes);
  ASSERT_EQ(limiter->GetTotalRequests(), 1000);
}

#ifndef OS_MACOSX
Expand Down

0 comments on commit ac9164b

Please sign in to comment.