Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rocksdb): Adapt rate limiter to prevent bust writes and huge compaction #543

Merged
merged 9 commits into from
Jun 12, 2020
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
@@ -42,12 +42,16 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree)
std::string(name) == chkpt_get_dir_name(decree);
}

std::shared_ptr<rocksdb::RateLimiter> pegasus_server_impl::_s_rate_limiter;
int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through;
std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);

void pegasus_server_impl::parse_checkpoints()
{
@@ -1424,16 +1428,18 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
[this]() { this->update_replica_rocksdb_statistics(); },
_update_rdb_stat_interval);

// Block cache is a singleton on this server shared by all replicas, its metrics update task
// should be scheduled once an interval on the server view.
// These counters are singletons on this server shared by all replicas, their metrics update
// task should be scheduled once an interval on the server view.
static std::once_flag flag;
std::call_once(flag, [&]() {
// The timer task will always running even though there is no replicas
dassert(kServerStatUpdateTimeSec.count() != 0,
"kServerStatUpdateTimeSec shouldn't be zero");
_update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
LPC_REPLICATION_LONG_COMMON,
nullptr, // TODO: the tracker is nullptr, we will fix it later
[this]() { update_server_rocksdb_statistics(); },
_update_rdb_stat_interval);
kServerStatUpdateTimeSec);
});

// initialize cu calculator and write service after server being initialized.
@@ -2107,9 +2113,16 @@ void pegasus_server_impl::update_server_rocksdb_statistics()
if (_s_block_cache) {
uint64_t val = _s_block_cache->GetUsage();
_pfc_rdb_block_cache_mem_usage->set(val);
dinfo_f("_pfc_rdb_block_cache_mem_usage: {} bytes", val);
} else {
dinfo("_pfc_rdb_block_cache_mem_usage: 0 bytes because block cache is disabled");
}

// Update _pfc_rdb_write_limiter_rate_bytes
if (_s_rate_limiter) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough();
uint64_t through_bytes_per_sec =
(current_total_through - _rocksdb_limiter_last_total_through) /
kServerStatUpdateTimeSec.count();
_pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec);
_rocksdb_limiter_last_total_through = current_total_through;
}
}

5 changes: 5 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
#include <rrdb/rrdb_types.h>
#include <rrdb/rrdb.server.h>
#include <gtest/gtest_prod.h>
#include <rocksdb/rate_limiter.h>

#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
@@ -317,6 +318,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
::dsn::error_code flush_all_family_columns(bool wait);

private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
@@ -346,6 +348,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
static std::shared_ptr<rocksdb::RateLimiter> _s_rate_limiter;
static int64_t _rocksdb_limiter_last_total_through;
volatile bool _is_open;
uint32_t _pegasus_data_version;
std::atomic<int64_t> _last_durable_decree;
@@ -392,6 +396,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

// rocksdb internal statistics
// server level
static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage;
// replica level
::dsn::perf_counter_wrapper _pfc_rdb_sst_count;
40 changes: 38 additions & 2 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

#include "pegasus_server_impl.h"

#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>

#include "capacity_unit_calculator.h"
@@ -14,6 +15,18 @@

namespace pegasus {
namespace server {

DSN_DEFINE_int64(
"pegasus.server",
rocksdb_limiter_max_write_megabytes_per_sec,
500,
"max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit");

DSN_DEFINE_bool("pegasus.server",
rocksdb_limiter_auto_tune_enable,
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
false,
"whether to enable write rate auto tune when open rocksdb write limit");
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: dsn::apps::rrdb_service(r),
_db(nullptr),
@@ -251,6 +264,22 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
tbl_opts.block_cache = _s_block_cache;
}

// FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit.
// For more detail arguments see
// https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137
if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) {
static std::once_flag flag;
std::call_once(flag, [&]() {
_s_rate_limiter = std::shared_ptr<rocksdb::RateLimiter>(rocksdb::NewGenericRateLimiter(
FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20,
100000,
10,
rocksdb::RateLimiter::Mode::kWritesOnly,
FLAGS_rocksdb_limiter_auto_tune_enable));
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
});
_db_opts.rate_limiter = _s_rate_limiter;
}

// Bloom filter configurations.
bool disable_bloom_filter = dsn_config_get_value_bool(
"pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
@@ -400,8 +429,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
COUNTER_TYPE_NUMBER,
"statistic the total count of rocksdb block cache");

// Block cache is a singleton on this server shared by all replicas, so we initialize
// `_pfc_rdb_block_cache_mem_usage` only once.
// These counters are singletons on this server shared by all replicas, so we initialize
// them only once.
static std::once_flag flag;
std::call_once(flag, [&]() {
_pfc_rdb_block_cache_mem_usage.init_global_counter(
@@ -410,6 +439,13 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rdb.block_cache.memory_usage",
COUNTER_TYPE_NUMBER,
"statistic the memory usage of rocksdb block cache");

_pfc_rdb_write_limiter_rate_bytes.init_global_counter(
"replica",
"app.pegasus",
"rdb.write_limiter_rate_bytes",
COUNTER_TYPE_NUMBER,
"statistic the through bytes of rocksdb write rate limiter");
});

snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());