Skip to content

Commit

Permalink
feat(hotspot): add a function to start hotkey detecting in shell comm…
Browse files Browse the repository at this point in the history
…ands (#605)
  • Loading branch information
Smityz authored Sep 29, 2020
1 parent 704994b commit e147550
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 34 deletions.
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 96 files
+0 −1 .gitignore
+0 −74 include/dsn/dist/block_service.h
+6 −1 include/dsn/dist/failure_detector/fd.code.definition.h
+0 −1 include/dsn/dist/replication/meta_service_app.h
+10 −5 include/dsn/dist/replication/replication_ddl_client.h
+10 −0 include/dsn/dist/replication/replication_enums.h
+0 −4 include/dsn/dist/replication/replication_service_app.h
+138 −0 include/dsn/dist/replication/replication_types.h
+26 −4 include/dsn/http/http_server.h
+18 −13 include/dsn/tool-api/network.h
+5 −1 include/dsn/utils/latency_tracer.h
+1 −18 scripts/linux/build.sh
+19 −153 src/block_service/fds/fds_service.cpp
+6 −14 src/block_service/fds/fds_service.h
+0 −62 src/block_service/local/local_service.cpp
+0 −10 src/block_service/local/local_service.h
+0 −16 src/block_service/test/block_service_mock.h
+10 −97 src/block_service/test/fds_service_test.cpp
+14 −1 src/client/replication_ddl_client.cpp
+153 −0 src/common/backup_utils.cpp
+193 −0 src/common/backup_utils.h
+0 −136 src/common/replication_common.cpp
+0 −165 src/common/replication_common.h
+275 −0 src/common/replication_types.cpp
+77 −0 src/http/builtin_http_calls.cpp
+35 −0 src/http/builtin_http_calls.h
+17 −9 src/http/http_server.cpp
+2 −3 src/http/perf_counter_http_service.cpp
+0 −28 src/http/perf_counter_http_service.h
+2 −0 src/http/pprof_http_service.cpp
+0 −43 src/http/root_http_service.h
+0 −38 src/http/server_info_http_services.cpp
+0 −55 src/http/server_info_http_services.h
+17 −9 src/http/test/http_server_test.cpp
+4 −9 src/http/test/perf_counter_http_service_test.cpp
+18 −0 src/meta/meta_backup_service.cpp
+17 −0 src/meta/meta_backup_service.h
+0 −12 src/meta/meta_service_app.cpp
+1 −1 src/meta/server_state_restore.cpp
+8 −1 src/replica/CMakeLists.txt
+1,075 −0 src/replica/backup/cold_backup_context.cpp
+373 −0 src/replica/backup/cold_backup_context.h
+2 −0 src/replica/backup/replica_backup_manager.cpp
+3 −1 src/replica/backup/replica_backup_manager.h
+86 −0 src/replica/backup/replica_backup_server.cpp
+46 −0 src/replica/backup/replica_backup_server.h
+4 −2 src/replica/mutation.cpp
+5 −1 src/replica/replica.cpp
+15 −82 src/replica/replica.h
+13 −20 src/replica/replica_backup.cpp
+6 −5 src/replica/replica_chkpt.cpp
+5 −0 src/replica/replica_config.cpp
+0 −1,060 src/replica/replica_context.cpp
+0 −345 src/replica/replica_context.h
+1 −0 src/replica/replica_restore.cpp
+35 −59 src/replica/replica_stub.cpp
+12 −5 src/replica/replica_stub.h
+1 −12 src/replica/replication_service_app.cpp
+204 −168 src/replica/split/replica_split_manager.cpp
+134 −0 src/replica/split/replica_split_manager.h
+21 −0 src/replica/split/test/CMakeLists.txt
+71 −0 src/replica/split/test/config-test.ini
+52 −0 src/replica/split/test/main.cpp
+98 −76 src/replica/split/test/replica_split_test.cpp
+11 −0 src/replica/split/test/run.sh
+1 −0 src/replica/test/backup_block_service_mock.h
+1 −0 src/replica/test/cold_backup_context_test.cpp
+0 −2 src/replica/test/mock_utils.h
+27 −0 src/replication.thrift
+0 −12 src/runtime/core_main.cpp
+0 −3 src/runtime/rpc/asio_net_provider.cpp
+2 −3 src/runtime/rpc/asio_rpc_session.cpp
+60 −66 src/runtime/rpc/network.cpp
+0 −4 src/runtime/rpc/network.sim.h
+42 −18 src/runtime/security/client_negotiation.cpp
+2 −1 src/runtime/security/client_negotiation.h
+2 −0 src/runtime/security/init.cpp
+63 −2 src/runtime/security/negotiation_service.cpp
+11 −0 src/runtime/security/negotiation_service.h
+0 −6 src/runtime/security/negotiation_utils.h
+14 −7 src/runtime/security/sasl_client_wrapper.cpp
+2 −2 src/runtime/security/sasl_client_wrapper.h
+29 −10 src/runtime/security/sasl_server_wrapper.cpp
+2 −2 src/runtime/security/sasl_server_wrapper.h
+2 −3 src/runtime/security/sasl_wrapper.h
+2 −2 src/runtime/security/security.thrift
+10 −10 src/runtime/security/security_types.cpp
+6 −6 src/runtime/security/security_types.h
+58 −3 src/runtime/security/server_negotiation.cpp
+5 −0 src/runtime/security/server_negotiation.h
+38 −1 src/runtime/test/client_negotiation_test.cpp
+73 −0 src/runtime/test/negotiation_service_test.cpp
+105 −7 src/runtime/test/server_negotiation_test.cpp
+3 −3 src/utils/latency_tracer.cpp
+2 −2 src/utils/test/latency_tracer_test.cpp
+4 −1 thirdparty/CMakeLists.txt
36 changes: 36 additions & 0 deletions src/shell/command_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,42 @@
#pragma once

#include <map>
#include <string>
#include <set>

#include "shell/argh.h"
#include <dsn/dist/fmt_logging.h>

// Checks a parsed shell command line against the options a command accepts.
//
// cmd    - the parsed command line.
// params - names of the options that must be given with a value.
// flags  - names of the options that are given without a value.
//
// Returns true when the command line is acceptable. Otherwise the first
// problem found is reported on stderr and false is returned.
inline bool validate_cmd(const argh::parser &cmd,
                         const std::set<std::string> &params,
                         const std::set<std::string> &flags)
{
    // More than one positional argument is rejected outright.
    if (cmd.size() > 1) {
        fmt::print(stderr, "too many params!\n");
        return false;
    }

    // Every "name = value" option must be one of the known params.
    for (const auto &kv : cmd.params()) {
        if (params.count(kv.first) == 0) {
            fmt::print(stderr, "unknown param {} = {}\n", kv.first, kv.second);
            return false;
        }
    }

    for (const auto &name : cmd.flags()) {
        // A known param that was parsed as a flag was given without its value.
        if (params.count(name) != 0) {
            fmt::print(stderr, "missing value of {}\n", name);
            return false;
        }

        if (flags.count(name) == 0) {
            fmt::print(stderr, "unknown flag {}\n", name);
            return false;
        }
    }

    return true;
}

#define verify_logged(exp, ...) \
do { \
Expand Down
5 changes: 4 additions & 1 deletion src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <pegasus/error.h>

#include "command_executor.h"
#include "command_utils.h"
#include "command_helper.h"
#include "args.h"

Expand Down Expand Up @@ -262,3 +261,7 @@ bool pause_bulk_load(command_executor *e, shell_context *sc, arguments args);
bool restart_bulk_load(command_executor *e, shell_context *sc, arguments args);

bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args);

// == detect hotkey (see 'commands/detect_hotkey.cpp') == //

bool detect_hotkey(command_executor *e, shell_context *sc, arguments args);
126 changes: 126 additions & 0 deletions src/shell/commands/detect_hotkey.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "shell/commands.h"
#include "shell/argh.h"
#include <dsn/dist/replication/replication_types.h>

// Fills `req` from the textual values given on the command line.
//
// req             - request to populate (type, action and pid are assigned).
// hotkey_action   - "start" or "stop", compared case-insensitively.
// hotkey_type     - "read" or "write", compared case-insensitively.
// app_id          - first component of the gpid stored in req.pid.
// partition_index - second component of the gpid stored in req.pid.
// err_info        - receives a human-readable message when false is returned.
//
// Returns true on success. The type is validated before the action, so on an
// invalid action `req.type` has already been assigned.
bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req,
                             const std::string &hotkey_action,
                             const std::string &hotkey_type,
                             int app_id,
                             int partition_index,
                             std::string &err_info)
{
    // Case-insensitive comparison of an argument against a keyword.
    const auto matches = [](const std::string &text, const char *keyword) {
        return strcasecmp(text.c_str(), keyword) == 0;
    };

    const bool is_read = matches(hotkey_type, "read");
    if (!is_read && !matches(hotkey_type, "write")) {
        err_info = fmt::format("\"{}\" is an invalid hotkey type (should be 'read' or 'write')\n",
                               hotkey_type);
        return false;
    }
    req.type = is_read ? dsn::replication::hotkey_type::type::READ
                       : dsn::replication::hotkey_type::type::WRITE;

    const bool is_start = matches(hotkey_action, "start");
    if (!is_start && !matches(hotkey_action, "stop")) {
        err_info =
            fmt::format("\"{}\" is an invalid hotkey detect action (should be 'start' or 'stop')\n",
                        hotkey_action);
        return false;
    }
    req.action = is_start ? dsn::replication::detect_action::START
                          : dsn::replication::detect_action::STOP;

    req.pid = dsn::gpid(app_id, partition_index);
    return true;
}

