Skip to content

Commit

Permalink
Fix alloc_bytes of ReadLimiter (#5852)
Browse files Browse the repository at this point in the history
close #5801
  • Loading branch information
JinheLin authored Sep 16, 2022
1 parent befe835 commit b232bcd
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 39 deletions.
65 changes: 38 additions & 27 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <boost/algorithm/string.hpp>
#include <cassert>
#include <fstream>
#include <magic_enum.hpp>

namespace CurrentMetrics
{
Expand Down Expand Up @@ -107,6 +108,7 @@ WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64
, requests_to_wait{0}
, type(type_)
, alloc_bytes{0}
, log(Logger::get(std::string(magic_enum::enum_name(type))))
{}

WriteLimiter::~WriteLimiter()
Expand All @@ -131,7 +133,8 @@ void WriteLimiter::request(Int64 bytes)
consumeBytes(bytes);
return;
}

Stopwatch sw_pending;
Int64 wait_times = 0;
auto pending_request = pendingRequestMetrics(type);

// request cannot be satisfied at this moment, enqueue
Expand All @@ -140,7 +143,7 @@ void WriteLimiter::request(Int64 bytes)
while (!r.granted)
{
assert(!req_queue.empty());

wait_times++;
bool timed_out = false;
// if this request is in the front of req_queue,
// then it is responsible to trigger the refill process.
Expand Down Expand Up @@ -191,6 +194,7 @@ void WriteLimiter::request(Int64 bytes)
}
}
}
LOG_FMT_TRACE(log, "pending_us {} wait_times {} pending_count {} rate_limit_per_sec {}", sw_pending.elapsed() / 1000, wait_times, req_queue.size(), refill_balance_per_period * 1000 / refill_period_ms);
}

size_t WriteLimiter::setStop()
Expand Down Expand Up @@ -296,7 +300,7 @@ ReadLimiter::ReadLimiter(
: WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_)
, get_read_bytes(std::move(get_read_bytes_))
, last_stat_bytes(get_read_bytes())
, log(Logger::get("ReadLimiter"))
, last_refill_time(std::chrono::system_clock::now())
{}

Int64 ReadLimiter::getAvailableBalance()
Expand All @@ -317,17 +321,21 @@ Int64 ReadLimiter::getAvailableBalance()
else
{
Int64 real_alloc_bytes = bytes - last_stat_bytes;
metricAllocBytes(type, real_alloc_bytes);
// `alloc_bytes` is the number of byte that ReadLimiter has allocated.
if (available_balance > 0)
{
auto can_alloc_bytes = std::min(real_alloc_bytes, available_balance);
alloc_bytes += can_alloc_bytes;
metricAllocBytes(type, can_alloc_bytes);
}
available_balance -= real_alloc_bytes;
alloc_bytes += real_alloc_bytes;
}
last_stat_bytes = bytes;
return available_balance;
}

void ReadLimiter::consumeBytes(Int64 bytes)
void ReadLimiter::consumeBytes([[maybe_unused]] Int64 bytes)
{
metricRequestBytes(type, bytes);
// Do nothing for read.
}

Expand All @@ -338,10 +346,26 @@ bool ReadLimiter::canGrant([[maybe_unused]] Int64 bytes)

void ReadLimiter::refillAndAlloc()
{
if (available_balance < refill_balance_per_period)
// `available_balance` of `ReadLimiter` may be overdrawn.
if (available_balance < 0)
{
// Limiter may not be called for a long time.
// During this time, limiter can be refilled at most `max_refill_times` times and covers some overdraft.
auto elapsed_duration = std::chrono::system_clock::now() - last_refill_time;
UInt64 elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_duration).count();
// At least refill one time.
Int64 max_refill_times = std::max(elapsed_ms, refill_period_ms) / refill_period_ms;
Int64 max_refill_bytes = max_refill_times * refill_balance_per_period;
Int64 can_alloc_bytes = std::min(-available_balance, max_refill_bytes);
alloc_bytes += can_alloc_bytes;
metricAllocBytes(type, can_alloc_bytes);
available_balance = std::min(available_balance + max_refill_bytes, refill_balance_per_period);
}
else
{
available_balance += refill_balance_per_period;
available_balance = refill_balance_per_period;
}
last_refill_time = std::chrono::system_clock::now();

assert(!req_queue.empty());
auto * head_req = req_queue.front();
Expand Down Expand Up @@ -710,26 +734,13 @@ IOLimitTuner::TuneResult IOLimitTuner::tune() const
}

