Skip to content

Commit

Permalink
refactor(remote_commands): Simplify the command register process
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Feb 23, 2024
1 parent af25694 commit 8d2d067
Show file tree
Hide file tree
Showing 23 changed files with 512 additions and 457 deletions.
31 changes: 17 additions & 14 deletions src/failure_detector/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@

#include "failure_detector/failure_detector.h"

#include <nlohmann/json.hpp>
#include <chrono>
#include <ctime>
#include <map>
#include <mutex>
#include <ostream>
#include <type_traits>
#include <utility>

#include "absl/strings/string_view.h"
#include "failure_detector/fd.code.definition.h"
#include "fd_types.h"
#include "fmt/core.h"
#include "fmt/format.h"
#include "nlohmann/json_fwd.hpp"
#include "runtime/api_layer1.h"
#include "runtime/serverlet.h"
#include "runtime/task/async_calls.h"
Expand Down Expand Up @@ -68,10 +72,10 @@ void failure_detector::register_ctrl_commands()
{
static std::once_flag flag;
std::call_once(flag, [&]() {
_get_allow_list = dsn::command_manager::instance().register_command(
{"fd.allow_list"},
_get_allow_list = dsn::command_manager::instance().register_single_command(
"fd.allow_list",
"show allow list of failure detector",
"Show the allow list of failure detector",
"",
[this](const std::vector<std::string> &args) { return get_allow_list(args); });
});
}
Expand Down Expand Up @@ -335,18 +339,17 @@ void failure_detector::set_allow_list(const std::vector<std::string> &replica_ad

std::string failure_detector::get_allow_list(const std::vector<std::string> &args) const
{
if (!_is_started)
return "error: FD is not started";
if (!_is_started) {
nlohmann::json err_msg;
err_msg["error"] = fmt::format("FD is not started");
return err_msg.dump(2);
}

std::stringstream oss;
nlohmann::json info;
dsn::zauto_lock l(_lock);
oss << "get ok: allow list " << (_use_allow_list ? "enabled. list: " : "disabled.");
for (auto iter = _allow_list.begin(); iter != _allow_list.end(); ++iter) {
if (iter != _allow_list.begin())
oss << ",";
oss << *iter;
}
return oss.str();
info["enabled"] = _use_allow_list;
info["allow_list"] = fmt::format("{}", fmt::join(_allow_list, ","));
return info.dump(2);
}

void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon_ack &ack)
Expand Down
54 changes: 27 additions & 27 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
* THE SOFTWARE.
*/

#include <fmt/core.h>
#include <nlohmann/json.hpp>
#include <nlohmann/json_fwd.hpp>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <string.h>
#include <algorithm>
#include <cstdint>
#include <map>
#include <type_traits>
Expand Down Expand Up @@ -71,39 +75,35 @@ greedy_load_balancer::~greedy_load_balancer() {}

void greedy_load_balancer::register_ctrl_commands()
{
_get_balance_operation_count = dsn::command_manager::instance().register_command(
{"meta.lb.get_balance_operation_count"},
"meta.lb.get_balance_operation_count [total | move_pri | copy_pri | copy_sec | detail]",
"get balance operation count",
_get_balance_operation_count = dsn::command_manager::instance().register_single_command(
"meta.lb.get_balance_operation_count",
"Get balance operation count",
"[total | move_pri | copy_pri | copy_sec | detail]",
[this](const std::vector<std::string> &args) { return get_balance_operation_count(args); });
}

std::string greedy_load_balancer::get_balance_operation_count(const std::vector<std::string> &args)
{
if (args.empty()) {
return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT]));
nlohmann::json info;
if (args.size() > 1) {
info["error"] = fmt::format("invalid arguments");
} else if (args.empty() || args[0] == "total") {
info["total"] = t_operation_counters[ALL_COUNT];
} else if (args[0] == "move_pri") {
info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT];
} else if (args[0] == "copy_pri") {
info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT];
} else if (args[0] == "copy_sec") {
info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT];
} else if (args[0] == "detail") {
info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT];
info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT];
info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT];
info["total"] = t_operation_counters[ALL_COUNT];
} else {
info["error"] = fmt::format("invalid arguments");
}

if (args[0] == "total") {
return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT]));
}

