diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp index aea5a335d8..cc0142bdf6 100644 --- a/src/server/hotspot_partition_calculator.cpp +++ b/src/server/hotspot_partition_calculator.cpp @@ -17,16 +17,13 @@ #include "hotspot_partition_calculator.h" -#include #include #include +#include #include -#include -#include #include -#include -#include #include +#include namespace pegasus { namespace server { @@ -180,44 +177,39 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type) const dsn::replication::detect_action::type action) { FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {}); - auto request = dsn::make_unique(); - request->type = hotkey_type; - request->action = action; - ddebug_f("{} {} hotkey detection in {}.{}", + + int app_id; + int partition_count; + std::vector partitions; + _shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions); + + auto target_address = partitions[partition_index].primary; + dsn::replication::detect_hotkey_response resp; + dsn::replication::detect_hotkey_request req; + req.type = hotkey_type; + req.action = action; + auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp); + + ddebug_f("{} {} hotkey detection in {}.{}, server address: {}", (action == dsn::replication::detect_action::STOP) ? "Stop" : "Start", (hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read", app_name, - partition_index); - dsn::rpc_address meta_server; - meta_server.assign_group("meta-servers"); - std::vector meta_servers; - replica_helper::load_meta_servers(meta_servers); - for (const auto &address : meta_servers) { - meta_server.group_address()->add(address); + partition_index, + target_address.to_string()); + + if (error != dsn::ERR_OK) { + derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}", + app_name, + partition_index, + error.to_string()); } - auto cluster_name = dsn::replication::get_current_cluster_name(); - // TODO: (Tangyanzhao) refactor partition_resolver to replication_ddl_client - auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str()); - dsn::task_tracker tracker; - detect_hotkey_rpc rpc( - std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index); - rpc.call(resolver, - &tracker, - [app_name, partition_index](dsn::error_code error) { - if (error != dsn::ERR_OK) { - derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}", - app_name, - partition_index, - error.to_string()); - } - }) - ->wait(); - if (rpc.response().err != dsn::ERR_OK) { - derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}", + + if (resp.err != dsn::ERR_OK) { + derror_f("Hotkey detect rpc executing failed, in {}.{}, error_hint:{} {}", app_name, partition_index, - rpc.response().err, - rpc.response().err_hint); + resp.err, + resp.err_hint); } } diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h index 2cbc62bcaf..ddbbb15026 100644 --- a/src/server/hotspot_partition_calculator.h +++ b/src/server/hotspot_partition_calculator.h @@ -37,7 +37,9 @@ typedef std::vector> hot_partition_coun class hotspot_partition_calculator { public: - hotspot_partition_calculator(const std::string &app_name, int partition_count) + hotspot_partition_calculator(const std::string &app_name, + int partition_count, + std::shared_ptr context) : _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count) { init_perf_counter(partition_count); @@ -46,10 +48,10 @@ class hotspot_partition_calculator void data_aggregate(const std::vector &partitions); // analyse the saved data to find hotspot partition void data_analyse(); - static void send_detect_hotkey_request(const std::string &app_name, - const uint64_t partition_index, - const dsn::replication::hotkey_type::type hotkey_type, - const dsn::replication::detect_action::type action); + void send_detect_hotkey_request(const std::string &app_name, + const uint64_t partition_index, + const dsn::replication::hotkey_type::type hotkey_type, + const dsn::replication::detect_action::type action); private: // empirical rule to calculate hot point of each partition @@ -65,6 +67,7 @@ class hotspot_partition_calculator hot_partition_counters _hot_points; // saving historical data can improve accuracy stat_histories _partitions_stat_histories; + std::shared_ptr _shell_context; // _hotpartition_counter p[index_of_partitions][type_of_read(0)/write(1)_stat] // it's a counter to find partitions that often exceed the threshold diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index d3cbe74578..2efabaed3a 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -57,9 +57,10 @@ info_collector::info_collector() _cluster_name = dsn::replication::get_current_cluster_name(); - _shell_context.current_cluster_name = _cluster_name; - _shell_context.meta_list = meta_servers; - _shell_context.ddl_client.reset(new replication_ddl_client(meta_servers)); + _shell_context = std::make_shared(); + _shell_context->current_cluster_name = _cluster_name; + _shell_context->meta_list = meta_servers; + _shell_context->ddl_client.reset(new replication_ddl_client(meta_servers)); _app_stat_interval_seconds = (uint32_t)dsn_config_get_value_uint64("pegasus.collector", "app_stat_interval_seconds", @@ -143,7 +144,7 @@ void info_collector::on_app_stat() { ddebug("start to stat apps"); std::map> all_rows; - if (!get_app_partition_stat(&_shell_context, all_rows)) { + if (!get_app_partition_stat(_shell_context.get(), all_rows)) { derror("call get_app_stat() failed"); return; } @@ -241,7 +242,7 @@ void info_collector::on_capacity_unit_stat(int remaining_retry_count) { ddebug("start to stat capacity unit, remaining_retry_count = %d", remaining_retry_count); std::vector nodes_stat; - if (!get_capacity_unit_stat(&_shell_context, nodes_stat)) { + if (!get_capacity_unit_stat(_shell_context.get(), nodes_stat)) { if (remaining_retry_count > 0) { dwarn("get capacity unit stat failed, remaining_retry_count = %d, " "wait %u seconds to retry", @@ -288,7 +289,7 @@ void info_collector::on_storage_size_stat(int remaining_retry_count) { ddebug("start to stat storage size, remaining_retry_count = %d", remaining_retry_count); app_storage_size_stat st_stat; - if (!get_storage_size_stat(&_shell_context, st_stat)) { + if (!get_storage_size_stat(_shell_context.get(), st_stat)) { if (remaining_retry_count > 0) { dwarn("get storage size stat failed, remaining_retry_count = %d, " "wait %u seconds to retry", @@ -316,7 +317,8 @@ info_collector::get_hotspot_calculator(const std::string &app_name, const int pa if (iter != _hotspot_calculator_store.end()) { return iter->second; } - auto calculator = std::make_shared(app_name, partition_count); + auto calculator = + std::make_shared(app_name, partition_count, _shell_context); _hotspot_calculator_store[app_name_pcount] = calculator; return calculator; } diff --git a/src/server/info_collector.h b/src/server/info_collector.h index 12bebdda0d..a7afff20c1 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -173,7 +173,7 @@ class info_collector dsn::task_tracker _tracker; ::dsn::rpc_address _meta_servers; std::string _cluster_name; - shell_context _shell_context; + std::shared_ptr _shell_context; uint32_t _app_stat_interval_seconds; ::dsn::task_ptr _app_stat_timer_task; ::dsn::utils::ex_lock_nr _app_stat_counter_lock; diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp index f41923ccaa..645fa8abf1 100644 --- a/src/server/test/hotspot_partition_test.cpp +++ b/src/server/test/hotspot_partition_test.cpp @@ -29,7 +29,7 @@ DSN_DECLARE_int32(occurrence_threshold); class hotspot_partition_test : public pegasus_server_test_base { public: - hotspot_partition_test() : calculator("TEST", 8) + hotspot_partition_test() : calculator("TEST", 8, nullptr) { dsn::fail::setup(); dsn::fail::cfg("send_detect_hotkey_request", "return()");