From 4a8a779defa907ba3fdec42ad57e057f282b587c Mon Sep 17 00:00:00 2001 From: Shuo Date: Fri, 12 Jun 2020 19:02:25 +0800 Subject: [PATCH] feat(rocksdb): Adapt rate limiter to prevent bust writes and huge compaction (#543) --- src/server/config.ini | 2 ++ src/server/pegasus_server_impl.cpp | 25 ++++++++++++---- src/server/pegasus_server_impl.h | 5 ++++ src/server/pegasus_server_impl_init.cpp | 40 +++++++++++++++++++++++-- 4 files changed, 64 insertions(+), 8 deletions(-) diff --git a/src/server/config.ini b/src/server/config.ini index 04f026f726..6bf29198c7 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -311,6 +311,8 @@ rocksdb_multi_get_max_iteration_size = 31457280 rocksdb_max_iteration_count = 1000 rocksdb_iteration_threshold_time_ms = 30000 + rocksdb_limiter_max_write_megabytes_per_sec = 500 + rocksdb_limiter_enable_auto_tune = false checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 8f74e4425b..7a6fd4ec59 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -42,12 +42,16 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree) std::string(name) == chkpt_get_dir_name(decree); } +std::shared_ptr pegasus_server_impl::_s_rate_limiter; +int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through; std::shared_ptr pegasus_server_impl::_s_block_cache; ::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat; ::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage; +::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes; const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:"; const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default"; const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf"; +const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10); void pegasus_server_impl::parse_checkpoints() { @@ -1424,16 +1428,18 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) [this]() { this->update_replica_rocksdb_statistics(); }, _update_rdb_stat_interval); - // Block cache is a singleton on this server shared by all replicas, its metrics update task - // should be scheduled once an interval on the server view. + // These counters are singletons on this server shared by all replicas, their metrics update + // task should be scheduled once an interval on the server view. static std::once_flag flag; std::call_once(flag, [&]() { // The timer task will always running even though there is no replicas + dassert(kServerStatUpdateTimeSec.count() != 0, + "kServerStatUpdateTimeSec shouldn't be zero"); _update_server_rdb_stat = ::dsn::tasking::enqueue_timer( LPC_REPLICATION_LONG_COMMON, nullptr, // TODO: the tracker is nullptr, we will fix it later [this]() { update_server_rocksdb_statistics(); }, - _update_rdb_stat_interval); + kServerStatUpdateTimeSec); }); // initialize cu calculator and write service after server being initialized. @@ -2107,9 +2113,16 @@ void pegasus_server_impl::update_server_rocksdb_statistics() if (_s_block_cache) { uint64_t val = _s_block_cache->GetUsage(); _pfc_rdb_block_cache_mem_usage->set(val); - dinfo_f("_pfc_rdb_block_cache_mem_usage: {} bytes", val); - } else { - dinfo("_pfc_rdb_block_cache_mem_usage: 0 bytes because block cache is disabled"); + } + + // Update _pfc_rdb_write_limiter_rate_bytes + if (_s_rate_limiter) { + uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough(); + uint64_t through_bytes_per_sec = + (current_total_through - _rocksdb_limiter_last_total_through) / + kServerStatUpdateTimeSec.count(); + _pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec); + _rocksdb_limiter_last_total_through = current_total_through; } } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index d23a7e896d..76aa5ab546 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "key_ttl_compaction_filter.h" #include "pegasus_scan_context.h" @@ -317,6 +318,7 @@ class pegasus_server_impl : public pegasus_read_service ::dsn::error_code flush_all_family_columns(bool wait); private: + static const std::chrono::seconds kServerStatUpdateTimeSec; static const std::string COMPRESSION_HEADER; // Column family names. static const std::string DATA_COLUMN_FAMILY_NAME; @@ -346,6 +348,8 @@ class pegasus_server_impl : public pegasus_read_service rocksdb::ColumnFamilyHandle *_data_cf; rocksdb::ColumnFamilyHandle *_meta_cf; static std::shared_ptr _s_block_cache; + static std::shared_ptr _s_rate_limiter; + static int64_t _rocksdb_limiter_last_total_through; volatile bool _is_open; uint32_t _pegasus_data_version; std::atomic _last_durable_decree; @@ -392,6 +396,7 @@ class pegasus_server_impl : public pegasus_read_service // rocksdb internal statistics // server level + static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes; static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage; // replica level ::dsn::perf_counter_wrapper _pfc_rdb_sst_count; diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index 8a323949f9..a858707d32 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -4,6 +4,7 @@ #include "pegasus_server_impl.h" +#include #include #include "capacity_unit_calculator.h" @@ -14,6 +15,18 @@ namespace pegasus { namespace server { + +DSN_DEFINE_int64( + "pegasus.server", + rocksdb_limiter_max_write_megabytes_per_sec, + 500, + "max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit"); + +DSN_DEFINE_bool("pegasus.server", + rocksdb_limiter_enable_auto_tune, + false, + "whether to enable write rate auto tune when open rocksdb write limit"); + pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) : pegasus_read_service(r), _db(nullptr), @@ -251,6 +264,22 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) tbl_opts.block_cache = _s_block_cache; } + // FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit. + // For more detail arguments see + // https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137 + if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) { + static std::once_flag flag; + std::call_once(flag, [&]() { + _s_rate_limiter = std::shared_ptr(rocksdb::NewGenericRateLimiter( + FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20, + 100 * 1000, // refill_period_us + 10, // fairness + rocksdb::RateLimiter::Mode::kWritesOnly, + FLAGS_rocksdb_limiter_enable_auto_tune)); + }); + _db_opts.rate_limiter = _s_rate_limiter; + } + // Bloom filter configurations. bool disable_bloom_filter = dsn_config_get_value_bool( "pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter"); @@ -400,8 +429,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) COUNTER_TYPE_NUMBER, "statistic the total count of rocksdb block cache"); - // Block cache is a singleton on this server shared by all replicas, so we initialize - // `_pfc_rdb_block_cache_mem_usage` only once. + // These counters are singletons on this server shared by all replicas, so we initialize + // them only once. static std::once_flag flag; std::call_once(flag, [&]() { _pfc_rdb_block_cache_mem_usage.init_global_counter( @@ -410,6 +439,13 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) "rdb.block_cache.memory_usage", COUNTER_TYPE_NUMBER, "statistic the memory usage of rocksdb block cache"); + + _pfc_rdb_write_limiter_rate_bytes.init_global_counter( + "replica", + "app.pegasus", + "rdb.write_limiter_rate_bytes", + COUNTER_TYPE_NUMBER, + "statistic the through bytes of rocksdb write rate limiter"); }); snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());