Skip to content

Commit

Permalink
Merge branch 'pack' of github.com:neverchanje/pegasus into pack
Browse files Browse the repository at this point in the history
  • Loading branch information
neverchanje committed Jan 26, 2021
2 parents 39ccf47 + 7a744f8 commit e6fdf2d
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 257 deletions.
5 changes: 4 additions & 1 deletion src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,21 @@ DSN_DEFINE_uint32(
hot_bucket_variance_threshold,
7,
"the variance threshold to detect hot bucket during coarse analysis of hotkey detection");
DSN_TAG_VARIABLE(hot_bucket_variance_threshold, FT_MUTABLE);

DSN_DEFINE_uint32(
"pegasus.server",
hot_key_variance_threshold,
5,
"the variance threshold to detect hot key during fine analysis of hotkey detection");
DSN_TAG_VARIABLE(hot_key_variance_threshold, FT_MUTABLE);

DSN_DEFINE_uint32("pegasus.server",
hotkey_buckets_num,
37,
"the number of data capture hash buckets");

DSN_DEFINE_validator(hotkey_buckets_num, [](int32_t bucket_num) -> bool {
DSN_DEFINE_validator(hotkey_buckets_num, [](uint32_t bucket_num) -> bool {
if (bucket_num < 3) {
return false;
}
Expand All @@ -63,6 +65,7 @@ DSN_DEFINE_uint32(
max_seconds_to_detect_hotkey,
150,
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
DSN_TAG_VARIABLE(max_seconds_to_detect_hotkey, FT_MUTABLE);

// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
/*extern*/ bool
Expand Down
27 changes: 16 additions & 11 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ DSN_DEFINE_bool("pegasus.collector",
enable_detect_hotkey,
false,
"auto detect hot key in the hot paritition");
DSN_TAG_VARIABLE(enable_detect_hotkey, FT_MUTABLE);

DSN_DEFINE_int32("pegasus.collector",
hot_partition_threshold,
3,
"threshold of hotspot partition value, if app.stat.hotspots >= "
"FLAGS_hotpartition_threshold, this partition is a hot partition");
DSN_DEFINE_uint32("pegasus.collector",
hot_partition_threshold,
3,
"threshold of hotspot partition value, if app.stat.hotspots >= "
"FLAGS_hotpartition_threshold, this partition is a hot partition");
DSN_TAG_VARIABLE(hot_partition_threshold, FT_MUTABLE);

DSN_DEFINE_int32("pegasus.collector",
occurrence_threshold,
3,
"hot paritiotion occurrence times' threshold to send rpc to detect hotkey");
DSN_DEFINE_uint32("pegasus.collector",
occurrence_threshold,
3,
"hot paritiotion occurrence times' threshold to send rpc to detect hotkey");
DSN_TAG_VARIABLE(occurrence_threshold, FT_MUTABLE);

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
Expand Down Expand Up @@ -149,9 +152,11 @@ void hotspot_partition_calculator::data_analyse()

void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
{
auto now_hot_partition_threshold = FLAGS_hot_partition_threshold;
auto now_occurrence_threshold = FLAGS_occurrence_threshold;
for (int index = 0; index < _hot_points.size(); index++) {
if (_hot_points[index][data_type].get()->get_value() >= FLAGS_hot_partition_threshold) {
if (++_hotpartition_counter[index][data_type] >= FLAGS_occurrence_threshold) {
if (_hot_points[index][data_type].get()->get_value() >= now_hot_partition_threshold) {
if (++_hotpartition_counter[index][data_type] >= now_occurrence_threshold) {
derror_f("Find a {} hot partition {}.{}",
(data_type == partition_qps_type::READ_HOTSPOT_DATA ? "read" : "write"),
_app_name,
Expand Down
90 changes: 48 additions & 42 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
: replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log)
{
init_non_batch_write_handlers();
}

int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
Expand All @@ -50,43 +51,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}

dsn::task_code rpc_code(requests[0]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_put_rpc::auto_reply(requests[0]);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
return iter->second(requests[0]);
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_remove_rpc::auto_reply(requests[0]);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
dassert(count == 1, "count = %d", count);
auto rpc = incr_rpc::auto_reply(requests[0]);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
dassert(count == 1, "count = %d", count);
auto rpc = duplicate_rpc::auto_reply(requests[0]);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_set_rpc::auto_reply(requests[0]);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_mutate_rpc::auto_reply(requests[0]);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
dassert(count == 1, "count = %d", count);
auto rpc = ingestion_rpc::auto_reply(requests[0]);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}

return on_batched_writes(requests, count);
}

Expand Down Expand Up @@ -116,13 +85,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
dfatal("rpc code not allow batch: %s", rpc_code.to_string());
if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
} else {
dfatal("rpc code not handled: %s", rpc_code.to_string());
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
}
}

Expand Down Expand Up @@ -170,5 +136,45 @@ void pegasus_server_write::request_key_check(int64_t decree,
}
}

void pegasus_server_write::init_non_batch_write_handlers()
{
_non_batch_write_handlers = {
{dsn::apps::RPC_RRDB_RRDB_MULTI_PUT,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_put_rpc::auto_reply(request);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_remove_rpc::auto_reply(request);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_INCR,
[this](dsn::message_ex *request) -> int {
auto rpc = incr_rpc::auto_reply(request);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
[this](dsn::message_ex *request) -> int {
auto rpc = duplicate_rpc::auto_reply(request);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_set_rpc::auto_reply(request);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_mutate_rpc::auto_reply(request);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
[this](dsn::message_ex *request) -> int {
auto rpc = ingestion_rpc::auto_reply(request);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}},
};
}
} // namespace server
} // namespace pegasus
5 changes: 5 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class pegasus_server_write : public dsn::replication::replica_base
void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key);

private:
void init_non_batch_write_handlers();

friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
Expand All @@ -84,6 +86,9 @@ class pegasus_server_write : public dsn::replication::replica_base
int64_t _decree;

const bool _verbose_log;

typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;
};

} // namespace server
Expand Down
Loading

0 comments on commit e6fdf2d

Please sign in to comment.