Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyanzhao committed Oct 22, 2020
2 parents 21d3ab1 + e17d5c7 commit 6cf069c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
40 changes: 33 additions & 7 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <boost/functional/hash.hpp>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>

namespace pegasus {
namespace server {
Expand Down Expand Up @@ -96,13 +97,19 @@ static int get_bucket_id(dsn::string_view data)
size_t hash_value = boost::hash_range(data.begin(), data.end());
return static_cast<int>(hash_value % FLAGS_data_capture_hash_bucket_num);
}
DSN_DEFINE_int32(
"pegasus.server",
max_seconds_to_detect_hotkey,
150,
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");

hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>(this))
_internal_collector(std::make_shared<hotkey_empty_data_collector>(this)),
_collector_start_time_second(0)
{
}

Expand Down Expand Up @@ -141,10 +148,12 @@ void hotkey_collector::analyse_data()
{
switch (_state.load()) {
case hotkey_collector_state::COARSE_DETECTING:
_internal_collector->analyse_data(_result);
if (_result.coarse_bucket_index != -1) {
// TODO: (Tangyanzhao) reset _internal_collector to hotkey_fine_data_collector
_state.store(hotkey_collector_state::FINE_DETECTING);
if (!terminate_if_timeout()) {
_internal_collector->analyse_data(_result);
if (_result.coarse_bucket_index != -1) {
// TODO: (Tangyanzhao) reset _internal_collector to hotkey_fine_data_collector
_state.store(hotkey_collector_state::FINE_DETECTING);
}
}
return;
default:
Expand Down Expand Up @@ -173,6 +182,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
dwarn_replica(hint);
return;
case hotkey_collector_state::STOPPED:
_collector_start_time_second = dsn_now_s();
_internal_collector.reset(new hotkey_coarse_data_collector(this));
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
Expand All @@ -190,14 +200,30 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response

void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
terminate();
resp.err = dsn::ERR_OK;
std::string hint =
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
}

void hotkey_collector::terminate()
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
_collector_start_time_second = 0;
}

bool hotkey_collector::terminate_if_timeout()
{
if (dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey) {
ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found");
terminate();
return true;
}
return false;
}

hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
: internal_collector_base(base), _hash_buckets(FLAGS_data_capture_hash_bucket_num)
{
Expand Down
3 changes: 3 additions & 0 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,14 @@ class hotkey_collector : public dsn::replication::replica_base
private:
void on_start_detect(dsn::replication::detect_hotkey_response &resp);
void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
void terminate();
bool terminate_if_timeout();

detect_hotkey_result _result;
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
uint64_t _collector_start_time_second;
};

class internal_collector_base : public dsn::replication::replica_base
Expand Down

0 comments on commit 6cf069c

Please sign in to comment.