diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index a3dff107f1bd3a5..2ffecc2ccd9768c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -145,8 +145,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, LOG_INFO("PipelineXFragmentContext::cancel") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id) - .tag("reason", reason) + .tag("reason", PPlanFragmentCancelReason_Name(reason)) .tag("error message", msg); + if (reason == PPlanFragmentCancelReason::TIMEOUT) { LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string(); } diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 4b9fa57ce64b650..bc3f7e3a9fce3ee 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -104,7 +104,10 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) } BufferControlBlock::~BufferControlBlock() { - cancel(); + std::unique_lock l(_lock); + _data_removal.notify_all(); + _data_arrival.notify_all(); + _waiting_rpc.clear(); } Status BufferControlBlock::init() { @@ -248,7 +251,6 @@ Status BufferControlBlock::close(Status exec_status) { std::unique_lock l(_lock); _is_close = true; _status = exec_status; - // notify blocked get thread _data_arrival.notify_all(); if (!_waiting_rpc.empty()) { @@ -266,13 +268,13 @@ Status BufferControlBlock::close(Status exec_status) { return Status::OK(); } -void BufferControlBlock::cancel() { +void BufferControlBlock::cancel_by_timeout() { std::unique_lock l(_lock); _is_cancelled = true; _data_removal.notify_all(); _data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { - ctx->on_failure(Status::Cancelled("Cancelled")); + ctx->on_failure(Status::TimedOut("Query timeout")); } _waiting_rpc.clear(); } @@ -301,8 +303,8 @@ Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr* result) override; - void cancel() override; + void cancel_by_timeout() override; void set_dependency(std::shared_ptr result_sink_dependency); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index acf622b4196d2f4..2e5c7966929ac60 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() { clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp); do { - std::vector to_cancel; + std::vector queries_timeout; std::vector queries_to_cancel; std::vector queries_pipeline_task_leak; // Fe process uuid -> set @@ -1274,7 +1274,7 @@ void FragmentMgr::cancel_worker() { std::lock_guard lock(_lock); for (auto& fragment_instance_itr : _fragment_instance_map) { if (fragment_instance_itr.second->is_timeout(now)) { - to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id()); + queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id()); } } for (auto& pipeline_itr : _pipeline_map) { @@ -1283,7 +1283,7 @@ void FragmentMgr::cancel_worker() { reinterpret_cast(pipeline_itr.second.get()) ->instance_ids(ins_ids); for (auto& ins_id : ins_ids) { - to_cancel.push_back(ins_id); + queries_timeout.push_back(ins_id); } } else { pipeline_itr.second->clear_finished_tasks(); @@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() { // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is // designed to count canceled fragment of non-pipeline query. - timeout_canceled_fragment_count->increment(to_cancel.size()); - for (auto& id : to_cancel) { - cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT); + timeout_canceled_fragment_count->increment(queries_timeout.size()); + for (auto& id : queries_timeout) { + cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query timeout on backends."); LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance " << print_id(id); } diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 3d96c1871b9af32..558122cc4f06307 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -150,13 +150,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, return Status::OK(); } -void ResultBufferMgr::cancel(const TUniqueId& query_id) { +void ResultBufferMgr::cancel_by_timeout(const TUniqueId& query_id) { { std::unique_lock wlock(_buffer_map_lock); BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { - iter->second->cancel(); + iter->second->cancel_by_timeout(); _buffer_map.erase(iter); } } @@ -189,7 +189,7 @@ void ResultBufferMgr::cancel_thread() { do { // get query - std::vector query_to_cancel; + std::vector queries_timeout; time_t now_time = time(nullptr); { std::lock_guard l(_timeout_lock); @@ -197,7 +197,7 @@ void ResultBufferMgr::cancel_thread() { for (TimeoutMap::iterator iter = _timeout_map.begin(); iter != end; ++iter) { for (int i = 0; i < iter->second.size(); ++i) { - query_to_cancel.push_back(iter->second[i]); + queries_timeout.push_back(iter->second[i]); } } @@ -205,8 +205,8 @@ void ResultBufferMgr::cancel_thread() { } // cancel query - for (int i = 0; i < query_to_cancel.size(); ++i) { - cancel(query_to_cancel[i]); + for (int i = 0; i < queries_timeout.size(); ++i) { + cancel_by_timeout(queries_timeout[i]); } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index e6ae0cc10420eaf..bdf1e9266509a7f 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -71,7 +71,7 @@ class ResultBufferMgr { std::shared_ptr find_arrow_schema(const TUniqueId& query_id); // cancel - void cancel(const TUniqueId& fragment_id); + void cancel_by_timeout(const TUniqueId& fragment_id); // cancel one query at a future time. void cancel_at_time(time_t cancel_time, const TUniqueId& query_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 0623932bc9dad2d..d6bb25e9533e9a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -576,7 +576,7 @@ public Object killQuery(HttpServletRequest request, HttpServletResponse response } ExecuteEnv env = ExecuteEnv.getInstance(); - env.getScheduler().cancelQuery(queryId); + env.getScheduler().cancelQuery(queryId, "cancel query by rest api"); return ResponseEntityBuilder.ok(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index ee5abed8392e5b8..b4f52808f4b04bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -221,7 +221,7 @@ protected void executeCancelLogic() { } isCanceled.getAndSet(true); if (null != stmtExecutor) { - stmtExecutor.cancel(); + stmtExecutor.cancel("insert task cancelled"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 59a421509d9c758..966291bd7aa3909 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -258,7 +258,7 @@ public synchronized void onSuccess() throws JobException { protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); if (executor != null) { - executor.cancel(); + executor.cancel("mtmv task cancelled"); } after(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index 1424f3bc3010cf7..0e434b0b82047c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -162,7 +162,7 @@ public void cancel() throws JobException { } isCanceled.getAndSet(true); if (stmtExecutor != null) { - stmtExecutor.cancel(); + stmtExecutor.cancel("export task cancelled"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 737eb33b584be9b..f02c0b289b88d50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -600,7 +600,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) { for (TUniqueId loadId : loadIds) { Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId); if (coordinator != null) { - coordinator.cancel(); + coordinator.cancel(failMsg.getMsg()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 064b9abfcac379d..58a45031ffa8e95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -143,7 +143,7 @@ protected final void execImpl(StmtExecutor executor, long jobId) throws Exceptio } boolean notTimeout = coordinator.join(execTimeout); if (!coordinator.isDone()) { - coordinator.cancel(); + coordinator.cancel("insert timeout"); if (notTimeout) { errMsg = coordinator.getExecStatus().getErrorMsg(); ErrorReport.reportDdlException("there exists unhealthy backend. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index ba53c14e1dc8ae0..c5622d54a140236 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -935,7 +935,7 @@ public void kill(boolean killConnection) { closeChannel(); } // Now, cancel running query. - cancelQuery(); + cancelQuery("cancel query by user from " + getRemoteHostPortString()); } // kill operation with no protect by timeout. @@ -956,10 +956,10 @@ private void killByTimeout(boolean killConnection) { } } - public void cancelQuery() { + public void cancelQuery(String cancelMessage) { StmtExecutor executorRef = executor; if (executorRef != null) { - executorRef.cancel(); + executorRef.cancel(cancelMessage); } } @@ -990,7 +990,7 @@ public void checkTimeout(long now) { long timeout = getExecTimeout() * 1000L; if (delta > timeout) { LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}", - timeoutTag, getRemoteHostPortString(), timeout, queryId); + timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId)); killFlag = true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 01b41ec3e964fca..5da6bb1ba95fb05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -145,11 +145,11 @@ public ConnectContext getContext(String flightToken) { return null; } - public void cancelQuery(String queryId) { + public void cancelQuery(String queryId, String cancelReason) { for (ConnectContext ctx : connectionMap.values()) { TUniqueId qid = ctx.queryId(); if (qid != null && DebugUtil.printId(qid).equals(queryId)) { - ctx.cancelQuery(); + ctx.cancelQuery(cancelReason); break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index b0d1cbaba057e59..7d9c8243c04fb4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1164,7 +1164,7 @@ private void waitRpc(List> waitPipelineRpc(List= limit rows, need cancel"); } - cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH); + cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit"); } if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) { numReceivedRows = 0; @@ -1528,8 +1528,8 @@ public Status shouldCancel(List currentBackends) { // Cancel execution of query. This includes the execution of the local plan // fragment, // if any, as well as all plan fragments on remote nodes. - public void cancel() { - cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel"); + public void cancel(String errorMsg) { + cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg); if (queueToken != null) { queueToken.cancel(); } @@ -1552,8 +1552,8 @@ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg); } LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}", - DebugUtil.printId(queryId), cancelReason.toString()); - cancelInternal(cancelReason); + DebugUtil.printId(queryId), errorMsg); + cancelInternal(cancelReason, errorMsg); } finally { unlock(); } @@ -1577,9 +1577,9 @@ private void cancelLatch() { } } - private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) { + private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, String cancelMessage) { if (null != receiver) { - receiver.cancel(cancelReason); + receiver.cancel(cancelReason, cancelMessage); } if (null != pointExec) { pointExec.cancel(); @@ -3307,10 +3307,6 @@ public void onSuccess(InternalService.PCancelPlanFragmentResult result) { DebugUtil.printId(fragmentInstanceId()), status.toString()); } } - LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," - + "fragment instance id={}, reason: {}", - DebugUtil.printId(queryId), initiated, done, backend.getId(), - DebugUtil.printId(fragmentInstanceId()), "without status"); } public void onFailure(Throwable t) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 981d720e8b9ccf9..0747b68c12d2908 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -111,8 +111,8 @@ public RowBatch getNext(Status status) throws TException { LOG.warn("Query {} get result timeout, get result duration {} ms", DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000); setRunStatus(Status.TIMEOUT); - status.updateStatus(TStatusCode.TIMEOUT, ""); - updateCancelReason("fetch data timeout"); + status.updateStatus(TStatusCode.TIMEOUT, "Query timeout"); + updateCancelReason("Query timeout"); return null; } catch (InterruptedException e) { // continue to get result @@ -205,13 +205,14 @@ private void updateCancelReason(String reason) { } } - public void cancel(Types.PPlanFragmentCancelReason reason) { + public void cancel(Types.PPlanFragmentCancelReason reason, String cancelMessage) { if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) { setRunStatus(Status.TIMEOUT); } else { setRunStatus(Status.CANCELLED); } - updateCancelReason(reason.toString()); + + updateCancelReason(cancelMessage); synchronized (this) { if (currentThread != null) { // TODO(cmy): we cannot interrupt this thread, or we may throw diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3efed4b76505ddf..84aed1481060fc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1475,7 +1475,7 @@ private void resetAnalyzerAndStmt() { } // Because this is called by other thread - public void cancel() { + public void cancel(String message) { Optional insertOverwriteTableCommand = getInsertOverwriteTableCommand(); if (insertOverwriteTableCommand.isPresent()) { // If the be scheduling has not been triggered yet, cancel the scheduling first @@ -1483,7 +1483,7 @@ public void cancel() { } Coordinator coordRef = coord; if (coordRef != null) { - coordRef.cancel(); + coordRef.cancel(message); } if (mysqlLoadId != null) { Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java index 2dcff6075f4d743..d512bcfb4897bc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java @@ -32,7 +32,7 @@ public void exec(WorkloadQueryInfo queryInfo) { && queryInfo.tUniqueId != null && QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) { LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId); - queryInfo.context.cancelQuery(); + queryInfo.context.cancelQuery("cancel query by workload policy"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java index 9f703dff92b1a96..406497c77db9a50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java @@ -74,7 +74,7 @@ public void kill(boolean killConnection) { connectScheduler.unregisterConnection(this); } // Now, cancel running query. - cancelQuery(); + cancelQuery("arrow flight query killed by user"); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index a7aaaf2c037c07f..e263c86e6ddf8a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -192,7 +192,7 @@ protected void setTaskStateToRunning() { public void cancel() { killed = true; if (stmtExecutor != null) { - stmtExecutor.cancel(); + stmtExecutor.cancel("analysis task cancelled"); } Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(info, AnalysisState.FAILED,