Skip to content

Commit

Permalink
update by review
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong committed Apr 3, 2020
1 parent 4cc7e7c commit 5b8f852
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
66 changes: 37 additions & 29 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
#include "meta_store.h"
#include "range_read_limiter.h"

using namespace dsn::literals::chrono_literals;

Expand Down Expand Up @@ -96,35 +95,37 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
1000,
"multi-get operation iterate count exceed this threshold will be logged, 0 means no check");

_multi_get_max_iteration_count = (uint32_t)dsn_config_get_value_uint64(
_rng_rd_opts.multi_get_max_iteration_count = (uint32_t)dsn_config_get_value_uint64(
"pegasus.server",
"rocksdb_multi_get_max_iteration_count",
3000,
"max iteration count for each rocksdb iterator operation for multi-get operation, if "
"max iteration count for each range read for multi-get operation, if "
"exceed this threshold,"
"iterator will be stopped");

_multi_get_max_iteration_size =
_rng_rd_opts.multi_get_max_iteration_size =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_multi_get_max_iteration_size",
30 << 20,
"multi-get operation total key-value size exceed "
"this threshold will stop iterating rocksdb, 0 means no check");

_rocksdb_max_iteration_count = (uint32_t)dsn_config_get_value_uint64(
"pegasus.server",
"rocksdb_max_iteration_count",
1000,
"max iteration count for each rocksdb iterator operation, if exceed this threshold,"
"iterator will be stopped");
_rng_rd_opts.rocksdb_max_iteration_count =
(uint32_t)dsn_config_get_value_uint64("pegasus.server",
"rocksdb_max_iteration_count",
1000,
"max iteration count for each range "
"read, if exceed this threshold, "
"iterator will be stopped");

_rocksdb_iteration_threshold_time_ms_in_config = dsn_config_get_value_uint64(
_rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config = dsn_config_get_value_uint64(
"pegasus.server",
"rocksdb_iteration_threshold_time_ms",
30000,
"max duration for handling one pegasus scan request(sortkey_count/multiget/scan) if exceed "
"this threshold, iterator will be stopped, 0 means no check");
_rocksdb_iteration_threshold_time_ms = _rocksdb_iteration_threshold_time_ms_in_config;
_rng_rd_opts.rocksdb_iteration_threshold_time_ms =
_rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config;

// init rocksdb::DBOptions
_db_opts.pegasus_data = true;
Expand Down Expand Up @@ -722,11 +723,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
}

uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
uint32_t max_iteration_count = std::min(max_kv_count, _multi_get_max_iteration_count);
uint32_t max_iteration_count =
std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count);

int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
int32_t max_iteration_size_config =
_multi_get_max_iteration_size > 0 ? _multi_get_max_iteration_size : INT_MAX;
int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0
? _rng_rd_opts.multi_get_max_iteration_size
: INT_MAX;
int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config);

uint32_t epoch_now = ::pegasus::utils::epoch_now();
Expand Down Expand Up @@ -808,8 +811,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
std::unique_ptr<rocksdb::Iterator> it;
bool complete = false;

std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
max_iteration_count, max_iteration_size, _rocksdb_iteration_threshold_time_ms);
std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(max_iteration_count,
max_iteration_size,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms);

if (!request.reverse) {
it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
Expand Down Expand Up @@ -973,7 +978,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
limiter->max_duration_time());
}
}
} else {
} else { // condition: !request.sort_keys.empty()
bool error_occurred = false;
rocksdb::Status final_status;
bool exceed_limit = false;
Expand Down Expand Up @@ -1132,8 +1137,10 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key,
uint32_t epoch_now = ::pegasus::utils::epoch_now();
uint64_t expire_count = 0;

std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
_rocksdb_max_iteration_count, 0, _rocksdb_iteration_threshold_time_ms);
std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
0,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms);

while (it->Valid()) {
limiter->add_count();
Expand Down Expand Up @@ -1356,11 +1363,11 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
int32_t count = 0;

uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX;
uint32_t batch_count = std::min(request_batch_size, _rocksdb_max_iteration_count);
uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
resp.kvs.reserve(batch_count);

std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(batch_count, 0, _rocksdb_iteration_threshold_time_ms);
std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

while (limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
Expand Down Expand Up @@ -1518,10 +1525,11 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
int32_t count = 0;

uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX;
uint32_t batch_count = std::min(context_batch_size, _rocksdb_max_iteration_count);
uint32_t batch_count =
std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);

std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rocksdb_iteration_threshold_time_ms);
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

while (limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
Expand Down Expand Up @@ -2648,7 +2656,7 @@ void pegasus_server_impl::update_slow_query_threshold(
void pegasus_server_impl::update_rocksdb_iteration_threshold(
const std::map<std::string, std::string> &envs)
{
uint64_t threshold_ms = _rocksdb_iteration_threshold_time_ms_in_config;
uint64_t threshold_ms = _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config;
auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD_TIME_MS);
if (find != envs.end()) {
// the unit of iteration threshold from env is ms
Expand All @@ -2658,12 +2666,12 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold(
}
}

if (_rocksdb_iteration_threshold_time_ms != threshold_ms) {
if (_rng_rd_opts.rocksdb_iteration_threshold_time_ms != threshold_ms) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKSDB_ITERATION_THRESHOLD_TIME_MS,
_rocksdb_iteration_threshold_time_ms,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms,
threshold_ms);
_rocksdb_iteration_threshold_time_ms = threshold_ms;
_rng_rd_opts.rocksdb_iteration_threshold_time_ms = threshold_ms;
}
}

Expand Down
10 changes: 3 additions & 7 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
#include "pegasus_scan_context.h"
#include "pegasus_manual_compact_service.h"
#include "pegasus_write_service.h"
#include "range_read_limiter.h"

namespace pegasus {
namespace server {

class meta_store;
class capacity_unit_calculator;
class pegasus_server_write;
class range_read_limiter_options;

class pegasus_server_impl : public ::dsn::apps::rrdb_service
{
Expand Down Expand Up @@ -322,12 +322,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
// slow query time threshold. exceed this threshold will be logged.
uint64_t _slow_query_threshold_ns;
uint64_t _slow_query_threshold_ns_in_config;
// abnormal multi_get/rocksdb_iteration
uint32_t _multi_get_max_iteration_count;
uint64_t _multi_get_max_iteration_size;
uint32_t _rocksdb_max_iteration_count;
uint64_t _rocksdb_iteration_threshold_time_ms_in_config;
uint64_t _rocksdb_iteration_threshold_time_ms;

range_read_limiter_options _rng_rd_opts;

std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
Expand Down
12 changes: 11 additions & 1 deletion src/server/range_read_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@
#pragma once

#include <dsn/dist/replication/replication.codes.h>
#include "pegasus_server_impl.h"

namespace pegasus {
namespace server {

class pegasus_server_impl;

struct range_read_limiter_options
{
uint32_t multi_get_max_iteration_count;
uint64_t multi_get_max_iteration_size;
uint32_t rocksdb_max_iteration_count;
uint64_t rocksdb_iteration_threshold_time_ms_in_config;
uint64_t rocksdb_iteration_threshold_time_ms;
};

class range_read_limiter
{
public:
Expand Down

0 comments on commit 5b8f852

Please sign in to comment.