From 42a7f7b4717064624f1d14a2ee233cdac347b3f3 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Tue, 20 Feb 2024 23:30:59 +0800 Subject: [PATCH] src/runtime/task/task_engine.cpp --- src/runtime/task/task_engine.cpp | 43 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/runtime/task/task_engine.cpp b/src/runtime/task/task_engine.cpp index 0688952084..7728df11bd 100644 --- a/src/runtime/task/task_engine.cpp +++ b/src/runtime/task/task_engine.cpp @@ -189,18 +189,6 @@ nlohmann::json task_worker_pool::get_runtime_info(const std::vector nlohmann::json info; // Queues. - nlohmann::json workers; - for (const auto &worker : _workers) { - if (worker) { - nlohmann::json w; - w["TID"] = worker->native_tid(); - w["queue_name"] = worker->queue()->get_name(); - workers.emplace_back(w); - } - } - info["threads"] = workers; - - // Threads. nlohmann::json queues; for (const auto &queue : _queues) { if (queue) { @@ -212,13 +200,26 @@ nlohmann::json task_worker_pool::get_runtime_info(const std::vector } info["queues"] = queues; + // Threads. + nlohmann::json workers; + for (const auto &worker : _workers) { + if (worker) { + nlohmann::json w; + w["index"] = worker->index(); + w["TID"] = worker->native_tid(); + w["queue_name"] = worker->queue()->get_name(); + workers.emplace_back(w); + } + } + info["threads"] = workers; + return info; } nlohmann::json task_worker_pool::get_queue_info() const { nlohmann::json queues; - for (auto &queue : _queues) { + for (const auto &queue : _queues) { if (queue) { nlohmann::json q; q["name"] = queue->get_name(); @@ -288,14 +289,13 @@ volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code nlohmann::json task_engine::get_runtime_info(const std::vector &args) const { - nlohmann::json info; - // Thread pools. + nlohmann::json pools; for (const auto &pool : _pools) { if (pool) { - info[pool->spec().pool_code.to_string()] = pool->get_runtime_info(args); + pools[pool->spec().pool_code.to_string()] = pool->get_runtime_info(args); } } - return info; + return pools; } nlohmann::json task_engine::get_queue_info() const @@ -321,7 +321,7 @@ void task_engine::register_cli_commands() "Get the current or set a new max task queue length of a specific thread_pool. It can " "be set it to INT_MAX which means a big enough value, but it can't be cancelled the " "delay/reject policy dynamically", - " [set_to_new_max_queue_length]", + " [new_max_queue_length]", [this](const std::vector &args) { if (args.empty()) { return std::string("ERR: invalid arguments, task.queue_max_length " @@ -333,19 +333,20 @@ void task_engine::register_cli_commands() continue; } if (pool->_spec.pool_code.to_string() == args[0]) { - // when args length is 1, return current value + // Query. if (args.size() == 1) { return fmt::format("The current task queue length of {} is {}", args[0], pool->_spec.queue_length_throttling_threshold); } + + // Update. if (args.size() == 2) { int new_queue_length = INT_MAX; if ((args[1] != "INT_MAX" && !dsn::buf2int32(args[1], new_queue_length)) || new_queue_length < 0) { - return fmt::format( - "queue_max_length must be >= 0, or set 'INT_MAX'"); + return fmt::format("queue_max_length must be >= 0 or 'INT_MAX'"); } pool->_spec.queue_length_throttling_threshold = new_queue_length; return fmt::format("Task queue {} is updated to new max length {}",