diff --git a/be/src/http/action/pipeline_task_action.cpp b/be/src/http/action/pipeline_task_action.cpp index b19b42c9468563..b6a7cabe514e3b 100644 --- a/be/src/http/action/pipeline_task_action.cpp +++ b/be/src/http/action/pipeline_task_action.cpp @@ -56,4 +56,34 @@ void LongPipelineTaskAction::handle(HttpRequest* req) { ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration)); } +void QueryPipelineTaskAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4"); + int64_t high = 0; + int64_t low = 0; + try { + auto& query_id_str = req->param("query_id"); + if (query_id_str.length() != 16 * 2 + 1) { + HttpChannel::send_reply( + req, HttpStatus::INTERNAL_SERVER_ERROR, + "Invalid query id! Query id should be {hi}-{lo} which is a hexadecimal. \n"); + return; + } + from_hex(&high, query_id_str.substr(0, 16)); + from_hex(&low, query_id_str.substr(17)); + } catch (const std::exception& e) { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "invalid argument.query_id: {}, meet error: {}. \n", + req->param("query_id"), e.what()); + LOG(WARNING) << fmt::to_string(debug_string_buffer); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + fmt::to_string(debug_string_buffer)); + return; + } + TUniqueId query_id; + query_id.hi = high; + query_id.lo = low; + HttpChannel::send_reply(req, HttpStatus::OK, + ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(query_id)); +} + } // end namespace doris diff --git a/be/src/http/action/pipeline_task_action.h b/be/src/http/action/pipeline_task_action.h index 553ac856e6f1c9..23c1a17464fec6 100644 --- a/be/src/http/action/pipeline_task_action.h +++ b/be/src/http/action/pipeline_task_action.h @@ -41,4 +41,13 @@ class LongPipelineTaskAction : public HttpHandler { void handle(HttpRequest* req) override; }; +class QueryPipelineTaskAction : public HttpHandler { +public: + QueryPipelineTaskAction() = default; + + ~QueryPipelineTaskAction() override = default; + + void handle(HttpRequest* req) override; +}; + } // end namespace doris diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index ce9b10be4ffac0..b6ed9d42fd05ff 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -139,6 +139,8 @@ class PipelineFragmentContext : public TaskExecutionContext { uint64_t create_time() const { return _create_time; } + uint64_t elapsed_time() const { return _fragment_watcher.elapsed_time(); } + protected: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); Status _build_pipelines(ExecNode*, PipelinePtr); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 8473c885a8b1f6..cd8eabb06424c3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -952,6 +952,14 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { return fmt::to_string(debug_string_buffer); } +std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { + if (auto q_ctx = get_query_context(query_id)) { + return q_ctx->print_all_pipeline_context(); + } else { + return fmt::format("Query context (query id = {}) not found. \n", print_id(query_id)); + } +} + Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, QuerySource query_source, const FinishCallback& cb) { VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is " diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 608ee522bad0cc..16ad368ae6108f 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -148,6 +148,7 @@ class FragmentMgr : public RestMonitorIface { } std::string dump_pipeline_tasks(int64_t duration = 0); + std::string dump_pipeline_tasks(TUniqueId& query_id); void get_runtime_query_info(std::vector* _query_info_list); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index ec559068e1968b..fa4321ad6c4d12 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -279,6 +279,33 @@ Status QueryContext::cancel_pipeline_context(const int fragment_id, return Status::OK(); } +std::string QueryContext::print_all_pipeline_context() { + std::vector> ctx_to_print; + fmt::memory_buffer debug_string_buffer; + size_t i = 0; + { + fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n", + _fragment_id_to_pipeline_ctx.size(), print_id(_query_id)); + + { + std::lock_guard lock(_pipeline_map_write_lock); + for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) { + ctx_to_print.push_back(f_context); + } + } + for (auto& f_context : ctx_to_print) { + if (auto pipeline_ctx = f_context.lock()) { + auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0; + fmt::format_to(debug_string_buffer, + "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed, + pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string()); + i++; + } + } + } + return fmt::to_string(debug_string_buffer); +} + void QueryContext::set_pipeline_context( const int fragment_id, std::shared_ptr pip_ctx) { std::lock_guard lock(_pipeline_map_write_lock); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 83b934df58a95f..862da39bfae1a9 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -122,6 +122,7 @@ class QueryContext { const std::string& msg); Status cancel_pipeline_context(const int fragment_id, const PPlanFragmentCancelReason& reason, const std::string& msg); + std::string print_all_pipeline_context(); void set_pipeline_context(const int fragment_id, std::shared_ptr pip_ctx); void cancel(std::string msg, Status new_status, int fragment_id = -1); diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index a46e9c569d5f00..dc356b77731b86 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -202,6 +202,10 @@ Status HttpService::start() { LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction()); _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action); + QueryPipelineTaskAction* query_pipeline_task_action = _pool.add(new QueryPipelineTaskAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}", + query_pipeline_task_action); + // Register Tablets Info action TabletsInfoAction* tablets_info_action = _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));