Skip to content

Commit

Permalink
refactor(remote_commands): init
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Feb 19, 2024
1 parent 039bc62 commit 582e9b1
Show file tree
Hide file tree
Showing 19 changed files with 297 additions and 270 deletions.
27 changes: 14 additions & 13 deletions src/failure_detector/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include <type_traits>
#include <utility>

#include <nlohmann/json.hpp>

#include "absl/strings/string_view.h"
#include "failure_detector/fd.code.definition.h"
#include "fd_types.h"
Expand Down Expand Up @@ -68,10 +70,10 @@ void failure_detector::register_ctrl_commands()
{
static std::once_flag flag;
std::call_once(flag, [&]() {
_get_allow_list = dsn::command_manager::instance().register_command(
{"fd.allow_list"},
_get_allow_list = dsn::command_manager::instance().register_single_command(
"fd.allow_list",
"show allow list of failure detector",
"Show the allow list of failure detector",
"",
[this](const std::vector<std::string> &args) { return get_allow_list(args); });
});
}
Expand Down Expand Up @@ -335,18 +337,17 @@ void failure_detector::set_allow_list(const std::vector<std::string> &replica_ad

std::string failure_detector::get_allow_list(const std::vector<std::string> &args) const
{
if (!_is_started)
return "error: FD is not started";
if (!_is_started) {
nlohmann::json err_msg;
err_msg["error"] = fmt::format("FD is not started");
return err_msg.dump(2);
}

std::stringstream oss;
nlohmann::json info;
dsn::zauto_lock l(_lock);
oss << "get ok: allow list " << (_use_allow_list ? "enabled. list: " : "disabled.");
for (auto iter = _allow_list.begin(); iter != _allow_list.end(); ++iter) {
if (iter != _allow_list.begin())
oss << ",";
oss << *iter;
}
return oss.str();
info["enabled"] = _use_allow_list;
info["allow_list"] = fmt::format("{}", fmt::join(_allow_list, ","));
return info.dump(2);
}

void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon_ack &ack)
Expand Down
52 changes: 25 additions & 27 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <unordered_map>
#include <utility>

#include <nlohmann/json.hpp>

#include "app_balance_policy.h"
#include "cluster_balance_policy.h"
#include "greedy_load_balancer.h"
Expand Down Expand Up @@ -71,39 +73,35 @@ greedy_load_balancer::~greedy_load_balancer() {}

void greedy_load_balancer::register_ctrl_commands()
{
_get_balance_operation_count = dsn::command_manager::instance().register_command(
{"meta.lb.get_balance_operation_count"},
"meta.lb.get_balance_operation_count [total | move_pri | copy_pri | copy_sec | detail]",
"get balance operation count",
_get_balance_operation_count = dsn::command_manager::instance().register_single_command(
"meta.lb.get_balance_operation_count",
"Get balance operation count",
"[total | move_pri | copy_pri | copy_sec | detail]",
[this](const std::vector<std::string> &args) { return get_balance_operation_count(args); });
}

std::string greedy_load_balancer::get_balance_operation_count(const std::vector<std::string> &args)
{
if (args.empty()) {
return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT]));
}

if (args[0] == "total") {
return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT]));
nlohmann::json info;
if (args.size() > 1) {
info["error"] = fmt::format("invalid arguments");
} else if (args.empty() || args[0] == "total") {
info["total"] = t_operation_counters[ALL_COUNT];
} else if (args[0] == "move_pri") {
info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT];
} else if (args[0] == "copy_pri") {
info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT];
} else if (args[0] == "copy_sec") {
info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT];
} else if (args[0] == "detail") {
info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT];
info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT];
info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT];
info["total"] = t_operation_counters[ALL_COUNT];
} else {
info["error"] = fmt::format("invalid arguments");
}

std::string result("unknown");
if (args[0] == "move_pri")
result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT]));
else if (args[0] == "copy_pri")
result = std::string("copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT]));
else if (args[0] == "copy_sec")
result = std::string("copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT]));
else if (args[0] == "detail")
result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT]) +
",copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT]) +
",copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT]) +
",total=" + std::to_string(t_operation_counters[ALL_COUNT]));
else
result = std::string("ERR: invalid arguments");

