Skip to content

Commit

Permalink
feat(rocksdb): Adapt rate limiter to prevent bust writes and huge com…
Browse files Browse the repository at this point in the history
…paction (#543)
foreverneverer authored Jun 12, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 764ee4f commit 4a8a779
Showing 4 changed files with 64 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/server/config.ini
Original file line number Diff line number Diff line change
@@ -311,6 +311,8 @@
rocksdb_multi_get_max_iteration_size = 31457280
rocksdb_max_iteration_count = 1000
rocksdb_iteration_threshold_time_ms = 30000
rocksdb_limiter_max_write_megabytes_per_sec = 500
rocksdb_limiter_enable_auto_tune = false

checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800
25 changes: 19 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
@@ -42,12 +42,16 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree)
std::string(name) == chkpt_get_dir_name(decree);
}

std::shared_ptr<rocksdb::RateLimiter> pegasus_server_impl::_s_rate_limiter;
int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through;
std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);

void pegasus_server_impl::parse_checkpoints()
{
@@ -1424,16 +1428,18 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
[this]() { this->update_replica_rocksdb_statistics(); },
_update_rdb_stat_interval);

// Block cache is a singleton on this server shared by all replicas, its metrics update task
// should be scheduled once an interval on the server view.
// These counters are singletons on this server shared by all replicas, their metrics update
// task should be scheduled once an interval on the server view.
static std::once_flag flag;
std::call_once(flag, [&]() {
// The timer task will always running even though there is no replicas
dassert(kServerStatUpdateTimeSec.count() != 0,
"kServerStatUpdateTimeSec shouldn't be zero");
_update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
LPC_REPLICATION_LONG_COMMON,
nullptr, // TODO: the tracker is nullptr, we will fix it later
[this]() { update_server_rocksdb_statistics(); },
_update_rdb_stat_interval);
kServerStatUpdateTimeSec);
});

// initialize cu calculator and write service after server being initialized.
@@ -2107,9 +2113,16 @@ void pegasus_server_impl::update_server_rocksdb_statistics()
if (_s_block_cache) {
uint64_t val = _s_block_cache->GetUsage();
_pfc_rdb_block_cache_mem_usage->set(val);
dinfo_f("_pfc_rdb_block_cache_mem_usage: {} bytes", val);
} else {
dinfo("_pfc_rdb_block_cache_mem_usage: 0 bytes because block cache is disabled");
}

// Update _pfc_rdb_write_limiter_rate_bytes
if (_s_rate_limiter) {
uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough();
uint64_t through_bytes_per_sec =
(current_total_through - _rocksdb_limiter_last_total_through) /
kServerStatUpdateTimeSec.count();
_pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec);
_rocksdb_limiter_last_total_through = current_total_through;
}
}

5 changes: 5 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
#include <dsn/dist/replication/replication.codes.h>
#include <rrdb/rrdb_types.h>
#include <gtest/gtest_prod.h>
#include <rocksdb/rate_limiter.h>

#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
@@ -317,6 +318,7 @@ class pegasus_server_impl : public pegasus_read_service
::dsn::error_code flush_all_family_columns(bool wait);

private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
@@ -346,6 +348,8 @@ class pegasus_server_impl : public pegasus_read_service
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
static std::shared_ptr<rocksdb::RateLimiter> _s_rate_limiter;
static int64_t _rocksdb_limiter_last_total_through;
volatile bool _is_open;
uint32_t _pegasus_data_version;
std::atomic<int64_t> _last_durable_decree;
@@ -392,6 +396,7 @@ class pegasus_server_impl : public pegasus_read_service

// rocksdb internal statistics
// server level
static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes;
static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage;
// replica level
::dsn::perf_counter_wrapper _pfc_rdb_sst_count;
40 changes: 38 additions & 2 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

#include "pegasus_server_impl.h"

#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>

#include "capacity_unit_calculator.h"
@@ -14,6 +15,18 @@

namespace pegasus {
namespace server {

DSN_DEFINE_int64(
"pegasus.server",
rocksdb_limiter_max_write_megabytes_per_sec,
500,
"max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit");

DSN_DEFINE_bool("pegasus.server",
rocksdb_limiter_enable_auto_tune,
false,
"whether to enable write rate auto tune when open rocksdb write limit");

pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: pegasus_read_service(r),
_db(nullptr),
@@ -251,6 +264,22 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
tbl_opts.block_cache = _s_block_cache;
}

// FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit.
// For more detail arguments see
// https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137
if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) {
static std::once_flag flag;
std::call_once(flag, [&]() {
_s_rate_limiter = std::shared_ptr<rocksdb::RateLimiter>(rocksdb::NewGenericRateLimiter(
FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20,
100 * 1000, // refill_period_us
10, // fairness
rocksdb::RateLimiter::Mode::kWritesOnly,
FLAGS_rocksdb_limiter_enable_auto_tune));
});
_db_opts.rate_limiter = _s_rate_limiter;
}

// Bloom filter configurations.
bool disable_bloom_filter = dsn_config_get_value_bool(
"pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
@@ -400,8 +429,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
COUNTER_TYPE_NUMBER,
"statistic the total count of rocksdb block cache");

// Block cache is a singleton on this server shared by all replicas, so we initialize
// `_pfc_rdb_block_cache_mem_usage` only once.
// These counters are singletons on this server shared by all replicas, so we initialize
// them only once.
static std::once_flag flag;
std::call_once(flag, [&]() {
_pfc_rdb_block_cache_mem_usage.init_global_counter(
@@ -410,6 +439,13 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rdb.block_cache.memory_usage",
COUNTER_TYPE_NUMBER,
"statistic the memory usage of rocksdb block cache");

_pfc_rdb_write_limiter_rate_bytes.init_global_counter(
"replica",
"app.pegasus",
"rdb.write_limiter_rate_bytes",
COUNTER_TYPE_NUMBER,
"statistic the through bytes of rocksdb write rate limiter");
});

snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());

0 comments on commit 4a8a779

Please sign in to comment.