// TODO: (Tangyanzhao) merge hotspot_partition_calculator::send_detect_hotkey_request
// Shell command handler for `detect_hotkey`: parses the command line, builds a
// detect_hotkey_request and sends it to the given address through the shell's
// ddl client.
//
// e    - the command_executor entry for this command (unused here).
// sc   - shell context; its ddl_client is used to send the request.
// args - raw argc/argv of the command.
//
// Returns true when the request was sent and the response carries ERR_OK.
// Every failure is reported on stderr and yields false.
bool detect_hotkey(command_executor *e, shell_context *sc, arguments args)
{
    // detect_hotkey
    // <-a|--app_id num><-p|--partition_index num><-t|--hotkey_type read|write>
    // <-c|--hotkey_action|--detect_action start|stop><-d|--address str>
    //
    // Both "hotkey_action" and "detect_action" are accepted: the former is the
    // name this command has always parsed, the latter is the name shown in the
    // registered usage text.
    const std::set<std::string> params = {"a",
                                          "app_id",
                                          "p",
                                          "partition_index",
                                          "c",
                                          "hotkey_action",
                                          "detect_action",
                                          "t",
                                          "hotkey_type",
                                          "d",
                                          "address"};
    const std::set<std::string> flags = {};
    argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
    if (!validate_cmd(cmd, params, flags)) {
        return false;
    }

    int app_id;
    const std::string app_id_str = cmd({"-a", "--app_id"}).str();
    if (!dsn::buf2int32(app_id_str, app_id)) {
        fmt::print(stderr, "\"{}\" is an invalid num\n", app_id_str);
        return false;
    }

    int partition_index;
    const std::string partition_index_str = cmd({"-p", "--partition_index"}).str();
    if (!dsn::buf2int32(partition_index_str, partition_index)) {
        fmt::print(stderr, "\"{}\" is an invalid num\n", partition_index_str);
        return false;
    }

    dsn::rpc_address target_address;
    const std::string ip_str = cmd({"-d", "--address"}).str();
    if (!target_address.from_string_ipv4(ip_str.c_str())) {
        // Reported on stderr like every other error of this command.
        fmt::print(stderr, "invalid ip, error={}\n", ip_str);
        return false;
    }

    std::string err_info;
    const std::string hotkey_action =
        cmd({"-c", "--hotkey_action", "--detect_action"}).str();
    const std::string hotkey_type = cmd({"-t", "--hotkey_type"}).str();
    dsn::replication::detect_hotkey_request req;
    if (!generate_hotkey_request(
            req, hotkey_action, hotkey_type, app_id, partition_index, err_info)) {
        // err_info embeds user input, so it must never be used as the format
        // string itself: a '{' or '}' typed by the user would be interpreted.
        fmt::print(stderr, "{}", err_info);
        return false;
    }

    detect_hotkey_response resp;
    auto err = sc->ddl_client->detect_hotkey(dsn::rpc_address(target_address), req, resp);
    if (err != dsn::ERR_OK) {
        fmt::print(stderr,
                   "Hotkey detect rpc sending failed, in {}.{}, error_hint:{}\n",
                   app_id,
                   partition_index,
                   err.to_string());
        return false;
    }

    if (resp.err != dsn::ERR_OK) {
        fmt::print(stderr,
                   "Hotkey detect rpc performed failed, in {}.{}, error_hint:{} {}\n",
                   app_id,
                   partition_index,
                   resp.err,
                   resp.err_hint);
        return false;
    }

    return true;
}
32 changes: 0 additions & 32 deletions src/shell/commands/disk_rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,8 @@
#include <fmt/ostream.h>
#include <dsn/utility/errors.h>
#include <dsn/utility/output_utils.h>
#include <dsn/utility/string_conv.h>
#include <dsn/dist/replication/duplication_common.h>

