Skip to content

Commit

Permalink
[pipeline](API) Add a new API to find pipeline tasks by a specific qu… (
Browse files Browse the repository at this point in the history
#42233)

…ery ID (#35563)

pick #35563
  • Loading branch information
Gabriel39 authored Oct 22, 2024
1 parent 85a98df commit 8877267
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 0 deletions.
30 changes: 30 additions & 0 deletions be/src/http/action/pipeline_task_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,34 @@ void LongPipelineTaskAction::handle(HttpRequest* req) {
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration));
}

void QueryPipelineTaskAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
int64_t high = 0;
int64_t low = 0;
try {
auto& query_id_str = req->param("query_id");
if (query_id_str.length() != 16 * 2 + 1) {
HttpChannel::send_reply(
req, HttpStatus::INTERNAL_SERVER_ERROR,
"Invalid query id! Query id should be {hi}-{lo} which is a hexadecimal. \n");
return;
}
from_hex(&high, query_id_str.substr(0, 16));
from_hex(&low, query_id_str.substr(17));
} catch (const std::exception& e) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "invalid argument.query_id: {}, meet error: {}. \n",
req->param("query_id"), e.what());
LOG(WARNING) << fmt::to_string(debug_string_buffer);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
fmt::to_string(debug_string_buffer));
return;
}
TUniqueId query_id;
query_id.hi = high;
query_id.lo = low;
HttpChannel::send_reply(req, HttpStatus::OK,
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(query_id));
}

} // end namespace doris
9 changes: 9 additions & 0 deletions be/src/http/action/pipeline_task_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,13 @@ class LongPipelineTaskAction : public HttpHandler {
void handle(HttpRequest* req) override;
};

class QueryPipelineTaskAction : public HttpHandler {
public:
QueryPipelineTaskAction() = default;

~QueryPipelineTaskAction() override = default;

void handle(HttpRequest* req) override;
};

} // end namespace doris
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class PipelineFragmentContext : public TaskExecutionContext {

uint64_t create_time() const { return _create_time; }

uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); }

protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
Status _build_pipelines(ExecNode*, PipelinePtr);
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,14 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
return fmt::to_string(debug_string_buffer);
}

std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
if (auto q_ctx = get_query_context(query_id)) {
return q_ctx->print_all_pipeline_context();
} else {
return fmt::format("Query context (query id = {}) not found. \n", print_id(query_id));
}
}

Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
QuerySource query_source, const FinishCallback& cb) {
VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class FragmentMgr : public RestMonitorIface {
}

std::string dump_pipeline_tasks(int64_t duration = 0);
std::string dump_pipeline_tasks(TUniqueId& query_id);

void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list);

Expand Down
27 changes: 27 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,33 @@ Status QueryContext::cancel_pipeline_context(const int fragment_id,
return Status::OK();
}

std::string QueryContext::print_all_pipeline_context() {
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
fmt::memory_buffer debug_string_buffer;
size_t i = 0;
{
fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
_fragment_id_to_pipeline_ctx.size(), print_id(_query_id));

{
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
ctx_to_print.push_back(f_context);
}
}
for (auto& f_context : ctx_to_print) {
if (auto pipeline_ctx = f_context.lock()) {
auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
i++;
}
}
}
return fmt::to_string(debug_string_buffer);
}

void QueryContext::set_pipeline_context(
const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class QueryContext {
const std::string& msg);
Status cancel_pipeline_context(const int fragment_id, const PPlanFragmentCancelReason& reason,
const std::string& msg);
std::string print_all_pipeline_context();
void set_pipeline_context(const int fragment_id,
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
void cancel(std::string msg, Status new_status, int fragment_id = -1);
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ Status HttpService::start() {
LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action);

QueryPipelineTaskAction* query_pipeline_task_action = _pool.add(new QueryPipelineTaskAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}",
query_pipeline_task_action);

// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
Expand Down

0 comments on commit 8877267

Please sign in to comment.