std::string result("unknown");
if (args[0] == "move_pri")
result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT]));
else if (args[0] == "copy_pri")
result = std::string("copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT]));
else if (args[0] == "copy_sec")
result = std::string("copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT]));
else if (args[0] == "detail")
result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT]) +
",copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT]) +
",copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT]) +
",total=" + std::to_string(t_operation_counters[ALL_COUNT]));
else
result = std::string("ERR: invalid arguments");

return result;
return info.dump(2);
}

void greedy_load_balancer::score(meta_view view, double &primary_stddev, double &total_stddev)
Expand Down
91 changes: 50 additions & 41 deletions src/meta/load_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

#include "meta/load_balance_policy.h"

#include <fmt/core.h>
#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <limits.h>
#include <nlohmann/json.hpp>
#include <nlohmann/json_fwd.hpp>
#include <algorithm>
#include <iterator>
#include <limits>
#include <mutex>
#include <ostream>

#include "absl/strings/string_view.h"
#include "dsn.layer2_types.h"
#include "meta/greedy_load_balancer.h"
#include "meta/meta_data.h"
Expand All @@ -34,7 +39,6 @@
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
#include "absl/strings/string_view.h"
#include "utils/strings.h"

DSN_DECLARE_uint64(min_live_node_count_for_unfreeze);
Expand Down Expand Up @@ -183,10 +187,10 @@ load_balance_policy::load_balance_policy(meta_service *svc)
{
static std::once_flag flag;
std::call_once(flag, [&]() {
_ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_command(
{"meta.lb.ignored_app_list"},
"meta.lb.ignored_app_list <get|set|clear> [app_id1,app_id2..]",
"get, set and clear balancer ignored_app_list",
_ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_single_command(
"meta.lb.ignored_app_list",
"Get, set or clear balancer ignored_app_list",
"<get|set|clear> [set_app_id1,set_app_id2,...]",
[this](const std::vector<std::string> &args) {
return remote_command_balancer_ignored_app_ids(args);
});
Expand Down Expand Up @@ -395,70 +399,75 @@ bool load_balance_policy::execute_balance(
std::string
load_balance_policy::remote_command_balancer_ignored_app_ids(const std::vector<std::string> &args)
{
static const std::string invalid_arguments("invalid arguments");
if (args.empty()) {
return invalid_arguments;
}
if (args[0] == "set") {
return set_balancer_ignored_app_ids(args);
}
if (args[0] == "get") {
return get_balancer_ignored_app_ids();
}
if (args[0] == "clear") {
return clear_balancer_ignored_app_ids();
}
return invalid_arguments;
static const std::string invalid_arguments_message("invalid arguments");
nlohmann::json info;
do {
if (args.empty()) {
break;
}
if (args[0] == "set") {
return set_balancer_ignored_app_ids(args);
} else if (args[0] == "get") {
return get_balancer_ignored_app_ids();
} else if (args[0] == "clear") {
return clear_balancer_ignored_app_ids();
}
} while (false);

info["error"] = invalid_arguments_message;
return info.dump(2);
}

std::string load_balance_policy::set_balancer_ignored_app_ids(const std::vector<std::string> &args)
{
static const std::string invalid_arguments("invalid arguments");
nlohmann::json info;
info["error"] = "invalid argument";
if (args.size() != 2) {
return invalid_arguments;
return info.dump(2);
}

std::vector<std::string> app_ids;
dsn::utils::split_args(args[1].c_str(), app_ids, ',');
if (app_ids.empty()) {
return invalid_arguments;
return info.dump(2);
}

std::set<app_id> app_list;
for (const std::string &app_id_str : app_ids) {
for (const auto &app_id_str : app_ids) {
app_id app;
if (!dsn::buf2int32(app_id_str, app)) {
return invalid_arguments;
return info.dump(2);
}
app_list.insert(app);
}

dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps = std::move(app_list);
return "set ok";
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps = std::move(app_list);
}
info["error"] = "ok";
return info.dump(2);
}

std::string load_balance_policy::get_balancer_ignored_app_ids()
{
std::stringstream oss;
dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
if (_balancer_ignored_apps.empty()) {
return "no ignored apps";
nlohmann::json data;
{
dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
data["ignored_app_id_list"] = fmt::format("{}", fmt::join(_balancer_ignored_apps, ","));
}
oss << "ignored_app_id_list: ";
std::copy(_balancer_ignored_apps.begin(),
_balancer_ignored_apps.end(),
std::ostream_iterator<app_id>(oss, ","));
std::string app_ids = oss.str();
app_ids[app_ids.size() - 1] = '\0';
return app_ids;
return data.dump(2);
}

std::string load_balance_policy::clear_balancer_ignored_app_ids()
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps.clear();
return "clear ok";
{
dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
_balancer_ignored_apps.clear();
}
nlohmann::json info;
info["error"] = "ok";
return info.dump(2);
}

bool load_balance_policy::is_ignored_app(app_id app_id)
Expand Down
62 changes: 37 additions & 25 deletions src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
#include "meta/partition_guardian.h"

#include <fmt/core.h>
#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <inttypes.h>
#include <nlohmann/json.hpp>
#include <nlohmann/json_fwd.hpp>
#include <stdio.h>
#include <algorithm>
#include <cstdint>
Expand Down Expand Up @@ -695,10 +698,10 @@ void partition_guardian::register_ctrl_commands()
"meta.lb.assign_delay_ms",
"control the replica_assign_delay_ms_for_dropouts config"));

_cmds.emplace_back(dsn::command_manager::instance().register_command(
{"meta.lb.assign_secondary_black_list"},
"lb.assign_secondary_black_list [<ip:port,ip:port,ip:port>|clear]",
"control the assign secondary black list",
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
"meta.lb.assign_secondary_black_list",
"Control the assign secondary black list",
"[ip1:port,ip2:port,...|clear]",
[this](const std::vector<std::string> &args) {
return ctrl_assign_secondary_black_list(args);
}));
Expand All @@ -707,47 +710,56 @@ void partition_guardian::register_ctrl_commands()
std::string
partition_guardian::ctrl_assign_secondary_black_list(const std::vector<std::string> &args)
{
std::string invalid_arguments("invalid arguments");
std::stringstream oss;
nlohmann::json msg;
msg["error"] = "ok";
// Query.
if (args.empty()) {
dsn::zauto_read_lock l(_black_list_lock);
oss << "get ok: ";
for (auto iter = _assign_secondary_black_list.begin();
iter != _assign_secondary_black_list.end();
++iter) {
if (iter != _assign_secondary_black_list.begin())
oss << ",";
oss << *iter;
{
dsn::zauto_read_lock l(_black_list_lock);
msg["assign_secondary_black_list"] =
fmt::format("{}", fmt::join(_assign_secondary_black_list, ","));
}
return oss.str();
return msg.dump(2);
}

// Invalid argument.
if (args.size() != 1) {
return invalid_arguments;
msg["error"] = "invalid argument, 0 or 1 argument is acceptable";
return msg.dump(2);
}

dsn::zauto_write_lock l(_black_list_lock);
// Clear.
if (args[0] == "clear") {
_assign_secondary_black_list.clear();
return "clear ok";
{
dsn::zauto_write_lock l(_black_list_lock);
_assign_secondary_black_list.clear();
}
return msg.dump(2);
}

// Set to new value.
std::vector<std::string> ip_ports;
dsn::utils::split_args(args[0].c_str(), ip_ports, ',');
if (args.size() == 0) {
return invalid_arguments;
if (ip_ports.empty()) {
msg["error"] =
"invalid argument, the argument should be in form of '<ip:port,ip:port,ip:port>'";
return msg.dump(2);
}

std::set<dsn::rpc_address> addr_list;
for (const std::string &s : ip_ports) {
for (const auto &ip_port : ip_ports) {
const auto addr = rpc_address::from_host_port(s);
if (!addr) {
return invalid_arguments;
msg["error"] = fmt::format("invalid argument, bad ip:port '{}'", ip_port);
return msg.dump(2);
}
addr_list.insert(addr);
}
_assign_secondary_black_list = std::move(addr_list);
return "set ok";
{
dsn::zauto_write_lock l(_black_list_lock);
_assign_secondary_black_list = std::move(addr_list);
}
return msg.dump(2);
}

void partition_guardian::get_ddd_partitions(const gpid &pid,
Expand Down
Loading

0 comments on commit 8d2d067

Please sign in to comment.