diff --git a/src/failure_detector/failure_detector.cpp b/src/failure_detector/failure_detector.cpp index af264d21f15..4923739f567 100644 --- a/src/failure_detector/failure_detector.cpp +++ b/src/failure_detector/failure_detector.cpp @@ -33,6 +33,8 @@ #include #include +#include + #include "absl/strings/string_view.h" #include "failure_detector/fd.code.definition.h" #include "fd_types.h" @@ -68,10 +70,10 @@ void failure_detector::register_ctrl_commands() { static std::once_flag flag; std::call_once(flag, [&]() { - _get_allow_list = dsn::command_manager::instance().register_command( - {"fd.allow_list"}, + _get_allow_list = dsn::command_manager::instance().register_single_command( "fd.allow_list", - "show allow list of failure detector", + "Show the allow list of failure detector", + "", [this](const std::vector &args) { return get_allow_list(args); }); }); } @@ -335,18 +337,17 @@ void failure_detector::set_allow_list(const std::vector &replica_ad std::string failure_detector::get_allow_list(const std::vector &args) const { - if (!_is_started) - return "error: FD is not started"; + if (!_is_started) { + nlohmann::json err_msg; + err_msg["error"] = fmt::format("FD is not started"); + return err_msg.dump(2); + } - std::stringstream oss; + nlohmann::json info; dsn::zauto_lock l(_lock); - oss << "get ok: allow list " << (_use_allow_list ? "enabled. list: " : "disabled."); - for (auto iter = _allow_list.begin(); iter != _allow_list.end(); ++iter) { - if (iter != _allow_list.begin()) - oss << ","; - oss << *iter; - } - return oss.str(); + info["enabled"] = _use_allow_list; + info["allow_list"] = fmt::format("{}", fmt::join(_allow_list, ",")); + return info.dump(2); } void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon_ack &ack) diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp index 48f79258aac..aa2fceebd70 100644 --- a/src/meta/greedy_load_balancer.cpp +++ b/src/meta/greedy_load_balancer.cpp @@ -32,6 +32,8 @@ #include #include +#include + #include "app_balance_policy.h" #include "cluster_balance_policy.h" #include "greedy_load_balancer.h" @@ -71,39 +73,35 @@ greedy_load_balancer::~greedy_load_balancer() {} void greedy_load_balancer::register_ctrl_commands() { - _get_balance_operation_count = dsn::command_manager::instance().register_command( - {"meta.lb.get_balance_operation_count"}, - "meta.lb.get_balance_operation_count [total | move_pri | copy_pri | copy_sec | detail]", - "get balance operation count", + _get_balance_operation_count = dsn::command_manager::instance().register_single_command( + "meta.lb.get_balance_operation_count", + "Get balance operation count", + "[total | move_pri | copy_pri | copy_sec | detail]", [this](const std::vector &args) { return get_balance_operation_count(args); }); } std::string greedy_load_balancer::get_balance_operation_count(const std::vector &args) { - if (args.empty()) { - return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT])); - } - - if (args[0] == "total") { - return std::string("total=" + std::to_string(t_operation_counters[ALL_COUNT])); + nlohmann::json info; + if (args.size() > 1) { + info["error"] = fmt::format("invalid arguments"); + } else if (args.empty() || args[0] == "total") { + info["total"] = t_operation_counters[ALL_COUNT]; + } else if (args[0] == "move_pri") { + info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT]; + } else if (args[0] == "copy_pri") { + info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT]; + } else if (args[0] == "copy_sec") { + info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT]; + } else if (args[0] == "detail") { + info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT]; + info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT]; + info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT]; + info["total"] = t_operation_counters[ALL_COUNT]; + } else { + info["error"] = fmt::format("invalid arguments"); } - - std::string result("unknown"); - if (args[0] == "move_pri") - result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT])); - else if (args[0] == "copy_pri") - result = std::string("copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT])); - else if (args[0] == "copy_sec") - result = std::string("copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT])); - else if (args[0] == "detail") - result = std::string("move_pri=" + std::to_string(t_operation_counters[MOVE_PRI_COUNT]) + - ",copy_pri=" + std::to_string(t_operation_counters[COPY_PRI_COUNT]) + - ",copy_sec=" + std::to_string(t_operation_counters[COPY_SEC_COUNT]) + - ",total=" + std::to_string(t_operation_counters[ALL_COUNT])); - else - result = std::string("ERR: invalid arguments"); - - return result; + return info.dump(2); } void greedy_load_balancer::score(meta_view view, double &primary_stddev, double &total_stddev) diff --git a/src/meta/load_balance_policy.cpp b/src/meta/load_balance_policy.cpp index 8cff520c946..6476636fbbc 100644 --- a/src/meta/load_balance_policy.cpp +++ b/src/meta/load_balance_policy.cpp @@ -25,6 +25,8 @@ #include #include +#include + #include "dsn.layer2_types.h" #include "meta/greedy_load_balancer.h" #include "meta/meta_data.h" @@ -183,10 +185,10 @@ load_balance_policy::load_balance_policy(meta_service *svc) { static std::once_flag flag; std::call_once(flag, [&]() { - _ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_command( - {"meta.lb.ignored_app_list"}, - "meta.lb.ignored_app_list [app_id1,app_id2..]", - "get, set and clear balancer ignored_app_list", + _ctrl_balancer_ignored_apps = dsn::command_manager::instance().register_single_command( + "meta.lb.ignored_app_list", + "get, set or clear balancer ignored_app_list", + " [app_id1,app_id2..]", [this](const std::vector &args) { return remote_command_balancer_ignored_app_ids(args); }); @@ -395,70 +397,71 @@ bool load_balance_policy::execute_balance( std::string load_balance_policy::remote_command_balancer_ignored_app_ids(const std::vector &args) { - static const std::string invalid_arguments("invalid arguments"); + nlohmann::json info; if (args.empty()) { - return invalid_arguments; - } - if (args[0] == "set") { + info["error"] = "invalid argument"; + } else if (args[0] == "set") { return set_balancer_ignored_app_ids(args); - } - if (args[0] == "get") { + } else if (args[0] == "get") { return get_balancer_ignored_app_ids(); - } - if (args[0] == "clear") { + } else if (args[0] == "clear") { return clear_balancer_ignored_app_ids(); + } else { + info["error"] = "invalid argument"; } - return invalid_arguments; + return info.dump(2); } std::string load_balance_policy::set_balancer_ignored_app_ids(const std::vector &args) { - static const std::string invalid_arguments("invalid arguments"); + nlohmann::json info; + info["error"] = "invalid argument"; if (args.size() != 2) { - return invalid_arguments; + return info.dump(2); } std::vector app_ids; dsn::utils::split_args(args[1].c_str(), app_ids, ','); if (app_ids.empty()) { - return invalid_arguments; + return info.dump(2); } std::set app_list; - for (const std::string &app_id_str : app_ids) { + for (const auto &app_id_str : app_ids) { app_id app; if (!dsn::buf2int32(app_id_str, app)) { - return invalid_arguments; + return info.dump(2); } app_list.insert(app); } - dsn::zauto_write_lock l(_balancer_ignored_apps_lock); - _balancer_ignored_apps = std::move(app_list); - return "set ok"; + { + dsn::zauto_write_lock l(_balancer_ignored_apps_lock); + _balancer_ignored_apps = std::move(app_list); + } + info["error"] = "ok"; + return info.dump(2); } std::string load_balance_policy::get_balancer_ignored_app_ids() { - std::stringstream oss; - dsn::zauto_read_lock l(_balancer_ignored_apps_lock); - if (_balancer_ignored_apps.empty()) { - return "no ignored apps"; + nlohmann::json data; + { + dsn::zauto_read_lock l(_balancer_ignored_apps_lock); + data["ignored_app_id_list"] = fmt::format("{}", fmt::join(_balancer_ignored_apps, ",")); } - oss << "ignored_app_id_list: "; - std::copy(_balancer_ignored_apps.begin(), - _balancer_ignored_apps.end(), - std::ostream_iterator(oss, ",")); - std::string app_ids = oss.str(); - app_ids[app_ids.size() - 1] = '\0'; - return app_ids; + return data.dump(2); } std::string load_balance_policy::clear_balancer_ignored_app_ids() { - dsn::zauto_write_lock l(_balancer_ignored_apps_lock); - _balancer_ignored_apps.clear(); - return "clear ok"; + { + dsn::zauto_write_lock l(_balancer_ignored_apps_lock); + _balancer_ignored_apps.clear(); + } + nlohmann::json info; + info["error"] = "ok"; + return info.dump(2); } bool load_balance_policy::is_ignored_app(app_id app_id) diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp index fdc531fccff..1618bd0c5d9 100644 --- a/src/meta/partition_guardian.cpp +++ b/src/meta/partition_guardian.cpp @@ -25,6 +25,8 @@ #include #include +#include + #include "common/replication_common.h" #include "common/replication_other_types.h" #include "meta/meta_data.h" @@ -694,10 +696,10 @@ void partition_guardian::register_ctrl_commands() "meta.lb.assign_delay_ms", "control the replica_assign_delay_ms_for_dropouts config")); - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"meta.lb.assign_secondary_black_list"}, - "lb.assign_secondary_black_list [|clear]", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( + "meta.lb.assign_secondary_black_list", "control the assign secondary black list", + "[|clear]", [this](const std::vector &args) { return ctrl_assign_secondary_black_list(args); })); @@ -706,47 +708,56 @@ void partition_guardian::register_ctrl_commands() std::string partition_guardian::ctrl_assign_secondary_black_list(const std::vector &args) { - std::string invalid_arguments("invalid arguments"); - std::stringstream oss; + nlohmann::json msg; + msg["error"] = "ok"; + // Query. if (args.empty()) { - dsn::zauto_read_lock l(_black_list_lock); - oss << "get ok: "; - for (auto iter = _assign_secondary_black_list.begin(); - iter != _assign_secondary_black_list.end(); - ++iter) { - if (iter != _assign_secondary_black_list.begin()) - oss << ","; - oss << *iter; + { + dsn::zauto_read_lock l(_black_list_lock); + msg["assign_secondary_black_list"] = + fmt::format("{}", fmt::join(_assign_secondary_black_list, ",")); } - return oss.str(); + return msg.dump(2); } + // Invalid argument. if (args.size() != 1) { - return invalid_arguments; + msg["error"] = "invalid argument, 0 or 1 argument is acceptable"; + return msg.dump(2); } - dsn::zauto_write_lock l(_black_list_lock); + // Clear. if (args[0] == "clear") { - _assign_secondary_black_list.clear(); - return "clear ok"; + { + dsn::zauto_write_lock l(_black_list_lock); + _assign_secondary_black_list.clear(); + } + return msg.dump(2); } + // Set to new value. std::vector ip_ports; dsn::utils::split_args(args[0].c_str(), ip_ports, ','); - if (args.size() == 0) { - return invalid_arguments; + if (ip_ports.empty()) { + msg["error"] = + "invalid argument, the argument should be in form of ''"; + return msg.dump(2); } std::set addr_list; - for (const std::string &s : ip_ports) { + for (const auto &ip_port : ip_ports) { dsn::rpc_address addr; - if (!addr.from_string_ipv4(s.c_str())) { - return invalid_arguments; + if (!addr.from_string_ipv4(ip_port.c_str())) { + msg["error"] = fmt::format("invalid argument, bad ip:port '{}'", ip_port); + return msg.dump(2); } addr_list.insert(addr); } - _assign_secondary_black_list = std::move(addr_list); - return "set ok"; + { + dsn::zauto_write_lock l(_black_list_lock); + _assign_secondary_black_list = std::move(addr_list); + } + return msg.dump(2); } void partition_guardian::get_ddd_partitions(const gpid &pid, diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index d5cb2294b19..32737a96630 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -142,10 +142,10 @@ server_state::~server_state() { _tracker.cancel_outstanding_tasks(); } void server_state::register_cli_commands() { - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"meta.dump"}, - "meta.dump - dump app_states of meta server to local file", - "meta.dump -t|--target target_file", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( + "meta.dump", + "Dump app_states of meta server to a local file", + "-t|--target target_file", [this](const std::vector &args) { dsn::error_code err; if (args.size() != 2) { diff --git a/src/perf_counter/perf_counters.cpp b/src/perf_counter/perf_counters.cpp index 8d805db5be3..b1d0f930a30 100644 --- a/src/perf_counter/perf_counters.cpp +++ b/src/perf_counter/perf_counters.cpp @@ -159,10 +159,10 @@ perf_counters::perf_counters() }); })); - _cmds.emplace_back(command_manager::instance().register_command( - {"server-stat"}, - "server-stat - query selected perf counters", + _cmds.emplace_back(command_manager::instance().register_single_command( "server-stat", + "Query selected perf counters", + "", [](const std::vector &args) { return get_brief_stat(); })); } diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 06a6bc48a5e..cfa2559990f 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2129,10 +2129,10 @@ void replica_stub::open_service() #if !defined(DSN_ENABLE_GPERF) && defined(DSN_USE_JEMALLOC) void replica_stub::register_jemalloc_ctrl_command() { - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.dump-jemalloc-stats"}, - fmt::format("replica.dump-jemalloc-stats <{}> [buffer size]", kAllJeStatsTypesStr), - "dump stats of jemalloc", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.dump-jemalloc-stats", + "Dump stats of jemalloc", + fmt::format("<{}> [buffer size]", kAllJeStatsTypesStr), [](const std::vector &args) { if (args.empty()) { return std::string("invalid arguments"); @@ -2170,10 +2170,10 @@ void replica_stub::register_ctrl_command() /// failure_detector::register_ctrl_commands and nfs_client_impl::register_cli_commands static std::once_flag flag; std::call_once(flag, [&]() { - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.kill_partition"}, - "replica.kill_partition [app_id [partition_index]]", - "replica.kill_partition: kill partitions by (all, one app, one partition)", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.kill_partition", + "Kill partitions by (all, one app, one partition)", + "[app_id [partition_index]]", [this](const std::vector &args) { dsn::gpid pid; if (args.size() == 0) { @@ -2205,11 +2205,10 @@ void replica_stub::register_ctrl_command() "replica.verbose-commit-log", "control if print verbose log when commit mutation")); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.trigger-checkpoint"}, - "replica.trigger-checkpoint [id1,id2,...] (where id is 'app_id' or " - "'app_id.partition_id')", - "replica.trigger-checkpoint - trigger replicas to do checkpoint", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.trigger-checkpoint", + "Trigger replicas to do checkpoint", + "[id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')", [this](const std::vector &args) { return exec_command_on_replica(args, true, [this](const replica_ptr &rep) { tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER, @@ -2220,10 +2219,10 @@ void replica_stub::register_ctrl_command() }); })); - _cmds.emplace_back(::dsn::command_manager::instance().register_command( - {"replica.query-compact"}, - "replica.query-compact [id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')", - "replica.query-compact - query full compact status on the underlying storage engine", + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.query-compact", + "Query full compact status on the underlying storage engine", + "[id1,id2,...] (where id is 'app_id' or 'app_id.partition_id')", [this](const std::vector &args) { return exec_command_on_replica(args, true, [](const replica_ptr &rep) { return rep->query_manual_compact_state(); diff --git a/src/runtime/service_engine.cpp b/src/runtime/service_engine.cpp index ee0c742d812..577136e9c21 100644 --- a/src/runtime/service_engine.cpp +++ b/src/runtime/service_engine.cpp @@ -44,6 +44,7 @@ #include "utils/filesystem.h" #include "utils/fmt_logging.h" #include "utils/join_point.h" +#include "utils/string_conv.h" #include "utils/strings.h" using namespace dsn::utils; @@ -144,22 +145,19 @@ error_code service_node::start() return err; } -void service_node::get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss) +std::string service_node::get_runtime_info(const std::vector &args) const { - ss << indent << full_name() << ":" << std::endl; - - std::string indent2 = indent + "\t"; - _computation->get_runtime_info(indent2, args, ss); + nlohmann::json info; + info[full_name()] = _computation->get_runtime_info(args); + return info.dump(2); } -void service_node::get_queue_info( - /*out*/ std::stringstream &ss) +nlohmann::json service_node::get_queue_info() const { - ss << "{\"app_name\":\"" << full_name() << "\",\n\"thread_pool\":[\n"; - _computation->get_queue_info(ss); - ss << "]}"; + nlohmann::json info; + info["app_name"] = full_name(); + info["thread_pool"] = _computation->get_queue_info(); + return info; } rpc_request_task *service_node::generate_intercepted_request_task(message_ex *req) @@ -189,16 +187,18 @@ service_engine::service_engine() { _env = nullptr; - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"engine"}, - "engine - get engine internal information", - "engine [app-id]", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( + "engine", + "Get engine internal information, including threadpools and threads and queues in each " + "threadpool", + "[app-id]", &service_engine::get_runtime_info)); - _cmds.emplace_back(dsn::command_manager::instance().register_command( - {"system.queue"}, - "system.queue - get queue internal information", + _cmds.emplace_back(dsn::command_manager::instance().register_single_command( "system.queue", + "Get queue internal information, including the threadpool each queue belongs to, and the " + "queue name and size", + "", &service_engine::get_queue_info)); } @@ -243,39 +243,47 @@ void service_engine::start_node(service_app_spec &app_spec) std::string service_engine::get_runtime_info(const std::vector &args) { - std::stringstream ss; - if (args.size() == 0) { - ss << "" << service_engine::instance()._nodes_by_app_id.size() - << " nodes available:" << std::endl; - for (auto &kv : service_engine::instance()._nodes_by_app_id) { - ss << "\t" << kv.second->id() << "." << kv.second->full_name() << std::endl; - } - } else { - std::string indent = ""; - int id = atoi(args[0].c_str()); - auto it = service_engine::instance()._nodes_by_app_id.find(id); - if (it != service_engine::instance()._nodes_by_app_id.end()) { - auto args2 = args; - args2.erase(args2.begin()); - it->second->get_runtime_info(indent, args2, ss); - } else { - ss << "cannot find node with given app id"; + // Overview. + if (args.empty()) { + nlohmann::json overview; + nlohmann::json nodes; + for (const auto &nodes_by_app_id : service_engine::instance()._nodes_by_app_id) { + nodes.emplace_back(fmt::format( + "{}.{}", nodes_by_app_id.second->id(), nodes_by_app_id.second->full_name())); } + overview["available_nodes"] = nodes; + return overview.dump(2); + } + + // Invalid argument. + int id; + if (!dsn::buf2int32(args[0], id)) { + nlohmann::json err_msg; + err_msg["error"] = "invalid argument, only one integer argument is acceptable"; + return err_msg.dump(2); } - return ss.str(); + + // The query id is not found. + const auto &it = service_engine::instance()._nodes_by_app_id.find(id); + if (it == service_engine::instance()._nodes_by_app_id.end()) { + nlohmann::json err_msg; + err_msg["error"] = fmt::format("cannot find node with given app id({})", id); + return err_msg.dump(2); + } + + // Query a special id. + auto tmp_args = args; + tmp_args.erase(tmp_args.begin()); + return it->second->get_runtime_info(tmp_args); } std::string service_engine::get_queue_info(const std::vector &args) { - std::stringstream ss; - ss << "["; - for (auto &it : service_engine::instance()._nodes_by_app_id) { - if (it.first != service_engine::instance()._nodes_by_app_id.begin()->first) - ss << ","; - it.second->get_queue_info(ss); + nlohmann::json info; + for (const auto &nodes_by_app_id : service_engine::instance()._nodes_by_app_id) { + info.emplace_back(nodes_by_app_id.second->get_queue_info()); } - ss << "]"; - return ss.str(); + return info.dump(2); } bool service_engine::is_simulator() const { return _simulator; } diff --git a/src/runtime/service_engine.h b/src/runtime/service_engine.h index 1e6528f0fae..ccf3c5b7596 100644 --- a/src/runtime/service_engine.h +++ b/src/runtime/service_engine.h @@ -32,6 +32,8 @@ #include #include +#include + #include "runtime/api_task.h" #include "runtime/global_config.h" #include "runtime/service_app.h" @@ -60,10 +62,8 @@ class service_node rpc_engine *rpc() const { return _rpc.get(); } task_engine *computation() const { return _computation.get(); } - void get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss); - void get_queue_info(/*out*/ std::stringstream &ss); + std::string get_runtime_info(const std::vector &args) const; + nlohmann::json get_queue_info() const; dsn::error_code start(); dsn::error_code start_app(); diff --git a/src/runtime/task/task_engine.cpp b/src/runtime/task/task_engine.cpp index 4fcc3f6e8e7..71b808eff60 100644 --- a/src/runtime/task/task_engine.cpp +++ b/src/runtime/task/task_engine.cpp @@ -184,42 +184,49 @@ bool task_worker_pool::shared_same_worker_with_current_task(task *tsk) const } } -void task_worker_pool::get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss) +nlohmann::json task_worker_pool::get_runtime_info(const std::vector &args) const { - std::string indent2 = indent + "\t"; - ss << indent << "contains " << _workers.size() << " threads with " << _queues.size() - << " queues" << std::endl; - - for (auto &q : _queues) { - if (q) { - ss << indent2 << q->get_name() << " now has " << q->count() << " pending tasks" - << std::endl; + nlohmann::json info; + + // Queues. + nlohmann::json workers; + for (const auto &worker : _workers) { + if (worker) { + nlohmann::json w; + w["TID"] = worker->native_tid(); + w["queue_name"] = worker->queue()->get_name(); + workers.emplace_back(w); } } - - for (auto &wk : _workers) { - if (wk) { - ss << indent2 << wk->index() << " (TID = " << wk->native_tid() - << ") attached with queue " << wk->queue()->get_name() << std::endl; + info["threads"] = workers; + + // Threads. + nlohmann::json queues; + for (const auto &queue : _queues) { + if (queue) { + nlohmann::json q; + q["name"] = queue->get_name(); + q["pending_task_count"] = queue->count(); + queues.emplace_back(q); } } + info["queues"] = queues; + + return info; } -void task_worker_pool::get_queue_info(/*out*/ std::stringstream &ss) + +nlohmann::json task_worker_pool::get_queue_info() const { - ss << "["; - bool first_flag = 0; - for (auto &q : _queues) { - if (q) { - if (!first_flag) - first_flag = 1; - else - ss << ","; - ss << "\t\t{\"name\":\"" << q->get_name() << "\",\n\t\t\"num\":" << q->count() << "}\n"; + nlohmann::json queues; + for (auto &queue : _queues) { + if (queue) { + nlohmann::json q; + q["name"] = queue->get_name(); + q["pending_task_count"] = queue->count(); + queues.emplace_back(q); } } - ss << "]\n"; + return queues; } task_engine::task_engine(service_node *node) @@ -279,33 +286,30 @@ volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code return pl->queues()[idx]->get_virtual_length_ptr(); } -void task_engine::get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss) +nlohmann::json task_engine::get_runtime_info(const std::vector &args) const { - std::string indent2 = indent + "\t"; - for (auto &p : _pools) { - if (p) { - ss << indent << p->spec().pool_code.to_string() << std::endl; - p->get_runtime_info(indent2, args, ss); + nlohmann::json info; + // Thread pools. + for (const auto &pool : _pools) { + if (pool) { + info[pool->spec().pool_code.to_string()] = pool->get_runtime_info(args); } } + return info; } -void task_engine::get_queue_info(/*out*/ std::stringstream &ss) +nlohmann::json task_engine::get_queue_info() const { - bool first_flag = 0; - for (auto &p : _pools) { - if (p) { - if (!first_flag) - first_flag = 1; - else - ss << ","; - ss << "\t{\"pool_name\":\"" << p->spec().pool_code << "\",\n\t\"pool_queue\":\n"; - p->get_queue_info(ss); - ss << "}\n"; + nlohmann::json pools; + for (const auto &pool : _pools) { + if (pool) { + nlohmann::json p; + p["name"] = pool->spec().pool_code.to_string(); + p["queue"] = pool->get_queue_info(); + pools.emplace_back(p); } } + return pools; } void task_engine::register_cli_commands() diff --git a/src/runtime/task/task_engine.h b/src/runtime/task/task_engine.h index cbff4e3514f..045379a3349 100644 --- a/src/runtime/task/task_engine.h +++ b/src/runtime/task/task_engine.h @@ -32,6 +32,8 @@ #include #include +#include + #include "runtime/task/task_code.h" #include "utils/command_manager.h" #include "utils/threadpool_spec.h" @@ -72,10 +74,8 @@ class task_worker_pool bool shared_same_worker_with_current_task(task *task) const; task_engine *engine() const { return _owner; } service_node *node() const { return _node; } - void get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss); - void get_queue_info(/*out*/ std::stringstream &ss); + nlohmann::json get_runtime_info(const std::vector &args) const; + nlohmann::json get_queue_info() const; std::vector &queues() { return _queues; } std::vector &workers() { return _workers; } @@ -118,10 +118,8 @@ class task_engine volatile int *get_task_queue_virtual_length_ptr(dsn::task_code code, int hash); service_node *node() const { return _node; } - void get_runtime_info(const std::string &indent, - const std::vector &args, - /*out*/ std::stringstream &ss); - void get_queue_info(/*out*/ std::stringstream &ss); + nlohmann::json get_runtime_info(const std::vector &args) const; + nlohmann::json get_queue_info() const; private: void register_cli_commands(); diff --git a/src/runtime/test/task_engine.cpp b/src/runtime/test/task_engine.cpp index 38374f45584..5fa39262503 100644 --- a/src/runtime/test/task_engine.cpp +++ b/src/runtime/test/task_engine.cpp @@ -58,9 +58,7 @@ TEST(core, task_engine) ASSERT_TRUE(engine->is_started()); std::vector args; - std::stringstream oss; - engine->get_runtime_info(" ", args, oss); - printf("%s\n", oss.str().c_str()); + fmt::print(stdout, "{}\n", engine->get_runtime_info(args).dump()); std::vector &pools = engine->pools(); for (size_t i = 0; i < pools.size(); ++i) { diff --git a/src/server/main.cpp b/src/server/main.cpp index 7eb9253fab8..23f27c61e7f 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -26,6 +26,8 @@ #include #include +#include + #include "backup_types.h" #include "compaction_operation.h" #include "info_collector_app.h" @@ -91,17 +93,18 @@ int main(int argc, char **argv) dsn_app_registration_pegasus(); std::unique_ptr server_info_cmd = - dsn::command_manager::instance().register_command( - {"server-info"}, - "server-info - query server information", + dsn::command_manager::instance().register_single_command( "server-info", + "Query server information", + "", [](const std::vector &args) { - char str[100]; - ::dsn::utils::time_ms_to_date_time(dsn::utils::process_start_millis(), str, 100); - std::ostringstream oss; - oss << "Pegasus Server " << PEGASUS_VERSION << " (" << PEGASUS_GIT_COMMIT << ") " - << PEGASUS_BUILD_TYPE << ", Started at " << str; - return oss.str(); + nlohmann::json info; + info["version"] = PEGASUS_VERSION; + info["build_type"] = PEGASUS_BUILD_TYPE; + info["git_SHA"] = PEGASUS_GIT_COMMIT; + info["start_time"] = + ::dsn::utils::time_s_to_date_time(dsn::utils::process_start_millis() / 1000); + return info.dump(2); }); dsn_run(argc, argv, true); diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp index a2cdf4812e7..8b72cd97872 100644 --- a/src/server/pegasus_manual_compact_service.cpp +++ b/src/server/pegasus_manual_compact_service.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -339,31 +340,16 @@ std::string pegasus_manual_compact_service::query_compact_state() const uint64_t start_time_ms = _manual_compact_start_running_time_ms.load(); uint64_t last_finish_time_ms = _manual_compact_last_finish_time_ms.load(); uint64_t last_time_used_ms = _manual_compact_last_time_used_ms.load(); - std::stringstream state; - if (last_finish_time_ms > 0) { - char str[24] = {0}; - dsn::utils::time_ms_to_string(last_finish_time_ms, str); - state << "last finish at [" << str << "]"; - } else { - state << "last finish at [-]"; - } - - if (last_time_used_ms > 0) { - state << ", last used " << last_time_used_ms << " ms"; - } - if (enqueue_time_ms > 0) { - char str[24] = {0}; - dsn::utils::time_ms_to_string(enqueue_time_ms, str); - state << ", recent enqueue at [" << str << "]"; - } - - if (start_time_ms > 0) { - char str[24] = {0}; - dsn::utils::time_ms_to_string(start_time_ms, str); - state << ", recent start at [" << str << "]"; - } - return state.str(); + nlohmann::json info; + info["recent_enqueue_at"] = + enqueue_time_ms > 0 ? dsn::utils::time_s_to_date_time(enqueue_time_ms / 1000) : "-"; + info["recent_start_at"] = + start_time_ms > 0 ? dsn::utils::time_s_to_date_time(start_time_ms / 1000) : "-"; + info["last_finish"] = + last_finish_time_ms > 0 ? dsn::utils::time_s_to_date_time(last_finish_time_ms / 1000) : "-"; + info["last_used"] = last_time_used_ms > 0 ? std::to_string(last_time_used_ms) : "-"; + return info.dump(2); } dsn::replication::manual_compaction_status::type diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index abb06ca2959..7f8bf47dbe7 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -657,10 +657,10 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args) } fprintf(stderr, "CALL [%s] [%s] ", n.desc.c_str(), hostname.c_str()); if (results[i].first) { - fprintf(stderr, "succeed: %s\n", results[i].second.c_str()); + fprintf(stderr, "succeed: \n%s\n", results[i].second.c_str()); succeed++; } else { - fprintf(stderr, "failed: %s\n", results[i].second.c_str()); + fprintf(stderr, "failed: \n%s\n", results[i].second.c_str()); failed++; } } diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 186fb92bab4..59eb2bc7d81 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -333,7 +333,7 @@ static command_executor commands[] = { "remote_command", "send remote command to servers", "[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l ip:port,ip:port...]" - " [arguments...]", + " [arguments...]", remote_command, }, { diff --git a/src/utils/command_manager.cpp b/src/utils/command_manager.cpp index 0c5dd9b1fc0..70a5df2ae79 100644 --- a/src/utils/command_manager.cpp +++ b/src/utils/command_manager.cpp @@ -68,6 +68,18 @@ std::unique_ptr command_manager::register_bool_command( }); } +std::unique_ptr +command_manager::register_single_command(const std::string &command, + const std::string &help, + const std::string &args_help, + command_handler handler) +{ + return register_command({command}, + fmt::format("{} - {}", command, help), + fmt::format("{} {}", command, args_help), + handler); +} + void command_manager::deregister_command(uintptr_t handle) { auto c = reinterpret_cast(handle); diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h index 26a6cb34280..73869f95a3c 100644 --- a/src/utils/command_manager.h +++ b/src/utils/command_manager.h @@ -87,6 +87,12 @@ class command_manager : public ::dsn::utils::singleton }); } + std::unique_ptr + register_single_command(const std::string &command, + const std::string &help, + const std::string &args_help, + command_handler handler) WARN_UNUSED_RESULT; + bool run_command(const std::string &cmd, const std::vector &args, /*out*/ std::string &output); diff --git a/src/utils/test/command_manager_test.cpp b/src/utils/test/command_manager_test.cpp index e6e2598cac5..5a46597027b 100644 --- a/src/utils/test/command_manager_test.cpp +++ b/src/utils/test/command_manager_test.cpp @@ -32,10 +32,10 @@ class command_manager_test : public ::testing::Test public: command_manager_test() { - _cmd = command_manager::instance().register_command( - {"test-cmd"}, - "test-cmd - just for command_manager unit-test", - "test-cmd arg1 arg2 ...", + _cmd = command_manager::instance().register_single_command( + "test-cmd", + "just for command_manager unit-test", + "arg1 arg2 ...", [](const vector &args) { return fmt::format("test-cmd response: [{}]", boost::join(args, " ")); });