From 8d2d067ab71da60dac1a2688347c84fe7fa80e3a Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Tue, 30 Jan 2024 00:24:22 +0800 Subject: [PATCH] refactor(remote_commands): Simplify the command register process --- src/failure_detector/failure_detector.cpp | 31 ++-- src/meta/greedy_load_balancer.cpp | 54 +++---- src/meta/load_balance_policy.cpp | 91 ++++++----- src/meta/partition_guardian.cpp | 62 +++++--- src/meta/server_state.cpp | 27 +--- src/perf_counter/perf_counters.cpp | 38 ++--- src/replica/replica_stub.cpp | 58 +++---- src/runtime/service_api_c.cpp | 37 +++-- src/runtime/service_engine.cpp | 102 +++++++------ src/runtime/service_engine.h | 8 +- src/runtime/task/task_code.cpp | 24 +-- src/runtime/task/task_engine.cpp | 143 +++++++++--------- src/runtime/task/task_engine.h | 14 +- src/runtime/test/task_engine.cpp | 8 +- src/runtime/tracer.cpp | 12 +- src/server/main.cpp | 23 +-- src/server/pegasus_manual_compact_service.cpp | 38 ++--- src/shell/commands/node_management.cpp | 4 +- src/shell/main.cpp | 2 +- src/utils/command_manager.cpp | 133 +++++++++------- src/utils/command_manager.h | 38 +++-- src/utils/simple_logger.cpp | 14 +- src/utils/test/command_manager_test.cpp | 8 +- 23 files changed, 512 insertions(+), 457 deletions(-) diff --git a/src/failure_detector/failure_detector.cpp b/src/failure_detector/failure_detector.cpp index f76e3adc00..612f3fa122 100644 --- a/src/failure_detector/failure_detector.cpp +++ b/src/failure_detector/failure_detector.cpp @@ -26,16 +26,20 @@ #include "failure_detector/failure_detector.h" +#include #include #include +#include #include -#include #include #include #include "absl/strings/string_view.h" #include "failure_detector/fd.code.definition.h" #include "fd_types.h" +#include "fmt/core.h" +#include "fmt/format.h" +#include "nlohmann/json_fwd.hpp" #include "runtime/api_layer1.h" #include "runtime/serverlet.h" #include "runtime/task/async_calls.h" @@ -68,10 +72,10 @@ void failure_detector::register_ctrl_commands() { static std::once_flag flag; std::call_once(flag, [&]() { - _get_allow_list = dsn::command_manager::instance().register_command( - {"fd.allow_list"}, + _get_allow_list = dsn::command_manager::instance().register_single_command( "fd.allow_list", - "show allow list of failure detector", + "Show the allow list of failure detector", + "", [this](const std::vector &args) { return get_allow_list(args); }); }); } @@ -335,18 +339,17 @@ void failure_detector::set_allow_list(const std::vector &replica_ad std::string failure_detector::get_allow_list(const std::vector &args) const { - if (!_is_started) - return "error: FD is not started"; + if (!_is_started) { + nlohmann::json err_msg; + err_msg["error"] = fmt::format("FD is not started"); + return err_msg.dump(2); + } - std::stringstream oss; + nlohmann::json info; dsn::zauto_lock l(_lock); - oss << "get ok: allow list " << (_use_allow_list ? "enabled. list: " : "disabled."); - for (auto iter = _allow_list.begin(); iter != _allow_list.end(); ++iter) { - if (iter != _allow_list.begin()) - oss << ","; - oss << *iter; - } - return oss.str(); + info["enabled"] = _use_allow_list; + info["allow_list"] = fmt::format("{}", fmt::join(_allow_list, ",")); + return info.dump(2); } void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon_ack &ack) diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp index 48f79258aa..4409c5aea6 100644 --- a/src/meta/greedy_load_balancer.cpp +++ b/src/meta/greedy_load_balancer.cpp @@ -24,8 +24,12 @@ * THE SOFTWARE. */ +#include +#include +#include // IWYU pragma: no_include #include +#include #include #include #include @@ -71,39 +75,35 @@ greedy_load_balancer::~greedy_load_balancer() {} void greedy_load_balancer::register_ctrl_commands() { - _get_balance_operation_count = dsn::command_manager::instance().register_command( - {"meta.lb.get_balance_operation_count"}, - "meta.lb.get_balance_operation_count [total | move_pri | copy_pri | copy_sec | detail]", - "get balance operation count", + _get_balance_operation_count = dsn::command_manager::instance().register_single_command( + "meta.lb.get_balance_operation_count", + "Get balance operation count", + "[total | move_pri | copy_pri | copy_sec | detail]", [this](const std::vector &args) { return get_balance_operation_count(args); }); } std::string greedy_load_balancer::get_balance_operation_count(const std::vector &args) { - if (args.empty()) { - return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT])); + nlohmann::json info; + if (args.size() > 1) { + info["error"] = fmt::format("invalid arguments"); + } else if (args.empty() || args[0] == "total") { + info["total"] = t_operation_counters[ALL_COUNT]; + } else if (args[0] == "move_pri") { + info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT]; + } else if (args[0] == "copy_pri") { + info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT]; + } else if (args[0] == "copy_sec") { + info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT]; + } else if (args[0] == "detail") { + info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT]; + info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT]; + info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT]; + info["total"] = t_operation_counters[ALL_COUNT]; + } else { + info["error"] = fmt::format("invalid arguments"); } - - if (args[0] == "total") { - return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT])); - } - - std::string result("unknown"); - if (args[0] == "move_pri") - result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT])); - else if (args[0] == "copy_pri") - result = std::string("copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT])); - else if (args[0] == "copy_sec") - result = std::string("copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT])); - else if (args[0] == "detail") - result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT]) + - ",copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT]) + - ",copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT]) + - ",total=" + std::to_string(t_operation_counters[ALL_COUNT])); - else - result = std::string("ERR: invalid arguments"); - - return result; + return info.dump(2); } void greedy_load_balancer::score(meta_view view, double &primary_stddev, double &total_stddev) diff --git a/src/meta/load_balance_policy.cpp b/src/meta/load_balance_policy.cpp index 8cff520c94..da4c8ee72e 100644 --- a/src/meta/load_balance_policy.cpp +++ b/src/meta/load_balance_policy.cpp @@ -17,14 +17,19 @@ #include "meta/load_balance_policy.h" +#include +#include // IWYU pragma: no_include #include +#include +#include #include #include #include #include #include +#include "absl/strings/string_view.h" #include "dsn.layer2_types.h" #include "meta/greedy_load_balancer.h" #include "meta/meta_data.h" @@ -34,7 +39,6 @@ #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/string_conv.h" -#include "absl/strings/string_view.h" #include "utils/strings.h" DSN_DECLARE_uint64(min_live_node_count_for_unfreeze); @@ -183,10 +187,10 @@ load_balance_policy::load_balance_policy(meta_service *svc) { static std::once_flag flag; std::call_once(flag, [&]() { - _ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_command( - {"meta.lb.ignored_app_list"}, - "meta.lb.ignored_app_list [app_id1,app_id2..]", - "get, set and clear balancer ignored_app_list", + _ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_single_command( + "meta.lb.ignored_app_list", + "Get, set or clear balancer ignored_app_list", + " [set_app_id1,set_app_id2,...]", [this](const std::vector &args) { return remote_command_balancer_ignored_app_ids(args); }); @@ -395,70 +399,75 @@ bool load_balance_policy::execute_balance( std::string load_balance_policy::remote_command_balancer_ignored_app_ids(const std::vector &args) { - static const std::string invalid_arguments("invalid arguments"); - if (args.empty()) { - return invalid_arguments; - } - if (args[0] == "set") { - return set_balancer_ignored_app_ids(args); - } - if (args[0] == "get") { - return get_balancer_ignored_app_ids(); - } - if (args[0] == "clear") { - return clear_balancer_ignored_app_ids(); - } - return invalid_arguments; + static const std::string invalid_arguments_message("invalid arguments"); + nlohmann::json info; + do { + if (args.empty()) { + break; + } + if (args[0] == "set") { + return set_balancer_ignored_app_ids(args); + } else if (args[0] == "get") { + return get_balancer_ignored_app_ids(); + } else if (args[0] == "clear") { + return clear_balancer_ignored_app_ids(); + } + } while (false); + + info["error"] = invalid_arguments_message; + return info.dump(2); } std::string load_balance_policy::set_balancer_ignored_app_ids(const std::vector &args) { - static const std::string invalid_arguments("invalid arguments"); + nlohmann::json info; + info["error"] = "invalid argument"; if (args.size() != 2) { - return invalid_arguments; + return info.dump(2); } std::vector app_ids; dsn::utils::split_args(args[1].c_str(), app_ids, ','); if (app_ids.empty()) { - return invalid_arguments; + return info.dump(2); } std::set app_list; - for (const std::string &app_id_str : app_ids) { + for (const auto &app_id_str : app_ids) { app_id app; if (!dsn::buf2int32(app_id_str, app)) { - return invalid_arguments; + return info.dump(2); } app_list.insert(app); } - dsn::zauto_write_lock l(_balancer_ignored_apps_lock); - _balancer_ignored_apps = std::move(app_list); - return "set ok"; + { + dsn::zauto_write_lock l(_balancer_ignored_apps_lock); + _balancer_ignored_apps = std::move(app_list); + } + info["error"] = "ok"; + return info.dump(2); } std::string load_balance_policy::get_balancer_ignored_app_ids() { - std::stringstream oss; - dsn::zauto_read_lock l(_balancer_ignored_apps_lock); - if (_balancer_ignored_apps.empty()) { - return "no ignored apps"; + nlohmann::json data; + { + dsn::zauto_read_lock l(_balancer_ignored_apps_lock); + data["ignored_app_id_list"] = fmt::format("{}", fmt::join(_balancer_ignored_apps, ",")); } - oss << "ignored_app_id_list: "; - std::copy(_balancer_ignored_apps.begin(), - _balancer_ignored_apps.end(), - std::ostream_iterator(oss, ",")); - std::string app_ids = oss.str(); - app_ids[app_ids.size() - 1] = '\0'; - return app_ids; + return data.dump(2); } std::string load_balance_policy::clear_balancer_ignored_app_ids() { - dsn::zauto_write_lock l(_balancer_ignored_apps_lock); - _balancer_ignored_apps.clear(); - return "clear ok"; + { + dsn::zauto_write_lock l(_balancer_ignored_apps_lock); + _balancer_ignored_apps.clear(); + } + nlohmann::json info; + info["error"] = "ok"; + return info.dump(2); } bool load_balance_policy::is_ignored_app(app_id app_id) diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp index 92a4c167da..9d2fb4447d 100644 --- a/src/meta/partition_guardian.cpp +++ b/src/meta/partition_guardian.cpp @@ -18,8 +18,11 @@ #include "meta/partition_guardian.h" #include +#include // IWYU pragma: no_include #include +#include +#include #include #include #include @@ -695,10 +698,10 @@ void partition_guardian::register_ctrl_commands() "meta.lb.assign_delay_ms", "control the replica_assign_delay_ms_for_dropouts config")); - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"meta.lb.assign_secondary_black_list"}, - "lb.assign_secondary_black_list [|clear]", - "control the assign secondary black list", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( + "meta.lb.assign_secondary_black_list", + "Control the assign secondary black list", + "[ip1:port,ip2:port,...|clear]", [this](const std::vector &args) { return ctrl_assign_secondary_black_list(args); })); @@ -707,47 +710,56 @@ void partition_guardian::register_ctrl_commands() std::string partition_guardian::ctrl_assign_secondary_black_list(const std::vector &args) { - std::string invalid_arguments("invalid arguments"); - std::stringstream oss; + nlohmann::json msg; + msg["error"] = "ok"; + // Query. if (args.empty()) { - dsn::zauto_read_lock l(_black_list_lock); - oss << "get ok: "; - for (auto iter = _assign_secondary_black_list.begin(); - iter != _assign_secondary_black_list.end(); - ++iter) { - if (iter != _assign_secondary_black_list.begin()) - oss << ","; - oss << *iter; + { + dsn::zauto_read_lock l(_black_list_lock); + msg["assign_secondary_black_list"] = + fmt::format("{}", fmt::join(_assign_secondary_black_list, ",")); } - return oss.str(); + return msg.dump(2); } + // Invalid argument. if (args.size() != 1) { - return invalid_arguments; + msg["error"] = "invalid argument, 0 or 1 argument is acceptable"; + return msg.dump(2); } - dsn::zauto_write_lock l(_black_list_lock); + // Clear. if (args[0] == "clear") { - _assign_secondary_black_list.clear(); - return "clear ok"; + { + dsn::zauto_write_lock l(_black_list_lock); + _assign_secondary_black_list.clear(); + } + return msg.dump(2); } + // Set to new value. std::vector ip_ports; dsn::utils::split_args(args[0].c_str(), ip_ports, ','); - if (args.size() == 0) { - return invalid_arguments; + if (ip_ports.empty()) { + msg["error"] = + "invalid argument, the argument should be in form of ''"; + return msg.dump(2); } std::set addr_list; - for (const std::string &s : ip_ports) { + for (const auto &ip_port : ip_ports) { const auto addr = rpc_address::from_host_port(s); if (!addr) { - return invalid_arguments; + msg["error"] = fmt::format("invalid argument, bad ip:port '{}'", ip_port); + return msg.dump(2); } addr_list.insert(addr); } - _assign_secondary_black_list = std::move(addr_list); - return "set ok"; + { + dsn::zauto_write_lock l(_black_list_lock); + _assign_secondary_black_list = std::move(addr_list); + } + return msg.dump(2); } void partition_guardian::get_ddd_partitions(const gpid &pid, diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index d5cb2294b1..ea75e5d48e 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -142,27 +142,16 @@ server_state::~server_state() { _tracker.cancel_outstanding_tasks(); } void server_state::register_cli_commands() { - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"meta.dump"}, - "meta.dump - dump app_states of meta server to local file", - "meta.dump -t|--target target_file", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( + "meta.dump", + "Dump app_states of meta server to a local file", + "", [this](const std::vector &args) { - dsn::error_code err; - if (args.size() != 2) { - err = ERR_INVALID_PARAMETERS; - } else { - const char *target_file = nullptr; - for (int i = 0; i < args.size(); i += 2) { - if (args[i] == "-t" || args[i] == "--target") - target_file = args[i + 1].c_str(); - } - if (target_file == nullptr) { - err = ERR_INVALID_PARAMETERS; - } else { - err = this->dump_from_remote_storage(target_file, false); - } + if (args.size() != 1) { + return ERR_INVALID_PARAMETERS.to_string(); } - return std::string(err.to_string()); + + return dump_from_remote_storage(args[0].c_str(), false).to_string(); })); _cmds.emplace_back(dsn::command_manager::instance().register_bool_command( diff --git a/src/perf_counter/perf_counters.cpp b/src/perf_counter/perf_counters.cpp index 8d805db5be..24e26c699e 100644 --- a/src/perf_counter/perf_counters.cpp +++ b/src/perf_counter/perf_counters.cpp @@ -117,27 +117,27 @@ perf_counters::perf_counters() // perf_counters tools::shared_io_service::instance(); - _cmds.emplace_back(command_manager::instance().register_command( - {"perf-counters"}, - "perf-counters - query perf counters, filtered by OR of POSIX basic regular expressions", - "perf-counters [regexp]...", + _cmds.emplace_back(command_manager::instance().register_single_command( + "perf-counters", + "Query perf counters, filtered by OR of POSIX basic regular expressions", + "[regexp]...", [](const std::vector &args) { return perf_counters::instance().list_snapshot_by_regexp(args); })); - _cmds.emplace_back(command_manager::instance().register_command( - {"perf-counters-by-substr"}, - "perf-counters-by-substr - query perf counters, filtered by OR of substrs", - "perf-counters-by-substr [substr]...", + _cmds.emplace_back(command_manager::instance().register_single_command( + "perf-counters-by-substr", + "Query perf counters, filtered by OR of substrs", + "[substr]...", [](const std::vector &args) { return perf_counters::instance().list_snapshot_by_literal( args, [](const std::string &arg, const counter_snapshot &cs) { return cs.name.find(arg) != std::string::npos; }); })); - _cmds.emplace_back(command_manager::instance().register_command( - {"perf-counters-by-prefix"}, - "perf-counters-by-prefix - query perf counters, filtered by OR of prefix strings", - "perf-counters-by-prefix [prefix]...", + _cmds.emplace_back(command_manager::instance().register_single_command( + "perf-counters-by-prefix", + "Query perf counters, filtered by OR of prefix strings", + "[prefix]...", [](const std::vector &args) { return perf_counters::instance().list_snapshot_by_literal( args, [](const std::string &arg, const counter_snapshot &cs) { @@ -145,10 +145,10 @@ perf_counters::perf_counters() utils::mequals(cs.name.c_str(), arg.c_str(), arg.size()); }); })); - _cmds.emplace_back(command_manager::instance().register_command( - {"perf-counters-by-postfix"}, - "perf-counters-by-postfix - query perf counters, filtered by OR of postfix strings", - "perf-counters-by-postfix [postfix]...", + _cmds.emplace_back(command_manager::instance().register_single_command( + "perf-counters-by-postfix", + "Query perf counters, filtered by OR of postfix strings", + "[postfix]...", [](const std::vector &args) { return perf_counters::instance().list_snapshot_by_literal( args, [](const std::string &arg, const counter_snapshot &cs) { @@ -159,10 +159,10 @@ perf_counters::perf_counters() }); })); - _cmds.emplace_back(command_manager::instance().register_command( - {"server-stat"}, - "server-stat - query selected perf counters", + _cmds.emplace_back(command_manager::instance().register_single_command( "server-stat", + "Query selected perf counters", + "", [](const std::vector &args) { return get_brief_stat(); })); } diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 06a6bc48a5..1b3d250942 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2129,10 +2129,10 @@ void replica_stub::open_service() #if !defined(DSN_ENABLE_GPERF) && defined(DSN_USE_JEMALLOC) void replica_stub::register_jemalloc_ctrl_command() { - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.dump-jemalloc-stats"}, - fmt::format("replica.dump-jemalloc-stats <{}> [buffer size]", kAllJeStatsTypesStr), - "dump stats of jemalloc", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.dump-jemalloc-stats", + "Dump stats of jemalloc", + fmt::format("<{}> [buffer size]", kAllJeStatsTypesStr), [](const std::vector &args) { if (args.empty()) { return std::string("invalid arguments"); @@ -2170,10 +2170,10 @@ void replica_stub::register_ctrl_command() /// failure_detector::register_ctrl_commands and nfs_client_impl::register_cli_commands static std::once_flag flag; std::call_once(flag, [&]() { - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.kill_partition"}, - "replica.kill_partition [app_id [partition_index]]", - "replica.kill_partition: kill partitions by (all, one app, one partition)", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.kill_partition", + "Kill partitions by (all, one app, one partition)", + "[app_id [partition_index]]", [this](const std::vector &args) { dsn::gpid pid; if (args.size() == 0) { @@ -2205,11 +2205,10 @@ void replica_stub::register_ctrl_command() "replica.verbose-commit-log", "control if print verbose log when commit mutation")); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.trigger-checkpoint"}, - "replica.trigger-checkpoint [id1,id2,...] (where id is 'app_id' or " - "'app_id.partition_id')", - "replica.trigger-checkpoint - trigger replicas to do checkpoint", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.trigger-checkpoint", + "Trigger replicas to do checkpoint by app_id or app_id.partition_id", + "[id1,id2,...]", [this](const std::vector &args) { return exec_command_on_replica(args, true, [this](const replica_ptr &rep) { tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER, @@ -2220,20 +2219,21 @@ void replica_stub::register_ctrl_command() }); })); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.query-compact"}, - "replica.query-compact [id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')", - "replica.query-compact - query full compact status on the underlying storage engine", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.query-compact", + "Query full compact status on the underlying storage engine by app_id or " + "app_id.partition_id", + "[id1,id2,...]", [this](const std::vector &args) { return exec_command_on_replica(args, true, [](const replica_ptr &rep) { return rep->query_manual_compact_state(); }); })); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.query-app-envs"}, - "replica.query-app-envs [id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')", - "replica.query-app-envs - query app envs on the underlying storage engine", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.query-app-envs", + "Query app envs on the underlying storage engine by app_id or app_id.partition_id", + "[id1,id2,...]", [this](const std::vector &args) { return exec_command_on_replica(args, true, [](const replica_ptr &rep) { std::map kv_map; @@ -2248,10 +2248,10 @@ void replica_stub::register_ctrl_command() "replica.release-tcmalloc-memory", "control if try to release tcmalloc memory")); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.get-tcmalloc-status"}, - "replica.get-tcmalloc-status - get status of tcmalloc", - "get status of tcmalloc", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.get-tcmalloc-status", + "Get the status of tcmalloc", + "", [](const std::vector &args) { char buf[4096]; MallocExtension::instance()->GetStats(buf, 4096); @@ -2265,10 +2265,10 @@ void replica_stub::register_ctrl_command() "control tcmalloc max reserved but not-used memory percentage", &check_mem_release_max_reserved_mem_percentage)); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.release-all-reserved-memory"}, - "replica.release-all-reserved-memory - release tcmalloc all reserved-not-used memory", - "release tcmalloc all reserverd not-used memory back to operating system", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.release-all-reserved-memory", + "Release tcmalloc all reserved-not-used memory back to operating system", + "", [this](const std::vector &args) { auto release_bytes = gc_tcmalloc_memory(true); return "OK, release_bytes=" + std::to_string(release_bytes); diff --git a/src/runtime/service_api_c.cpp b/src/runtime/service_api_c.cpp index 028555a509..c2c6b95263 100644 --- a/src/runtime/service_api_c.cpp +++ b/src/runtime/service_api_c.cpp @@ -538,25 +538,24 @@ bool run(const char *config_file, exit(1); } - dump_log_cmd = - dsn::command_manager::instance().register_command({"config-dump"}, - "config-dump - dump configuration", - "config-dump [to-this-config-file]", - [](const std::vector &args) { - std::ostringstream oss; - std::ofstream off; - std::ostream *os = &oss; - if (args.size() > 0) { - off.open(args[0]); - os = &off; - - oss << "config dump to file " - << args[0] << std::endl; - } - - dsn_config_dump(*os); - return oss.str(); - }); + dump_log_cmd = dsn::command_manager::instance().register_single_command( + "config-dump", + "Dump all configurations to a server local path or to stdout", + "[target_file]", + [](const std::vector &args) { + std::ostringstream oss; + std::ofstream off; + std::ostream *os = &oss; + if (args.size() > 0) { + off.open(args[0]); + os = &off; + + oss << "config dump to file " << args[0] << std::endl; + } + + dsn_config_dump(*os); + return oss.str(); + }); // invoke customized init after apps are created dsn::tools::sys_init_after_app_created.execute(); diff --git a/src/runtime/service_engine.cpp b/src/runtime/service_engine.cpp index ee0c742d81..98c7e58fd7 100644 --- a/src/runtime/service_engine.cpp +++ b/src/runtime/service_engine.cpp @@ -26,13 +26,15 @@ #include "service_engine.h" -#include +// IWYU pragma: no_include #include #include #include #include #include "common/gpid.h" +#include "fmt/core.h" +#include "nlohmann/json.hpp" #include "runtime/node_scoper.h" #include "runtime/rpc/rpc_engine.h" #include "runtime/rpc/rpc_message.h" @@ -44,6 +46,7 @@ #include "utils/filesystem.h" #include "utils/fmt_logging.h" #include "utils/join_point.h" +#include "utils/string_conv.h" #include "utils/strings.h" using namespace dsn::utils; @@ -144,22 +147,19 @@ error_code service_node::start() return err; } -void service_node::get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss) +std::string service_node::get_runtime_info(const std::vector &args) const { - ss << indent << full_name() << ":" << std::endl; - - std::string indent2 = indent + "\t"; - _computation->get_runtime_info(indent2, args, ss); + nlohmann::json info; + info[full_name()] = _computation->get_runtime_info(args); + return info.dump(2); } -void service_node::get_queue_info( - /*out*/ std::stringstream &ss) +nlohmann::json service_node::get_queue_info() const { - ss << "{\"app_name\":\"" << full_name() << "\",\n\"thread_pool\":[\n"; - _computation->get_queue_info(ss); - ss << "]}"; + nlohmann::json info; + info["app_name"] = full_name(); + info["thread_pool"] = _computation->get_queue_info(); + return info; } rpc_request_task *service_node::generate_intercepted_request_task(message_ex *req) @@ -189,16 +189,18 @@ service_engine::service_engine() { _env = nullptr; - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"engine"}, - "engine - get engine internal information", - "engine [app-id]", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( + "engine", + "Get engine internal information, including threadpools and threads and queues in each " + "threadpool", + "[app-id]", &service_engine::get_runtime_info)); - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"system.queue"}, - "system.queue - get queue internal information", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( "system.queue", + "Get queue internal information, including the threadpool each queue belongs to, and the " + "queue name and size", + "", &service_engine::get_queue_info)); } @@ -243,39 +245,47 @@ void service_engine::start_node(service_app_spec &app_spec) std::string service_engine::get_runtime_info(const std::vector &args) { - std::stringstream ss; - if (args.size() == 0) { - ss << "" << service_engine::instance()._nodes_by_app_id.size() - << " nodes available:" << std::endl; - for (auto &kv : service_engine::instance()._nodes_by_app_id) { - ss << "\t" << kv.second->id() << "." << kv.second->full_name() << std::endl; - } - } else { - std::string indent = ""; - int id = atoi(args[0].c_str()); - auto it = service_engine::instance()._nodes_by_app_id.find(id); - if (it != service_engine::instance()._nodes_by_app_id.end()) { - auto args2 = args; - args2.erase(args2.begin()); - it->second->get_runtime_info(indent, args2, ss); - } else { - ss << "cannot find node with given app id"; + // Overview. + if (args.empty()) { + nlohmann::json overview; + nlohmann::json nodes; + for (const auto &nodes_by_app_id : service_engine::instance()._nodes_by_app_id) { + nodes.emplace_back(fmt::format( + "{}.{}", nodes_by_app_id.second->id(), nodes_by_app_id.second->full_name())); } + overview["available_nodes"] = nodes; + return overview.dump(2); + } + + // Invalid argument. + int id; + if (!dsn::buf2int32(args[0], id)) { + nlohmann::json err_msg; + err_msg["error"] = "invalid argument, only one integer argument is acceptable"; + return err_msg.dump(2); } - return ss.str(); + + // The query id is not found. + const auto &it = service_engine::instance()._nodes_by_app_id.find(id); + if (it == service_engine::instance()._nodes_by_app_id.end()) { + nlohmann::json err_msg; + err_msg["error"] = fmt::format("cannot find node with given app id({})", id); + return err_msg.dump(2); + } + + // Query a special id. + auto tmp_args = args; + tmp_args.erase(tmp_args.begin()); + return it->second->get_runtime_info(tmp_args); } std::string service_engine::get_queue_info(const std::vector &args) { - std::stringstream ss; - ss << "["; - for (auto &it : service_engine::instance()._nodes_by_app_id) { - if (it.first != service_engine::instance()._nodes_by_app_id.begin()->first) - ss << ","; - it.second->get_queue_info(ss); + nlohmann::json info; + for (const auto &nodes_by_app_id : service_engine::instance()._nodes_by_app_id) { + info.emplace_back(nodes_by_app_id.second->get_queue_info()); } - ss << "]"; - return ss.str(); + return info.dump(2); } bool service_engine::is_simulator() const { return _simulator; } diff --git a/src/runtime/service_engine.h b/src/runtime/service_engine.h index 1e6528f0fa..f2c7aea76e 100644 --- a/src/runtime/service_engine.h +++ b/src/runtime/service_engine.h @@ -28,10 +28,10 @@ #include #include -#include #include #include +#include "nlohmann/json_fwd.hpp" #include "runtime/api_task.h" #include "runtime/global_config.h" #include "runtime/service_app.h" @@ -60,10 +60,8 @@ class service_node rpc_engine *rpc() const { return _rpc.get(); } task_engine *computation() const { return _computation.get(); } - void get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss); - void get_queue_info(/*out*/ std::stringstream &ss); + std::string get_runtime_info(const std::vector &args) const; + nlohmann::json get_queue_info() const; dsn::error_code start(); dsn::error_code start_app(); diff --git a/src/runtime/task/task_code.cpp b/src/runtime/task/task_code.cpp index 9533210e50..9abf2193fa 100644 --- a/src/runtime/task/task_code.cpp +++ b/src/runtime/task/task_code.cpp @@ -42,24 +42,24 @@ namespace utils { template <> void task_code_mgr::register_commands() { - _cmds.emplace_back(command_manager::instance().register_command( - {"task-code"}, - "task-code - query task code containing any given keywords", - "task-code keyword1 keyword2 ...", + _cmds.emplace_back(command_manager::instance().register_single_command( + "task-code", + "Query task code containing any given keywords", + "[keyword1] [keyword2] ...", [](const std::vector &args) { std::stringstream ss; - for (int code = 0; code <= dsn::task_code::max(); code++) { - if (code == TASK_CODE_INVALID) + if (code == TASK_CODE_INVALID) { continue; + } - std::string codes = dsn::task_code(code).to_string(); - if (args.size() == 0) { - ss << " " << codes << std::endl; + const std::string code_str = dsn::task_code(code).to_string(); + if (args.empty()) { + ss << " " << code_str << std::endl; } else { - for (auto &arg : args) { - if (codes.find(arg.c_str()) != std::string::npos) { - ss << " " << codes << std::endl; + for (const auto &arg : args) { + if (code_str.find(arg) != std::string::npos) { + ss << " " << code_str << std::endl; } } } diff --git a/src/runtime/task/task_engine.cpp b/src/runtime/task/task_engine.cpp index 4fcc3f6e8e..179b045626 100644 --- a/src/runtime/task/task_engine.cpp +++ b/src/runtime/task/task_engine.cpp @@ -29,9 +29,9 @@ // IWYU pragma: no_include #include #include -#include #include "fmt/core.h" +#include "nlohmann/json.hpp" #include "runtime/global_config.h" #include "runtime/service_engine.h" #include "runtime/task/task.h" @@ -184,42 +184,50 @@ bool task_worker_pool::shared_same_worker_with_current_task(task *tsk) const } } -void task_worker_pool::get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss) +nlohmann::json task_worker_pool::get_runtime_info(const std::vector &args) const { - std::string indent2 = indent + "\t"; - ss << indent << "contains " << _workers.size() << " threads with " << _queues.size() - << " queues" << std::endl; - - for (auto &q : _queues) { - if (q) { - ss << indent2 << q->get_name() << " now has " << q->count() << " pending tasks" - << std::endl; + nlohmann::json info; + + // Queues. + nlohmann::json queues; + for (const auto &queue : _queues) { + if (queue) { + nlohmann::json q; + q["name"] = queue->get_name(); + q["pending_task_count"] = queue->count(); + queues.emplace_back(q); } } - - for (auto &wk : _workers) { - if (wk) { - ss << indent2 << wk->index() << " (TID = " << wk->native_tid() - << ") attached with queue " << wk->queue()->get_name() << std::endl; + info["queues"] = queues; + + // Threads. + nlohmann::json workers; + for (const auto &worker : _workers) { + if (worker) { + nlohmann::json w; + w["index"] = worker->index(); + w["TID"] = worker->native_tid(); + w["queue_name"] = worker->queue()->get_name(); + workers.emplace_back(w); } } + info["threads"] = workers; + + return info; } -void task_worker_pool::get_queue_info(/*out*/ std::stringstream &ss) + +nlohmann::json task_worker_pool::get_queue_info() const { - ss << "["; - bool first_flag = 0; - for (auto &q : _queues) { - if (q) { - if (!first_flag) - first_flag = 1; - else - ss << ","; - ss << "\t\t{\"name\":\"" << q->get_name() << "\",\n\t\t\"num\":" << q->count() << "}\n"; + nlohmann::json queues; + for (const auto &queue : _queues) { + if (queue) { + nlohmann::json q; + q["name"] = queue->get_name(); + q["pending_task_count"] = queue->count(); + queues.emplace_back(q); } } - ss << "]\n"; + return queues; } task_engine::task_engine(service_node *node) @@ -279,74 +287,71 @@ volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code return pl->queues()[idx]->get_virtual_length_ptr(); } -void task_engine::get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss) +nlohmann::json task_engine::get_runtime_info(const std::vector &args) const { - std::string indent2 = indent + "\t"; - for (auto &p : _pools) { - if (p) { - ss << indent << p->spec().pool_code.to_string() << std::endl; - p->get_runtime_info(indent2, args, ss); + nlohmann::json pools; + for (const auto &pool : _pools) { + if (pool) { + pools[pool->spec().pool_code.to_string()] = pool->get_runtime_info(args); } } + return pools; } -void task_engine::get_queue_info(/*out*/ std::stringstream &ss) +nlohmann::json task_engine::get_queue_info() const { - bool first_flag = 0; - for (auto &p : _pools) { - if (p) { - if (!first_flag) - first_flag = 1; - else - ss << ","; - ss << "\t{\"pool_name\":\"" << p->spec().pool_code << "\",\n\t\"pool_queue\":\n"; - p->get_queue_info(ss); - ss << "}\n"; + nlohmann::json pools; + for (const auto &pool : _pools) { + if (pool) { + nlohmann::json p; + p["name"] = pool->spec().pool_code.to_string(); + p["queue"] = pool->get_queue_info(); + pools.emplace_back(p); } } + return pools; } void task_engine::register_cli_commands() { static std::once_flag flag; std::call_once(flag, [&]() { - _task_queue_max_length_cmd = dsn::command_manager::instance().register_command( - {"task.queue_max_length"}, - "task.queue_max_length [queue_max_length]", - "get/set the max task queue length of specific thread_pool, you can set INT_MAX, to " - "set a big enough value, but you can't cancel delay/reject dynamically", + _task_queue_max_length_cmd = dsn::command_manager::instance().register_single_command( + "task.queue_max_length", + "Get the current or set a new max task queue length of a specific thread_pool. It can " + "be set it to INT_MAX which means a big enough value, but it can't be cancelled the " + "delay/reject policy dynamically", + " [new_max_queue_length]", [this](const std::vector &args) { - if (args.size() < 1) { + if (args.empty()) { return std::string("ERR: invalid arguments, task.queue_max_length " "[queue_max_length]"); } - for (auto &it : _pools) { - if (!it) { + for (const auto &pool : _pools) { + if (!pool) { continue; } - if (it->_spec.pool_code.to_string() == args[0]) { - // when args length is 1, return current value + if (pool->_spec.pool_code.to_string() == args[0]) { + // Query. if (args.size() == 1) { - return fmt::format("task queue {}, length {}", + return fmt::format("The current task queue length of {} is {}", args[0], - it->_spec.queue_length_throttling_threshold); + pool->_spec.queue_length_throttling_threshold); } + + // Update. if (args.size() == 2) { - int queue_length = INT_MAX; - if ((args[1] != "INT_MAX") && - (!dsn::buf2int32(args[1], queue_length))) { - return fmt::format("queue_max_length must >= 0, or set `INT_MAX`"); - } - if (queue_length < 0) { - queue_length = INT_MAX; + int new_queue_length = INT_MAX; + if ((args[1] != "INT_MAX" && + !dsn::buf2int32(args[1], new_queue_length)) || + new_queue_length < 0) { + return fmt::format("queue_max_length must be >= 0 or 'INT_MAX'"); } - it->_spec.queue_length_throttling_threshold = queue_length; - return fmt::format("task queue {}, length {}", + pool->_spec.queue_length_throttling_threshold = new_queue_length; + return fmt::format("Task queue {} is updated to new max length {}", args[0], - it->_spec.queue_length_throttling_threshold); + pool->_spec.queue_length_throttling_threshold); } } } diff --git a/src/runtime/task/task_engine.h b/src/runtime/task/task_engine.h index cbff4e3514..6a8fe2a0ad 100644 --- a/src/runtime/task/task_engine.h +++ b/src/runtime/task/task_engine.h @@ -26,12 +26,12 @@ #pragma once -#include #include #include #include #include +#include "nlohmann/json_fwd.hpp" #include "runtime/task/task_code.h" #include "utils/command_manager.h" #include "utils/threadpool_spec.h" @@ -72,10 +72,8 @@ class task_worker_pool bool shared_same_worker_with_current_task(task *task) const; task_engine *engine() const { return _owner; } service_node *node() const { return _node; } - void get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss); - void get_queue_info(/*out*/ std::stringstream &ss); + nlohmann::json get_runtime_info(const std::vector &args) const; + nlohmann::json get_queue_info() const; std::vector &queues() { return _queues; } std::vector &workers() { return _workers; } @@ -118,10 +116,8 @@ class task_engine volatile int *get_task_queue_virtual_length_ptr(dsn::task_code code, int hash); service_node *node() const { return _node; } - void get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss); - void get_queue_info(/*out*/ std::stringstream &ss); + nlohmann::json get_runtime_info(const std::vector &args) const; + nlohmann::json get_queue_info() const; private: void register_cli_commands(); diff --git a/src/runtime/test/task_engine.cpp b/src/runtime/test/task_engine.cpp index 38374f4558..be02295b95 100644 --- a/src/runtime/test/task_engine.cpp +++ b/src/runtime/test/task_engine.cpp @@ -26,6 +26,9 @@ #include "runtime/task/task_engine.h" +#include +#include +#include #include #include "gtest/gtest.h" @@ -57,10 +60,7 @@ TEST(core, task_engine) ASSERT_NE(nullptr, engine); ASSERT_TRUE(engine->is_started()); - std::vector args; - std::stringstream oss; - engine->get_runtime_info(" ", args, oss); - printf("%s\n", oss.str().c_str()); + fmt::print(stdout, "{}\n", engine->get_runtime_info({}).dump()); std::vector &pools = engine->pools(); for (size_t i = 0; i < pools.size(); ++i) { diff --git a/src/runtime/tracer.cpp b/src/runtime/tracer.cpp index c3a7e9154b..30f2c89eb0 100644 --- a/src/runtime/tracer.cpp +++ b/src/runtime/tracer.cpp @@ -260,8 +260,6 @@ static std::string tracer_log_flow_error(const char *msg) static std::string tracer_log_flow(const std::vector &args) { - // forward|f|backward|b rpc|r|task|t trace_id|task_id(e.g., 002a003920302390) - // log_file_name(log.xx.txt) if (args.size() < 4) { return tracer_log_flow_error("not enough arguments"); } @@ -406,11 +404,11 @@ void tracer::install(service_spec &spec) static std::once_flag flag; std::call_once(flag, [&]() { - _tracer_find_cmd = command_manager::instance().register_command( - {"tracer.find"}, - "tracer.find - find related logs", - "tracer.find forward|f|backward|b rpc|r|task|t trace_id|task_id(e.g., " - "a023003920302390) log_file_name(log.xx.txt)", + _tracer_find_cmd = command_manager::instance().register_single_command( + "tracer.find", + "Find related logs", + "[forward|f|backward|b] [rpc|r|task|t] [trace_id|task_id(e.g., a023003920302390)] " + "", tracer_log_flow); }); } diff --git a/src/server/main.cpp b/src/server/main.cpp index 7eb9253fab..2ad1e6ab2f 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -17,12 +17,14 @@ * under the License. */ +#include +#include #include #include #include #include +#include #include -#include #include #include @@ -91,17 +93,18 @@ int main(int argc, char **argv) dsn_app_registration_pegasus(); std::unique_ptr server_info_cmd = - dsn::command_manager::instance().register_command( - {"server-info"}, - "server-info - query server information", + dsn::command_manager::instance().register_single_command( "server-info", + "Query server information", + "", [](const std::vector &args) { - char str[100]; - ::dsn::utils::time_ms_to_date_time(dsn::utils::process_start_millis(), str, 100); - std::ostringstream oss; - oss << "Pegasus Server " << PEGASUS_VERSION << " (" << PEGASUS_GIT_COMMIT << ") " - << PEGASUS_BUILD_TYPE << ", Started at " << str; - return oss.str(); + nlohmann::json info; + info["version"] = PEGASUS_VERSION; + info["build_type"] = PEGASUS_BUILD_TYPE; + info["git_SHA"] = PEGASUS_GIT_COMMIT; + info["start_time"] = + ::dsn::utils::time_s_to_date_time(dsn::utils::process_start_millis() / 1000); + return info.dump(); }); dsn_run(argc, argv, true); diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp index a2cdf4812e..617a7b452f 100644 --- a/src/server/pegasus_manual_compact_service.cpp +++ b/src/server/pegasus_manual_compact_service.cpp @@ -21,14 +21,15 @@ #include #include +#include +#include #include #include -#include #include #include -#include "common/replication.codes.h" #include "common/replica_envs.h" +#include "common/replication.codes.h" #include "pegasus_server_impl.h" #include "runtime/api_layer1.h" #include "runtime/task/async_calls.h" @@ -339,31 +340,16 @@ std::string pegasus_manual_compact_service::query_compact_state() const uint64_t start_time_ms = _manual_compact_start_running_time_ms.load(); uint64_t last_finish_time_ms = _manual_compact_last_finish_time_ms.load(); uint64_t last_time_used_ms = _manual_compact_last_time_used_ms.load(); - std::stringstream state; - if (last_finish_time_ms > 0) { - char str[24] = {0}; - dsn::utils::time_ms_to_string(last_finish_time_ms, str); - state << "last finish at [" << str << "]"; - } else { - state << "last finish at [-]"; - } - - if (last_time_used_ms > 0) { - state << ", last used " << last_time_used_ms << " ms"; - } - - if (enqueue_time_ms > 0) { - char str[24] = {0}; - dsn::utils::time_ms_to_string(enqueue_time_ms, str); - state << ", recent enqueue at [" << str << "]"; - } - if (start_time_ms > 0) { - char str[24] = {0}; - dsn::utils::time_ms_to_string(start_time_ms, str); - state << ", recent start at [" << str << "]"; - } - return state.str(); + nlohmann::json info; + info["recent_enqueue_at"] = + enqueue_time_ms > 0 ? dsn::utils::time_s_to_date_time(enqueue_time_ms / 1000) : "-"; + info["recent_start_at"] = + start_time_ms > 0 ? dsn::utils::time_s_to_date_time(start_time_ms / 1000) : "-"; + info["last_finish"] = + last_finish_time_ms > 0 ? dsn::utils::time_s_to_date_time(last_finish_time_ms / 1000) : "-"; + info["last_used_ms"] = last_time_used_ms > 0 ? std::to_string(last_time_used_ms) : "-"; + return info.dump(); } dsn::replication::manual_compaction_status::type diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index 17a02be6ea..261fe50fd7 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -662,10 +662,10 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args) } fprintf(stderr, "CALL [%s] [%s] ", n.desc.c_str(), hostname.c_str()); if (results[i].first) { - fprintf(stderr, "succeed: %s\n", results[i].second.c_str()); + fprintf(stderr, "succeed:\n%s\n", results[i].second.c_str()); succeed++; } else { - fprintf(stderr, "failed: %s\n", results[i].second.c_str()); + fprintf(stderr, "failed:\n%s\n", results[i].second.c_str()); failed++; } } diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 186fb92bab..59eb2bc7d8 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -333,7 +333,7 @@ static command_executor commands[] = { "remote_command", "send remote command to servers", "[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l ip:port,ip:port...]" - " [arguments...]", + " [arguments...]", remote_command, }, { diff --git a/src/utils/command_manager.cpp b/src/utils/command_manager.cpp index 0c5dd9b1fc..bcf37bb873 100644 --- a/src/utils/command_manager.cpp +++ b/src/utils/command_manager.cpp @@ -26,26 +26,28 @@ #include "utils/command_manager.h" +#include // IWYU pragma: no_include #include #include #include #include // IWYU pragma: keep #include +#include #include namespace dsn { std::unique_ptr command_manager::register_command(const std::vector &commands, - const std::string &help_one_line, - const std::string &help_long, + const std::string &help, + const std::string &args, command_handler handler) { auto *c = new command_instance(); c->commands = commands; - c->help_short = help_one_line; - c->help_long = help_long; + c->help = help; + c->args = args; c->handler = std::move(handler); utils::auto_write_lock l(_lock); @@ -59,13 +61,37 @@ command_manager::register_command(const std::vector &commands, std::unique_ptr command_manager::register_bool_command( bool &value, const std::string &command, const std::string &help) +{ + return register_single_command(command, + help, + fmt::format(""), + [&value, command](const std::vector &args) { + return set_bool(value, command, args); + }); +} + +std::unique_ptr +command_manager::register_single_command(const std::string &command, + const std::string &help, + const std::string &args, + command_handler handler) { return register_command({command}, - fmt::format("{} ", command), - help, - [&value, command](const std::vector &args) { - return set_bool(value, command, args); - }); + fmt::format("{} - {}", command, help), + fmt::format("{} {}", command, args), + handler); +} + +std::unique_ptr +command_manager::register_multiple_commands(const std::vector &commands, + const std::string &help, + const std::string &args, + command_handler handler) +{ + return register_command(commands, + fmt::format("{} - {}", fmt::join(commands, "|"), help), + fmt::format("{} {}", fmt::join(commands, "|"), args), + handler); } void command_manager::deregister_command(uintptr_t handle) @@ -128,41 +154,44 @@ std::string command_manager::set_bool(bool &value, command_manager::command_manager() { - _cmds.emplace_back(register_command({"help", "h", "H", "Help"}, - "help|Help|h|H [command] - display help information", - "", - [this](const std::vector &args) { - std::stringstream ss; - - if (args.size() == 0) { - utils::auto_read_lock l(_lock); - for (const auto &c : this->_handlers) { - ss << c.second->help_short << std::endl; - } - } else { - utils::auto_read_lock l(_lock); - auto it = _handlers.find(args[0]); - if (it == _handlers.end()) - ss << "cannot find command '" << args[0] << "'"; - else { - ss.width(6); - ss << std::left << it->first << ": " - << it->second->help_short << std::endl - << it->second->help_long << std::endl; - } - } - - return ss.str(); - })); - - _cmds.emplace_back(register_command( + _cmds.emplace_back( + register_multiple_commands({"help", "h", "H", "Help"}, + "Display help information", + "[command]", + [this](const std::vector &args) { + std::stringstream ss; + if (args.empty()) { + std::unordered_set cmds; + utils::auto_read_lock l(_lock); + for (const auto &c : this->_handlers) { + // Multiple commands with the same handler are print + // only once. + if (cmds.insert(c.second.get()).second) { + ss << c.second->help << std::endl; + } + } + } else { + utils::auto_read_lock l(_lock); + auto it = _handlers.find(args[0]); + if (it == _handlers.end()) { + ss << "cannot find command '" << args[0] << "'"; + } else { + ss.width(6); + ss << std::left << it->second->help << std::endl + << it->second->args << std::endl; + } + } + + return ss.str(); + })); + + _cmds.emplace_back(register_multiple_commands( {"repeat", "r", "R", "Repeat"}, - "repeat|Repeat|r|R interval_seconds max_count command - execute command periodically", - "repeat|Repeat|r|R interval_seconds max_count command - execute command every interval " - "seconds, to the max count as max_count (0 for infinite)", + "Execute a command periodically in every interval seconds for the max count time (0 for " + "infinite)", + " ", [this](const std::vector &args) { std::stringstream ss; - if (args.size() < 3) { return "insufficient arguments"; } @@ -172,25 +201,27 @@ command_manager::command_manager() return "invalid interval argument"; } - int max_count = atoi(args[1].c_str()); - if (max_count < 0) { + uint32_t max_count; + if (!dsn::buf2uint32(args[1], max_count)) { return "invalid max count"; } if (max_count == 0) { - max_count = std::numeric_limits::max(); + max_count = std::numeric_limits::max(); } - std::string cmd = args[2]; - std::vector largs; - for (int i = 3; i < (int)args.size(); i++) { - largs.push_back(args[i]); + const auto &command = args[2]; + std::vector command_args; + for (size_t i = 3; i < args.size(); i++) { + command_args.push_back(args[i]); } - for (int i = 0; i < max_count; i++) { + // TODO(yingchun): the 'repeat' command may last long time (or even infinity), it's + // easy to timeout, the remote_command timeout is a fixed value of 5 seconds (see + // call_remote_command()), and it also consumes thread resource on server side. + for (uint32_t i = 0; i < max_count; i++) { std::string output; - auto r = this->run_command(cmd, largs, output); - + auto r = this->run_command(command, command_args, output); if (!r) { break; } diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h index 26a6cb3428..a73966845c 100644 --- a/src/utils/command_manager.h +++ b/src/utils/command_manager.h @@ -52,12 +52,6 @@ class command_manager : public ::dsn::utils::singleton public: using command_handler = std::function &)>; - std::unique_ptr - register_command(const std::vector &commands, - const std::string &help_one_line, - const std::string &help_long, - command_handler handler) WARN_UNUSED_RESULT; - // Register command which query or update a boolean configuration. // The 'value' will be queried or updated by the command named 'command' with the 'help' // description. @@ -78,15 +72,31 @@ class command_manager : public ::dsn::utils::singleton std::function validator = [](int64_t new_value) -> bool { return new_value >= 0; }) { - return register_command( - {command}, - fmt::format("{} [num | DEFAULT]", command), + return register_single_command( + command, help, + fmt::format("[num | DEFAULT]"), [&value, default_value, command, validator](const std::vector &args) { return set_int(value, default_value, command, args, validator); }); } + // Register a single 'command' with the 'help' description, its arguments are describe in + // 'args'. + std::unique_ptr + register_single_command(const std::string &command, + const std::string &help, + const std::string &args, + command_handler handler) WARN_UNUSED_RESULT; + + // Register multiple 'commands' with the 'help' description, their arguments are describe in + // 'args'. + std::unique_ptr + register_multiple_commands(const std::vector &commands, + const std::string &help, + const std::string &args, + command_handler handler) WARN_UNUSED_RESULT; + bool run_command(const std::string &cmd, const std::vector &args, /*out*/ std::string &output); @@ -101,11 +111,17 @@ class command_manager : public ::dsn::utils::singleton struct command_instance : public ref_counter { std::vector commands; - std::string help_short; - std::string help_long; + std::string help; + std::string args; command_handler handler; }; + std::unique_ptr + register_command(const std::vector &commands, + const std::string &help, + const std::string &args, + command_handler handler) WARN_UNUSED_RESULT; + void deregister_command(uintptr_t handle); static std::string diff --git a/src/utils/simple_logger.cpp b/src/utils/simple_logger.cpp index 79dfea3de0..db524e4f23 100644 --- a/src/utils/simple_logger.cpp +++ b/src/utils/simple_logger.cpp @@ -189,19 +189,19 @@ simple_logger::simple_logger(const char *log_dir) // "assertion expression: [_handlers.empty()] All commands must be deregistered before // command_manager is destroyed, however 'flush-log' is still registered". // We need to fix it. - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"flush-log"}, - "flush-log - flush log to stderr or log file", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( "flush-log", + "Flush log to stderr or file", + "", [this](const std::vector &args) { this->flush(); return "Flush done."; })); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"reset-log-start-level"}, - "reset-log-start-level - reset the log start level", - "reset-log-start-level [DEBUG | INFO | WARNING | ERROR | FATAL]", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "reset-log-start-level", + "Reset the log start level", + "[DEBUG | INFO | WARNING | ERROR | FATAL]", [](const std::vector &args) { log_level_t start_level; if (args.size() == 0) { diff --git a/src/utils/test/command_manager_test.cpp b/src/utils/test/command_manager_test.cpp index e6e2598cac..fd7265d8de 100644 --- a/src/utils/test/command_manager_test.cpp +++ b/src/utils/test/command_manager_test.cpp @@ -32,10 +32,10 @@ class command_manager_test : public ::testing::Test public: command_manager_test() { - _cmd = command_manager::instance().register_command( - {"test-cmd"}, - "test-cmd - just for command_manager unit-test", - "test-cmd arg1 arg2 ...", + _cmd = command_manager::instance().register_single_command( + "test-cmd", + "Just for command_manager unit-test", + "arg1 arg2 ...", [](const vector &args) { return fmt::format("test-cmd response: [{}]", boost::join(args, " ")); });