Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipeline](API) Add a new API to find pipeline tasks by a specific query ID #35563

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions be/src/http/action/pipeline_task_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,34 @@ void LongPipelineTaskAction::handle(HttpRequest* req) {
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration));
}

void QueryPipelineTaskAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
int64_t high = 0;
int64_t low = 0;
try {
auto& query_id_str = req->param("query_id");
if (query_id_str.length() != 16 * 2 + 1) {
HttpChannel::send_reply(
req, HttpStatus::INTERNAL_SERVER_ERROR,
"Invalid query id! Query id should be {hi}-{lo} which is a hexadecimal. \n");
return;
}
from_hex(&high, query_id_str.substr(0, 16));
from_hex(&low, query_id_str.substr(17));
} catch (const std::exception& e) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "invalid argument.query_id: {}, meet error: {}. \n",
req->param("query_id"), e.what());
LOG(WARNING) << fmt::to_string(debug_string_buffer);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
fmt::to_string(debug_string_buffer));
return;
}
TUniqueId query_id;
query_id.hi = high;
query_id.lo = low;
HttpChannel::send_reply(req, HttpStatus::OK,
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(query_id));
}

} // end namespace doris
9 changes: 9 additions & 0 deletions be/src/http/action/pipeline_task_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,13 @@ class LongPipelineTaskAction : public HttpHandler {
void handle(HttpRequest* req) override;
};

class QueryPipelineTaskAction : public HttpHandler {
public:
QueryPipelineTaskAction() = default;

~QueryPipelineTaskAction() override = default;

void handle(HttpRequest* req) override;
};

} // end namespace doris
8 changes: 8 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,14 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
return fmt::to_string(debug_string_buffer);
}

std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
return q_ctx->print_all_pipeline_context();
} else {
return fmt::format("Query context (query id = {}) not found. \n", print_id(query_id));
}
}

Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
const FinishCallback& cb) {
VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class FragmentMgr : public RestMonitorIface {
}

std::string dump_pipeline_tasks(int64_t duration = 0);
std::string dump_pipeline_tasks(TUniqueId& query_id);

void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list);

Expand Down
27 changes: 27 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,33 @@ void QueryContext::cancel_all_pipeline_context(const Status& reason) {
}
}

std::string QueryContext::print_all_pipeline_context() {
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
fmt::memory_buffer debug_string_buffer;
size_t i = 0;
{
fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
_fragment_id_to_pipeline_ctx.size(), print_id(_query_id));

{
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
ctx_to_print.push_back(f_context);
}
}
for (auto& f_context : ctx_to_print) {
if (auto pipeline_ctx = f_context.lock()) {
auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
i++;
}
}
}
return fmt::to_string(debug_string_buffer);
}

Status QueryContext::cancel_pipeline_context(const int fragment_id, const Status& reason) {
std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
{
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class QueryContext {
[[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }

void cancel_all_pipeline_context(const Status& reason);
std::string print_all_pipeline_context();
Status cancel_pipeline_context(const int fragment_id, const Status& reason);
void set_pipeline_context(const int fragment_id,
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks/{duration}",
long_pipeline_task_action);

// Dump all running pipeline tasks which has been running for more than {duration} seconds
QueryPipelineTaskAction* query_pipeline_task_action = _pool.add(new QueryPipelineTaskAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}",
query_pipeline_task_action);

// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
Expand Down
Loading