diff --git a/rdsn b/rdsn index fafbd599d1..69102a786f 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit fafbd599d1df48b36bade4d08940ab79837aaf3b +Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98 diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 4f7c280d28..f4969b930f 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -35,6 +35,7 @@ #include "command_executor.h" #include "command_utils.h" +#include "validate_utils.h" using namespace dsn::replication; @@ -1077,4 +1078,4 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s dsn::utils::time_ms_to_date_time(dsn_now_ms(), buf, sizeof(buf)); st_stat.timestamp = buf; return true; -} +} \ No newline at end of file diff --git a/src/shell/commands.h b/src/shell/commands.h index 27618f026b..92ede4c06a 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -262,3 +262,7 @@ bool pause_bulk_load(command_executor *e, shell_context *sc, arguments args); bool restart_bulk_load(command_executor *e, shell_context *sc, arguments args); bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args); + +// == detect hotkey (see 'commands/detect_hotkey.cpp') == // + +bool detect_hotkey(command_executor *e, shell_context *sc, arguments args); \ No newline at end of file diff --git a/src/shell/commands/detect_hotkey.cpp b/src/shell/commands/detect_hotkey.cpp new file mode 100644 index 0000000000..57e6daa44e --- /dev/null +++ b/src/shell/commands/detect_hotkey.cpp @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "shell/commands.h" +#include "shell/validate_utils.h" +#include + +bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req, + const std::string &hotkey_action, + const std::string &hotkey_type, + int app_id, + int partition_index, + std::string &err_info) +{ + std::string hotkey_action_check = hotkey_action; + std::string hotkey_type_check = hotkey_type; + std::transform( + hotkey_type_check.begin(), hotkey_type_check.end(), hotkey_type_check.begin(), tolower); + std::transform(hotkey_action_check.begin(), + hotkey_action_check.end(), + hotkey_action_check.begin(), + ::tolower); + if (hotkey_type_check == "read") { + req.type = dsn::replication::hotkey_type::type::READ; + } else if (hotkey_type_check == "write") { + req.type = dsn::replication::hotkey_type::type::WRITE; + } else { + err_info = fmt::format("\"{}\" is an invalid hotkey type (should be 'read' or 'write')\n", + hotkey_type); + return false; + } + if (hotkey_action_check == "start") { + req.action = dsn::replication::detect_action::START; + } else if (hotkey_action_check == "stop") { + req.action = dsn::replication::detect_action::STOP; + } else { + err_info = + fmt::format("\"{}\" is an invalid hotkey detect action (should be 'start' or 'stop')\n", + hotkey_action); + return false; + } + req.pid = dsn::gpid(app_id, partition_index); + return true; +} + +// TODO: (Tangyanzhao) merge hotspot_partition_calculator::send_detect_hotkey_request +bool detect_hotkey(command_executor *e, shell_context *sc, arguments args) +{ + // detect_hotkey [-a|--app_id] [-p|--partition_index][-c|--hotkey_action][-t|--hotkey_type] + const std::set ¶ms = {"a", + "app_id", + "p", + "partition_index", + "c", + "hotkey_action", + "t", + "hotkey_type", + "d", + "address"}; + const std::set flags = {}; + argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION); + if (!validate_cmd(cmd, params, flags)) { + return false; + } + int app_id; + if (!dsn::buf2int32(cmd({"-a", "--app_id"}).str(), app_id)) { + fmt::print( + stderr, + "\"{}\" is a invalid app_id, you should use `app` to check related information\n", + cmd({"-a", "--app_id"}).str()); + return false; + } + int partition_index; + if (!dsn::buf2int32(cmd({"-p", "--partition_index"}).str(), partition_index)) { + fmt::print(stderr, + "\"{}\" is a invalid partition index, you should use `app` to check related " + "information\n", + cmd({"-p", "--partition_index"}).str()); + return false; + } + std::string hotkey_action = cmd({"-c", "--hotkey_action"}).str(); + std::string hotkey_type = cmd({"-t", "--hotkey_type"}).str(); + dsn::rpc_address target_address; + std::string err_info; + + if (!validate_ip(sc, cmd({"-d", "--address"}).str(), target_address, err_info)) { + fmt::print(stderr, err_info); + return false; + } + + dsn::replication::detect_hotkey_request req; + if (!generate_hotkey_request( + req, hotkey_action, hotkey_type, app_id, partition_index, err_info)) { + fmt::print(stderr, err_info); + return false; + } + + detect_hotkey_response resp; + auto err = sc->ddl_client->detect_hotkey(dsn::rpc_address(target_address), req, resp); + if (err != dsn::ERR_OK) { + fmt::print(stderr, + "Hotkey detect rpc sending failed, in {}.{}, error_hint:{}\n", + app_id, + partition_index, + err.to_string()); + return false; + } + + if (resp.err != dsn::ERR_OK) { + fmt::print(stderr, + "Hotkey detect rpc performed failed, in {}.{}, error_hint:{} {}\n", + app_id, + partition_index, + resp.err, + resp.err_hint); + return false; + } + + fmt::print(stderr, "YES\n"); + + return true; +} \ No newline at end of file diff --git a/src/shell/commands/disk_rebalance.cpp b/src/shell/commands/disk_rebalance.cpp index 97462c9f8e..3bcc81a2d7 100644 --- a/src/shell/commands/disk_rebalance.cpp +++ b/src/shell/commands/disk_rebalance.cpp @@ -5,45 +5,14 @@ #include "shell/commands.h" #include "shell/argh.h" #include "shell/command_output.h" +#include "shell/validate_utils.h" #include #include #include #include -#include #include -bool validate_cmd(const argh::parser &cmd, - const std::set ¶ms, - const std::set &flags) -{ - if (cmd.size() > 1) { - fmt::print(stderr, "too many params!\n"); - return false; - } - - for (const auto ¶m : cmd.params()) { - if (params.find(param.first) == params.end()) { - fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second); - return false; - } - } - - for (const auto &flag : cmd.flags()) { - if (params.find(flag) != params.end()) { - fmt::print(stderr, "missing value of {}\n", flag); - return false; - } - - if (flags.find(flag) == flags.end()) { - fmt::print(stderr, "unknown flag {}\n", flag); - return false; - } - } - - return true; -} - bool query_disk_info( shell_context *sc, const argh::parser &cmd, diff --git a/src/shell/main.cpp b/src/shell/main.cpp index ea2688b2ba..094b0b3837 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -474,6 +474,16 @@ static command_executor commands[] = { "<-a --app_name str> [-f --forced]", cancel_bulk_load, }, + { + "detect_hotkey", + "start or stop hotkey detection on the replica", + "<-a|--app_id str> " + "<-p|--partition_index num> " + "<-t|--hotkey_type read|write> " + "<-c|--detect_action start|stop> " + "<-d|--address str>", + detect_hotkey, + }, { "exit", "exit shell", "", exit_shell, }, diff --git a/src/shell/validate_utils.cpp b/src/shell/validate_utils.cpp new file mode 100644 index 0000000000..5b00bdbfe7 --- /dev/null +++ b/src/shell/validate_utils.cpp @@ -0,0 +1,62 @@ +// +// Created by smilencer on 2020/9/28. +// + +#include "shell/validate_utils.h" + +bool validate_cmd(const argh::parser &cmd, + const std::set ¶ms, + const std::set &flags) +{ + if (cmd.size() > 1) { + fmt::print(stderr, "too many params!\n"); + return false; + } + + for (const auto ¶m : cmd.params()) { + if (params.find(param.first) == params.end()) { + fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second); + return false; + } + } + + for (const auto &flag : cmd.flags()) { + if (params.find(flag) != params.end()) { + fmt::print(stderr, "missing value of {}\n", flag); + return false; + } + + if (flags.find(flag) == flags.end()) { + fmt::print(stderr, "unknown flag {}\n", flag); + return false; + } + } + + return true; +} + +bool validate_ip(shell_context *sc, + const std::string &ip_str, + /*out*/ dsn::rpc_address &target_address, + /*out*/ std::string &err_info) +{ + std::map nodes; + auto error = sc->ddl_client->list_nodes(::dsn::replication::node_status::NS_INVALID, nodes); + if (error != dsn::ERR_OK) { + err_info = fmt::format("list nodes failed, error={} \n", error.to_string()); + return false; + } + + bool not_find_ip = true; + for (const auto &node : nodes) { + if (ip_str == node.first.to_std_string()) { + target_address = node.first; + not_find_ip = false; + } + } + if (not_find_ip) { + err_info = fmt::format("invalid ip, error={} \n", ip_str); + return false; + } + return true; +} \ No newline at end of file diff --git a/src/shell/validate_utils.h b/src/shell/validate_utils.h new file mode 100644 index 0000000000..26909a2a79 --- /dev/null +++ b/src/shell/validate_utils.h @@ -0,0 +1,12 @@ +#include "shell/argh.h" +#include +#include "command_executor.h" + +bool validate_cmd(const argh::parser &cmd, + const std::set ¶ms, + const std::set &flags); + +bool validate_ip(shell_context *sc, + const std::string &ip_str, + /*out*/ dsn::rpc_address &target_address, + /*out*/ std::string &err_info); \ No newline at end of file