// Validates a parsed command line: at most one positional argument, every
// valued option listed in `params`, and every value-less option listed in
// `flags`. The first violation is printed to stderr and false is returned;
// true means the command line passed all checks.
bool validate_cmd(const argh::parser &cmd,
                  const std::set<std::string> &params,
                  const std::set<std::string> &flags)
{
    const bool too_many_positional = cmd.size() > 1;
    if (too_many_positional) {
        fmt::print(stderr, "too many params!\n");
        return false;
    }

    for (const auto &entry : cmd.params()) {
        const bool known_param = params.find(entry.first) != params.end();
        if (!known_param) {
            fmt::print(stderr, "unknown param {} = {}\n", entry.first, entry.second);
            return false;
        }
    }

    for (const auto &option : cmd.flags()) {
        // An option expected to carry a value showed up without one.
        const bool is_valueless_param = params.find(option) != params.end();
        if (is_valueless_param) {
            fmt::print(stderr, "missing value of {}\n", option);
            return false;
        }

        const bool known_flag = flags.find(option) != flags.end();
        if (!known_flag) {
            fmt::print(stderr, "unknown flag {}\n", option);
            return false;
        }
    }

    return true;
}

bool query_disk_info(
shell_context *sc,
const argh::parser &cmd,
Expand Down
10 changes: 10 additions & 0 deletions src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,16 @@ static command_executor commands[] = {
"<-a --app_name str> [-f --forced]",
cancel_bulk_load,
},
{
"detect_hotkey",
"start or stop hotkey detection on a replica of a replica server",
"<-a|--app_id num> "
"<-p|--partition_index num> "
"<-t|--hotkey_type read|write> "
"<-c|--detect_action start|stop> "
"<-d|--address str>",
detect_hotkey,
},
{
"exit", "exit shell", "", exit_shell,
},
Expand Down

0 comments on commit e147550

Please sign in to comment.