return result;
return info.dump(2);
}

void greedy_load_balancer::score(meta_view view, double &primary_stddev, double &total_stddev)
Expand Down
73 changes: 38 additions & 35 deletions src/meta/load_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <mutex>
#include <ostream>

#include <nlohmann/json.hpp>

#include "dsn.layer2_types.h"
#include "meta/greedy_load_balancer.h"
#include "meta/meta_data.h"
Expand Down Expand Up @@ -183,10 +185,10 @@ load_balance_policy::load_balance_policy(meta_service *svc)
{
static std::once_flag flag;
std::call_once(flag, [&]() {
_ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_command(
{"meta.lb.ignored_app_list"},
"meta.lb.ignored_app_list <get|set|clear> [app_id1,app_id2..]",
"get, set and clear balancer ignored_app_list",
_ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_single_command(
"meta.lb.ignored_app_list",
"get, set or clear balancer ignored_app_list",
"<get|set|clear> [app_id1,app_id2..]",
[this](const std::vector<std::string> &args) {
return remote_command_balancer_ignored_app_ids(args);
});
Expand Down Expand Up @@ -395,70 +397,71 @@ bool load_balance_policy::execute_balance(
std::string
load_balance_policy::remote_command_balancer_ignored_app_ids(const std::vector<std::string> &args)
{
static const std::string invalid_arguments("invalid arguments");
nlohmann::json info;
if (args.empty()) {
return invalid_arguments;
}
if (args[0] == "set") {
info["error"] = "invalid argument";
} else if (args[0] == "set") {
return set_balancer_ignored_app_ids(args);
}
if (args[0] == "get") {
} else if (args[0] == "get") {
return get_balancer_ignored_app_ids();
}
if (args[0] == "clear") {
} else if (args[0] == "clear") {
return clear_balancer_ignored_app_ids();
} else {
info["error"] = "invalid argument";
}
return invalid_arguments;
return info.dump(2);
}

std::string load_balance_policy::set_balancer_ignored_app_ids(const std::vector<std::string> &args)
{
static const std::string invalid_arguments("invalid arguments");
nlohmann::json info;
info["error"] = "invalid argument";
if (args.size() != 2) {
return invalid_arguments;
return info.dump(2);
}

std::vector<std::string> app_ids;
dsn::utils::split_args(args[1].c_str(), app_ids, ',');
if (app_ids.empty()) {
return invalid_arguments;
return info.dump(2);
}

std::set<app_id> app_list;
for (const std::string &app_id_str : app_ids) {
for (const auto &app_id_str : app_ids) {
app_id app;
if (!dsn::buf2int32(app_id_str, app)) {
return invalid_arguments;
return info.dump(2);
}
app_list.insert(app);
}

dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps = std::move(app_list);
return "set ok";
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps = std::move(app_list);
}
info["error"] = "ok";
return info.dump(2);
}

std::string load_balance_policy::get_balancer_ignored_app_ids()
{
std::stringstream oss;
dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
if (_balancer_ignored_apps.empty()) {
return "no ignored apps";
nlohmann::json data;
{
dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
data["ignored_app_id_list"] = fmt::format("{}", fmt::join(_balancer_ignored_apps, ","));
}
oss << "ignored_app_id_list: ";
std::copy(_balancer_ignored_apps.begin(),
_balancer_ignored_apps.end(),
std::ostream_iterator<app_id>(oss, ","));
std::string app_ids = oss.str();
app_ids[app_ids.size() - 1] = '\0';
return app_ids;
return data.dump(2);
}

std::string load_balance_policy::clear_balancer_ignored_app_ids()
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps.clear();
return "clear ok";
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps.clear();
}
nlohmann::json info;
info["error"] = "ok";
return info.dump(2);
}

bool load_balance_policy::is_ignored_app(app_id app_id)
Expand Down
61 changes: 36 additions & 25 deletions src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <ostream>
#include <unordered_map>

#include <nlohmann/json.hpp>

#include "common/replication_common.h"
#include "common/replication_other_types.h"
#include "meta/meta_data.h"
Expand Down Expand Up @@ -694,10 +696,10 @@ void partition_guardian::register_ctrl_commands()
"meta.lb.assign_delay_ms",
"control the replica_assign_delay_ms_for_dropouts config"));

_cmds.emplace_back(dsn::command_manager::instance().register_command(
{"meta.lb.assign_secondary_black_list"},
"lb.assign_secondary_black_list [<ip:port,ip:port,ip:port>|clear]",
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
"meta.lb.assign_secondary_black_list",
"control the assign secondary black list",
"[<ip:port,ip:port,ip:port>|clear]",
[this](const std::vector<std::string> &args) {
return ctrl_assign_secondary_black_list(args);
}));
Expand All @@ -706,47 +708,56 @@ void partition_guardian::register_ctrl_commands()
std::string
partition_guardian::ctrl_assign_secondary_black_list(const std::vector<std::string> &args)
{
std::string invalid_arguments("invalid arguments");
std::stringstream oss;
nlohmann::json msg;
msg["error"] = "ok";
// Query.
if (args.empty()) {
dsn::zauto_read_lock l(_black_list_lock);
oss << "get ok: ";
for (auto iter = _assign_secondary_black_list.begin();
iter != _assign_secondary_black_list.end();
++iter) {
if (iter != _assign_secondary_black_list.begin())
oss << ",";
oss << *iter;
{
dsn::zauto_read_lock l(_black_list_lock);
msg["assign_secondary_black_list"] =
fmt::format("{}", fmt::join(_assign_secondary_black_list, ","));
}
return oss.str();
return msg.dump(2);
}

// Invalid argument.
if (args.size() != 1) {
return invalid_arguments;
msg["error"] = "invalid argument, 0 or 1 argument is acceptable";
return msg.dump(2);
}

dsn::zauto_write_lock l(_black_list_lock);
// Clear.
if (args[0] == "clear") {
_assign_secondary_black_list.clear();
return "clear ok";
{
dsn::zauto_write_lock l(_black_list_lock);
_assign_secondary_black_list.clear();
}
return msg.dump(2);
}

// Set to new value.
std::vector<std::string> ip_ports;
dsn::utils::split_args(args[0].c_str(), ip_ports, ',');
if (args.size() == 0) {
return invalid_arguments;
if (ip_ports.empty()) {
msg["error"] =
"invalid argument, the argument should be in form of '<ip:port,ip:port,ip:port>'";
return msg.dump(2);
}

std::set<dsn::rpc_address> addr_list;
for (const std::string &s : ip_ports) {
for (const auto &ip_port : ip_ports) {
dsn::rpc_address addr;
if (!addr.from_string_ipv4(s.c_str())) {
return invalid_arguments;
if (!addr.from_string_ipv4(ip_port.c_str())) {
msg["error"] = fmt::format("invalid argument, bad ip:port '{}'", ip_port);
return msg.dump(2);
}
addr_list.insert(addr);
}
_assign_secondary_black_list = std::move(addr_list);
return "set ok";
{
dsn::zauto_write_lock l(_black_list_lock);
_assign_secondary_black_list = std::move(addr_list);
}
return msg.dump(2);
}

void partition_guardian::get_ddd_partitions(const gpid &pid,
Expand Down
8 changes: 4 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ server_state::~server_state() { _tracker.cancel_outstanding_tasks(); }

void server_state::register_cli_commands()
{
_cmds.emplace_back(dsn::command_manager::instance().register_command(
{"meta.dump"},
"meta.dump - dump app_states of meta server to local file",
"meta.dump -t|--target target_file",
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
"meta.dump",
"Dump app_states of meta server to a local file",
"-t|--target target_file",
[this](const std::vector<std::string> &args) {
dsn::error_code err;
if (args.size() != 2) {
Expand Down
6 changes: 3 additions & 3 deletions src/perf_counter/perf_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ perf_counters::perf_counters()
});
}));

_cmds.emplace_back(command_manager::instance().register_command(
{"server-stat"},
"server-stat - query selected perf counters",
_cmds.emplace_back(command_manager::instance().register_single_command(
"server-stat",
"Query selected perf counters",
"",
[](const std::vector<std::string> &args) { return get_brief_stat(); }));
}

Expand Down
Loading

0 comments on commit 582e9b1

Please sign in to comment.