auto [max_read_bytes_per_sec, max_write_bytes_per_sec, rw_tuned] = tuneReadWrite();
LOG_FMT_INFO(
log,
"tuneReadWrite: max_read {} max_write {} rw_tuned {}",
max_read_bytes_per_sec,
max_write_bytes_per_sec,
rw_tuned);
auto [max_bg_read_bytes_per_sec, max_fg_read_bytes_per_sec, read_tuned] = tuneRead(max_read_bytes_per_sec);
LOG_FMT_INFO(
log,
"tuneRead: bg_read {} fg_read {} read_tuned {}",
max_bg_read_bytes_per_sec,
max_fg_read_bytes_per_sec,
read_tuned);
auto [max_bg_write_bytes_per_sec, max_fg_write_bytes_per_sec, write_tuned] = tuneWrite(max_write_bytes_per_sec);
LOG_FMT_INFO(
log,
"tuneWrite: bg_write {} fg_write {} write_tuned {}",
max_bg_write_bytes_per_sec,
max_fg_write_bytes_per_sec,
write_tuned);
if (rw_tuned || read_tuned || write_tuned)
{
LOG_FMT_INFO(log, "tune_msg: bg_write {} => {} fg_write {} => {} bg_read {} => {} fg_read {} => {}", bg_write_stat != nullptr ? bg_write_stat->maxBytesPerSec() : 0, max_bg_write_bytes_per_sec, fg_write_stat != nullptr ? fg_write_stat->maxBytesPerSec() : 0, max_fg_write_bytes_per_sec, bg_read_stat != nullptr ? bg_read_stat->maxBytesPerSec() : 0, max_bg_read_bytes_per_sec, fg_read_stat != nullptr ? fg_read_stat->maxBytesPerSec() : 0, max_fg_read_bytes_per_sec);
}

return {.max_bg_read_bytes_per_sec = max_bg_read_bytes_per_sec,
.max_fg_read_bytes_per_sec = max_fg_read_bytes_per_sec,
.read_tuned = read_tuned || rw_tuned,
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Encryption/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ class WriteLimiter
void request(Int64 bytes);

// just for test purpose
inline UInt64 getTotalBytesThrough() const { return alloc_bytes; }
inline UInt64 getTotalBytesThrough() const
{
return available_balance < 0 ? alloc_bytes - available_balance : alloc_bytes;
}

LimiterStat getStat();

Expand Down Expand Up @@ -153,6 +156,7 @@ class WriteLimiter

Stopwatch stat_stop_watch;
UInt64 alloc_bytes;
LoggerPtr log;
};

using WriteLimiterPtr = std::shared_ptr<WriteLimiter>;
Expand Down Expand Up @@ -194,7 +198,7 @@ class ReadLimiter : public WriteLimiter

std::function<Int64()> get_read_bytes;
Int64 last_stat_bytes;
LoggerPtr log;
std::chrono::time_point<std::chrono::system_clock> last_refill_time;
};

using ReadLimiterPtr = std::shared_ptr<ReadLimiter>;
Expand Down
38 changes: 30 additions & 8 deletions dbms/src/Encryption/tests/gtest_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <random>
#include <thread>

#include "common/types.h"

#ifdef __linux__
#include <sys/syscall.h>
#endif
Expand Down Expand Up @@ -333,23 +331,47 @@ TEST(ReadLimiterTest, LimiterStat)
ASSERT_GT(stat.pct(), 100) << stat.toString();
}

static constexpr UInt64 alloc_bytes = 2047;
static constexpr UInt64 total_bytes = 2047;
for (int i = 0; i < 11; i++)
{
request(read_limiter, 1 << i);
}

std::this_thread::sleep_for(100ms);
ASSERT_EQ(read_limiter.getAvailableBalance(), -947);

stat = read_limiter.getStat();
ASSERT_EQ(stat.alloc_bytes, alloc_bytes);
ASSERT_GE(stat.elapsed_ms, alloc_bytes / 100 + 1);
ASSERT_EQ(stat.alloc_bytes, total_bytes + read_limiter.getAvailableBalance());
ASSERT_GE(stat.elapsed_ms, stat.alloc_bytes / 100 + 1);
ASSERT_EQ(stat.refill_period_ms, 100ul);
ASSERT_EQ(stat.refill_bytes_per_period, 100);
ASSERT_EQ(stat.maxBytesPerSec(), 1000);
ASSERT_EQ(stat.avgBytesPerSec(), static_cast<Int64>(alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString();
ASSERT_EQ(stat.pct(), static_cast<Int64>(alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString();
ASSERT_EQ(stat.avgBytesPerSec(), static_cast<Int64>(stat.alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString();
ASSERT_EQ(stat.pct(), static_cast<Int64>(stat.alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString();
}

TEST(ReadLimiterTest, ReadMany)
{
Int64 real_read_bytes{0};
auto get_read_bytes = [&]() {
return real_read_bytes;
};
auto request = [&](ReadLimiter & limiter, Int64 bytes) {
limiter.request(bytes);
real_read_bytes += bytes;
};

constexpr Int64 bytes_per_sec = 1000;
constexpr UInt64 refill_period_ms = 100;
ReadLimiter read_limiter(get_read_bytes, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms);
ASSERT_EQ(read_limiter.getAvailableBalance(), 100);
request(read_limiter, 1000);
ASSERT_EQ(read_limiter.getAvailableBalance(), -900);
ASSERT_EQ(read_limiter.alloc_bytes, 100);

std::this_thread::sleep_for(1200ms);
Stopwatch sw;
request(read_limiter, 100);
ASSERT_LE(sw.elapsedMilliseconds(), 1); // Not blocked.
}

#ifdef __linux__
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TestUtils/MockReadLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class MockReadLimiter final : public ReadLimiter
protected:
void consumeBytes(Int64 bytes) override
{
// Need soft limit here.
WriteLimiter::consumeBytes(bytes); // NOLINT(bugprone-parent-virtual-call)
alloc_bytes += std::min(available_balance, bytes);
available_balance -= bytes;
}
};

Expand Down

0 comments on commit b232bcd

Please sign in to comment.