Skip to content

Commit

Permalink
src/runtime/task/task_engine.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Feb 20, 2024
1 parent ddd086b commit 42a7f7b
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions src/runtime/task/task_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,6 @@ nlohmann::json task_worker_pool::get_runtime_info(const std::vector<std::string>
nlohmann::json info;

// Queues.
nlohmann::json workers;
for (const auto &worker : _workers) {
if (worker) {
nlohmann::json w;
w["TID"] = worker->native_tid();
w["queue_name"] = worker->queue()->get_name();
workers.emplace_back(w);
}
}
info["threads"] = workers;

// Threads.
nlohmann::json queues;
for (const auto &queue : _queues) {
if (queue) {
Expand All @@ -212,13 +200,26 @@ nlohmann::json task_worker_pool::get_runtime_info(const std::vector<std::string>
}
info["queues"] = queues;

// Threads.
nlohmann::json workers;
for (const auto &worker : _workers) {
if (worker) {
nlohmann::json w;
w["index"] = worker->index();
w["TID"] = worker->native_tid();
w["queue_name"] = worker->queue()->get_name();
workers.emplace_back(w);
}
}
info["threads"] = workers;

return info;
}

nlohmann::json task_worker_pool::get_queue_info() const
{
nlohmann::json queues;
for (auto &queue : _queues) {
for (const auto &queue : _queues) {
if (queue) {
nlohmann::json q;
q["name"] = queue->get_name();
Expand Down Expand Up @@ -288,14 +289,13 @@ volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code

nlohmann::json task_engine::get_runtime_info(const std::vector<std::string> &args) const
{
nlohmann::json info;
// Thread pools.
nlohmann::json pools;
for (const auto &pool : _pools) {
if (pool) {
info[pool->spec().pool_code.to_string()] = pool->get_runtime_info(args);
pools[pool->spec().pool_code.to_string()] = pool->get_runtime_info(args);
}
}
return info;
return pools;
}

nlohmann::json task_engine::get_queue_info() const
Expand All @@ -321,7 +321,7 @@ void task_engine::register_cli_commands()
"Get the current or set a new max task queue length of a specific thread_pool. It can "
"be set it to INT_MAX which means a big enough value, but it can't be cancelled the "
"delay/reject policy dynamically",
"<pool_code> [set_to_new_max_queue_length]",
"<pool_code> [new_max_queue_length]",
[this](const std::vector<std::string> &args) {
if (args.empty()) {
return std::string("ERR: invalid arguments, task.queue_max_length <pool_code> "
Expand All @@ -333,19 +333,20 @@ void task_engine::register_cli_commands()
continue;
}
if (pool->_spec.pool_code.to_string() == args[0]) {
// when args length is 1, return current value
// Query.
if (args.size() == 1) {
return fmt::format("The current task queue length of {} is {}",
args[0],
pool->_spec.queue_length_throttling_threshold);
}

// Update.
if (args.size() == 2) {
int new_queue_length = INT_MAX;
if ((args[1] != "INT_MAX" &&
!dsn::buf2int32(args[1], new_queue_length)) ||
new_queue_length < 0) {
return fmt::format(
"queue_max_length must be >= 0, or set 'INT_MAX'");
return fmt::format("queue_max_length must be >= 0 or 'INT_MAX'");
}
pool->_spec.queue_length_throttling_threshold = new_queue_length;
return fmt::format("Task queue {} is updated to new max length {}",
Expand Down

0 comments on commit 42a7f7b

Please sign in to comment.