Skip to content

Commit

Permalink
[pipelineX](api) Add api for long-running tasks (apache#32459)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Mar 19, 2024
1 parent 8c11755 commit 4ea31a3
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 9 deletions.
18 changes: 18 additions & 0 deletions be/src/http/action/pipeline_task_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,22 @@ void PipelineTaskAction::handle(HttpRequest* req) {
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks());
}

// Handles GET /api/running_pipeline_tasks/{duration}.
//
// Parses the "duration" request parameter as a whole number of seconds and
// replies with FragmentMgr::dump_pipeline_tasks(duration), i.e. the dump
// filtered by that duration threshold.
//
// Replies:
//   - 200 OK with the textual dump when "duration" is a valid integer.
//   - 400 BAD_REQUEST with a diagnostic message when "duration" is missing,
//     not an integer, out of the int64 range, or followed by stray
//     characters. A malformed parameter is a client mistake, so it must not
//     be reported as an internal server error.
void LongPipelineTaskAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");

    // Look the parameter up once; binding to const& is valid whether param()
    // returns a reference or a temporary.
    const auto& raw_duration = req->param("duration");

    int64_t duration = 0;
    bool parsed = false;
    std::string parse_error;
    try {
        size_t consumed = 0;
        duration = std::stoll(raw_duration, &consumed);
        // std::stoll stops at the first non-numeric character without
        // complaining, so "12abc" would otherwise be accepted as 12.
        if (consumed == raw_duration.size()) {
            parsed = true;
        } else {
            parse_error = "unexpected trailing characters";
        }
    } catch (const std::exception& e) {
        // std::invalid_argument (not a number) or std::out_of_range.
        parse_error = e.what();
    }

    if (!parsed) {
        // Build the message once and reuse it for both the log and the reply.
        const std::string message =
                fmt::format("invalid argument.duration: {}, meet error: {}", raw_duration,
                            parse_error);
        LOG(WARNING) << message;
        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, message);
        return;
    }

    HttpChannel::send_reply(req, HttpStatus::OK,
                            ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration));
}

} // end namespace doris
9 changes: 9 additions & 0 deletions be/src/http/action/pipeline_task_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,13 @@ class PipelineTaskAction : public HttpHandler {
void handle(HttpRequest* req) override;
};

// HTTP handler for the "long-running pipeline tasks" endpoint: it reads a
// "duration" request parameter and replies with the pipeline task dump
// filtered by that duration.
class LongPipelineTaskAction : public HttpHandler {
public:
    // The handler declares no data members; construction and destruction are trivial.
    LongPipelineTaskAction() = default;
    ~LongPipelineTaskAction() override = default;

    // Serves a single request; the reply is sent through `req`.
    void handle(HttpRequest* req) override;
};

} // end namespace doris
6 changes: 3 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(_state->fragment_instance_id()));

auto elapsed = (MonotonicNanos() - _fragment_context->create_time()) / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
"= {}ns], block dependency = {}, is running = {}\noperators: ",
(void*)this, get_state_name(_cur_state), _dry_run,
MonotonicNanos() - _fragment_context->create_time(),
"= {}s], block dependency = {}, is running = {}\noperators: ",
(void*)this, get_state_name(_cur_state), _dry_run, elapsed,
_blocked_dep ? _blocked_dep->debug_string() : "NULL", is_running());
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(
Expand Down
12 changes: 8 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
return Status::OK();
}

std::string FragmentMgr::dump_pipeline_tasks() {
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
fmt::memory_buffer debug_string_buffer;
auto t = MonotonicNanos();
size_t i = 0;
Expand All @@ -773,9 +773,13 @@ std::string FragmentMgr::dump_pipeline_tasks() {
fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are still running!\n",
_pipeline_map.size());
for (auto& it : _pipeline_map) {
fmt::format_to(
debug_string_buffer, "No.{} (elapse time = {}ns, InstanceId = {}) : {}\n", i,
t - it.second->create_time(), print_id(it.first), it.second->debug_string());
auto elapsed = (t - it.second->create_time()) / 1000000000.0;
if (elapsed < duration) {
// Only display tasks that have been running for at least {duration} seconds.
continue;
}
fmt::format_to(debug_string_buffer, "No.{} (elapse time = {}s, InstanceId = {}) : {}\n",
i, elapsed, print_id(it.first), it.second->debug_string());
i++;
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class FragmentMgr : public RestMonitorIface {
return _query_ctx_map.size();
}

std::string dump_pipeline_tasks();
std::string dump_pipeline_tasks(int64_t duration = 0);

void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list);

Expand Down
7 changes: 6 additions & 1 deletion be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,16 @@ Status HttpService::start() {
HealthAction* health_action = _pool.add(new HealthAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action);

// Register BE health action
// Dump all running pipeline tasks
PipelineTaskAction* pipeline_task_action = _pool.add(new PipelineTaskAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks",
pipeline_task_action);

// Dump all running pipeline tasks that have been running for more than {duration} seconds
LongPipelineTaskAction* long_pipeline_task_action = _pool.add(new LongPipelineTaskAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks/{duration}",
long_pipeline_task_action);

// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
Expand Down

0 comments on commit 4ea31a3

Please sign in to comment.