Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rocksdb): Adapt rate limiter to prevent bust writes and huge compaction #543

Merged
merged 9 commits into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@
rocksdb_multi_get_max_iteration_size = 31457280
rocksdb_max_iteration_count = 1000
rocksdb_iteration_threshold_time_ms = 30000
rocksdb_limiter_max_write_megabytes_per_sec = 500
rocksdb_limiter_enable_auto_tune = false

checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800
Expand Down
25 changes: 19 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree)
std::string(name) == chkpt_get_dir_name(decree);
}

std::shared_ptr<rocksdb::RateLimiter> pegasus_server_impl::_s_rate_limiter;
int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through;
std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);

void pegasus_server_impl::parse_checkpoints()
{
Expand Down Expand Up @@ -1424,16 +1428,18 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
[this]() { this->update_replica_rocksdb_statistics(); },
_update_rdb_stat_interval);

// Block cache is a singleton on this server shared by all replicas, its metrics update task
// should be scheduled once an interval on the server view.
// These counters are singletons on this server shared by all replicas, their metrics update
// task should be scheduled once an interval on the server view.
static std::once_flag flag;
std::call_once(flag, [&]() {
// The timer task will always running even though there is no replicas
dassert(kServerStatUpdateTimeSec.count() != 0,
"kServerStatUpdateTimeSec shouldn't be zero");
_update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
LPC_REPLICATION_LONG_COMMON,
nullptr, // TODO: the tracker is nullptr, we will fix it later
[this]() { update_server_rocksdb_statistics(); },
_update_rdb_stat_interval);
kServerStatUpdateTimeSec);
});

// initialize cu calculator and write service after server being initialized.
Expand Down Expand Up @@ -2107,9 +2113,16 @@ void pegasus_server_impl::update_server_rocksdb_statistics()
if (_s_block_cache) {
uint64_t val = _s_block_cache->GetUsage();
_pfc_rdb_block_cache_mem_usage->set(val);
dinfo_f("_pfc_rdb_block_cache_mem_usage: {} bytes", val);
} else {
dinfo("_pfc_rdb_block_cache_mem_usage: 0 bytes because block cache is disabled");
}

// Update _pfc_rdb_write_limiter_rate_bytes
if (_s_rate_limiter) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough();
uint64_t through_bytes_per_sec =
(current_total_through - _rocksdb_limiter_last_total_through) /
kServerStatUpdateTimeSec.count();
_pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec);
_rocksdb_limiter_last_total_through = current_total_through;
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <dsn/dist/replication/replication.codes.h>
#include <rrdb/rrdb_types.h>
#include <gtest/gtest_prod.h>
#include <rocksdb/rate_limiter.h>

#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
Expand Down Expand Up @@ -317,6 +318,7 @@ class pegasus_server_impl : public pegasus_read_service
::dsn::error_code flush_all_family_columns(bool wait);

private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
Expand Down Expand Up @@ -346,6 +348,8 @@ class pegasus_server_impl : public pegasus_read_service
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
static std::shared_ptr<rocksdb::RateLimiter> _s_rate_limiter;
static int64_t _rocksdb_limiter_last_total_through;
volatile bool _is_open;
uint32_t _pegasus_data_version;
std::atomic<int64_t> _last_durable_decree;
Expand Down Expand Up @@ -392,6 +396,7 @@ class pegasus_server_impl : public pegasus_read_service

// rocksdb internal statistics
// server level
static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage;
// replica level
::dsn::perf_counter_wrapper _pfc_rdb_sst_count;
Expand Down
40 changes: 38 additions & 2 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "pegasus_server_impl.h"

#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>

#include "capacity_unit_calculator.h"
Expand All @@ -14,6 +15,18 @@

namespace pegasus {
namespace server {

DSN_DEFINE_int64(
"pegasus.server",
rocksdb_limiter_max_write_megabytes_per_sec,
500,
"max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit");

DSN_DEFINE_bool("pegasus.server",
rocksdb_limiter_enable_auto_tune,
false,
"whether to enable write rate auto tune when open rocksdb write limit");
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: pegasus_read_service(r),
_db(nullptr),
Expand Down Expand Up @@ -251,6 +264,22 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
tbl_opts.block_cache = _s_block_cache;
}

// FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit.
// For more detail arguments see
// https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137
if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) {
static std::once_flag flag;
std::call_once(flag, [&]() {
_s_rate_limiter = std::shared_ptr<rocksdb::RateLimiter>(rocksdb::NewGenericRateLimiter(
FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20,
100 * 1000, // refill_period_us
10, // fairness
rocksdb::RateLimiter::Mode::kWritesOnly,
FLAGS_rocksdb_limiter_enable_auto_tune));
});
_db_opts.rate_limiter = _s_rate_limiter;
}

// Bloom filter configurations.
bool disable_bloom_filter = dsn_config_get_value_bool(
"pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
Expand Down Expand Up @@ -400,8 +429,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
COUNTER_TYPE_NUMBER,
"statistic the total count of rocksdb block cache");

// Block cache is a singleton on this server shared by all replicas, so we initialize
// `_pfc_rdb_block_cache_mem_usage` only once.
// These counters are singletons on this server shared by all replicas, so we initialize
// them only once.
static std::once_flag flag;
std::call_once(flag, [&]() {
_pfc_rdb_block_cache_mem_usage.init_global_counter(
Expand All @@ -410,6 +439,13 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rdb.block_cache.memory_usage",
COUNTER_TYPE_NUMBER,
"statistic the memory usage of rocksdb block cache");

_pfc_rdb_write_limiter_rate_bytes.init_global_counter(
"replica",
"app.pegasus",
"rdb.write_limiter_rate_bytes",
COUNTER_TYPE_NUMBER,
"statistic the through bytes of rocksdb write rate limiter");
});

snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());
Expand Down