Commit 18b899a
zhiqiang-hhhh committed Oct 15, 2024
1 parent f112af0 commit 18b899a
Showing 20 changed files with 60 additions and 60 deletions.
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp (3 changes: 2 additions & 1 deletion)
@@ -145,8 +145,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
LOG_INFO("PipelineXFragmentContext::cancel")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
- .tag("reason", reason)
+ .tag("reason", PPlanFragmentCancelReason_Name(reason))
.tag("error message", msg);

if (reason == PPlanFragmentCancelReason::TIMEOUT) {
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
}
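
A note on the fix above: logging the raw reason value prints the protobuf enum as an integer, while PPlanFragmentCancelReason_Name(reason) prints the identifier declared in the .proto file, e.g. TIMEOUT. Below is a minimal sketch of the generated helper this relies on; the include path and the main() scaffolding are assumptions for illustration, not taken from this commit.

    // protoc generates a <EnumName>_Name() helper for every proto enum,
    // mapping a value to the identifier declared in the .proto file.
    #include <gen_cpp/types.pb.h>  // assumed header for PPlanFragmentCancelReason
    #include <iostream>

    int main() {
        PPlanFragmentCancelReason reason = PPlanFragmentCancelReason::TIMEOUT;
        // Prints "TIMEOUT" rather than the enum's underlying integer value.
        std::cout << PPlanFragmentCancelReason_Name(reason) << std::endl;
        return 0;
    }
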
be/src/runtime/buffer_control_block.cpp (14 changes: 8 additions & 6 deletions)
@@ -104,7 +104,10 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
}

BufferControlBlock::~BufferControlBlock() {
- cancel();
+ std::unique_lock<std::mutex> l(_lock);
+ _data_removal.notify_all();
+ _data_arrival.notify_all();
+ _waiting_rpc.clear();
}

Status BufferControlBlock::init() {
@@ -248,7 +251,6 @@ Status BufferControlBlock::close(Status exec_status) {
std::unique_lock<std::mutex> l(_lock);
_is_close = true;
_status = exec_status;
-
// notify blocked get thread
_data_arrival.notify_all();
if (!_waiting_rpc.empty()) {
@@ -266,13 +268,13 @@ Status BufferControlBlock::close(Status exec_status) {
return Status::OK();
}

- void BufferControlBlock::cancel() {
+ void BufferControlBlock::cancel_by_timeout() {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
_data_removal.notify_all();
_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
- ctx->on_failure(Status::Cancelled("Cancelled"));
+ ctx->on_failure(Status::TimedOut("Query timeout"));
}
_waiting_rpc.clear();
}
@@ -301,8 +303,8 @@ Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch
return Status::OK();
}

- void PipBufferControlBlock::cancel() {
- BufferControlBlock::cancel();
+ void PipBufferControlBlock::cancel_by_timeout() {
+ BufferControlBlock::cancel_by_timeout();
_update_dependency();
}

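
Two related changes land in this file: the destructor no longer routes through the virtual cancel() and instead wakes blocked readers and pending RPCs inline, and the cancel path is renamed cancel_by_timeout(), failing waiters with a timeout status instead of a generic Cancelled. One plausible motivation for the destructor change is that virtual dispatch is disabled during destruction: a base-class destructor always invokes its own override, so ~BufferControlBlock calling cancel() could never reach PipBufferControlBlock::cancel(). A self-contained sketch of that pitfall, with illustrative types that are not from this commit:

    #include <iostream>

    struct Base {
        virtual ~Base() { shutdown(); }  // always resolves to Base::shutdown
        virtual void shutdown() { std::cout << "Base::shutdown\n"; }
    };

    struct Derived : Base {
        void shutdown() override { std::cout << "Derived::shutdown\n"; }
    };

    int main() {
        Base* b = new Derived();
        delete b;  // prints "Base::shutdown": when ~Base runs, the Derived
                   // part is already destroyed, so the override is skipped
        return 0;
    }
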
be/src/runtime/buffer_control_block.h (6 changes: 3 additions & 3 deletions)
@@ -86,8 +86,8 @@ class BufferControlBlock {
// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
Status close(Status exec_status);
- // this is called by RPC, called from coordinator
- virtual void cancel();
+
+ virtual void cancel_by_timeout();

[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }

@@ -152,7 +152,7 @@ class PipBufferControlBlock : public BufferControlBlock {

Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) override;

- void cancel() override;
+ void cancel_by_timeout() override;

void set_dependency(std::shared_ptr<pipeline::Dependency> result_sink_dependency);

be/src/runtime/fragment_mgr.cpp (12 changes: 6 additions & 6 deletions)
@@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() {
clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);

do {
- std::vector<TUniqueId> to_cancel;
+ std::vector<TUniqueId> queries_timeout;
std::vector<TUniqueId> queries_to_cancel;
std::vector<TUniqueId> queries_pipeline_task_leak;
// Fe process uuid -> set<QueryId>
@@ -1274,7 +1274,7 @@
std::lock_guard<std::mutex> lock(_lock);
for (auto& fragment_instance_itr : _fragment_instance_map) {
if (fragment_instance_itr.second->is_timeout(now)) {
- to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
+ queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
for (auto& pipeline_itr : _pipeline_map) {
@@ -1283,7 +1283,7 @@
reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
->instance_ids(ins_ids);
for (auto& ins_id : ins_ids) {
- to_cancel.push_back(ins_id);
+ queries_timeout.push_back(ins_id);
}
} else {
pipeline_itr.second->clear_finished_tasks();
@@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() {

// TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
// designed to count canceled fragment of non-pipeline query.
- timeout_canceled_fragment_count->increment(to_cancel.size());
- for (auto& id : to_cancel) {
- cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT);
+ timeout_canceled_fragment_count->increment(queries_timeout.size());
+ for (auto& id : queries_timeout) {
+ cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query timeout on backends.");
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance "
<< print_id(id);
}
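
Beyond the rename from to_cancel to queries_timeout, cancel_instance() now carries a human-readable reason ("Query timeout on backends.") down to the logs. The surrounding cancel_worker() loop follows a collect-then-cancel pattern: gather timed-out ids while holding the map lock, then cancel after releasing it so a slow cancellation cannot block fragment registration. A simplified sketch of that pattern; QueryId, FragmentCtx, and the globals below are stand-ins, not Doris types:

    #include <cstdint>
    #include <ctime>
    #include <mutex>
    #include <unordered_map>
    #include <vector>

    using QueryId = int64_t;  // stand-in for Doris' TUniqueId

    struct FragmentCtx {
        std::time_t deadline;
        bool is_timeout(std::time_t now) const { return now > deadline; }
    };

    std::mutex g_lock;
    std::unordered_map<QueryId, FragmentCtx> g_fragments;

    // Stand-in for FragmentMgr::cancel_instance(id, TIMEOUT, msg).
    void cancel_instance(QueryId /*id*/, const char* /*reason*/) {}

    void sweep_timeouts() {
        std::vector<QueryId> queries_timeout;
        const std::time_t now = std::time(nullptr);
        {
            // Collect ids under the lock only; cancellation may block on RPCs,
            // so it happens after the lock is released.
            std::lock_guard<std::mutex> lock(g_lock);
            for (const auto& [id, ctx] : g_fragments) {
                if (ctx.is_timeout(now)) queries_timeout.push_back(id);
            }
        }
        for (QueryId id : queries_timeout) {
            cancel_instance(id, "Query timeout on backends.");
        }
    }
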
be/src/runtime/result_buffer_mgr.cpp (12 changes: 6 additions & 6 deletions)
@@ -150,13 +150,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
return Status::OK();
}

- void ResultBufferMgr::cancel(const TUniqueId& query_id) {
+ void ResultBufferMgr::cancel_by_timeout(const TUniqueId& query_id) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
- iter->second->cancel();
+ iter->second->cancel_by_timeout();
_buffer_map.erase(iter);
}
}
@@ -189,24 +189,24 @@ void ResultBufferMgr::cancel_thread() {

do {
// get query
- std::vector<TUniqueId> query_to_cancel;
+ std::vector<TUniqueId> queries_timeout;
time_t now_time = time(nullptr);
{
std::lock_guard<std::mutex> l(_timeout_lock);
TimeoutMap::iterator end = _timeout_map.upper_bound(now_time + 1);

for (TimeoutMap::iterator iter = _timeout_map.begin(); iter != end; ++iter) {
for (int i = 0; i < iter->second.size(); ++i) {
- query_to_cancel.push_back(iter->second[i]);
+ queries_timeout.push_back(iter->second[i]);
}
}

_timeout_map.erase(_timeout_map.begin(), end);
}

// cancel query
- for (int i = 0; i < query_to_cancel.size(); ++i) {
- cancel(query_to_cancel[i]);
+ for (int i = 0; i < queries_timeout.size(); ++i) {
+ cancel_by_timeout(queries_timeout[i]);
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));

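
For context on the loop above: cancel_thread() wakes roughly once a second and drains every bucket of the deadline-ordered _timeout_map whose key has passed; upper_bound returns the first bucket strictly beyond the cutoff, so the half-open erase removes exactly the expired entries. A condensed sketch of the same sweep; the globals and the cancel_by_timeout stub are stand-ins for illustration:

    #include <cstdint>
    #include <ctime>
    #include <map>
    #include <mutex>
    #include <vector>

    using QueryId = int64_t;  // stand-in for Doris' TUniqueId

    std::mutex g_timeout_lock;
    // Key: wall-clock deadline; value: queries expiring at that second.
    std::map<std::time_t, std::vector<QueryId>> g_timeout_map;

    void cancel_by_timeout(QueryId /*id*/) { /* stand-in for the real cancel */ }

    void sweep_expired() {
        std::vector<QueryId> queries_timeout;
        const std::time_t now = std::time(nullptr);
        {
            std::lock_guard<std::mutex> l(g_timeout_lock);
            // First bucket strictly after now + 1; everything before it has expired.
            auto end = g_timeout_map.upper_bound(now + 1);
            for (auto it = g_timeout_map.begin(); it != end; ++it) {
                queries_timeout.insert(queries_timeout.end(),
                                       it->second.begin(), it->second.end());
            }
            g_timeout_map.erase(g_timeout_map.begin(), end);
        }
        // Cancel outside the lock, mirroring cancel_thread().
        for (QueryId id : queries_timeout) cancel_by_timeout(id);
    }
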
be/src/runtime/result_buffer_mgr.h (2 changes: 1 addition & 1 deletion)
@@ -71,7 +71,7 @@ class ResultBufferMgr {
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);

// cancel
- void cancel(const TUniqueId& fragment_id);
+ void cancel_by_timeout(const TUniqueId& fragment_id);

// cancel one query at a future time.
void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
@@ -576,7 +576,7 @@ public Object killQuery(HttpServletRequest request, HttpServletResponse response
}

ExecuteEnv env = ExecuteEnv.getInstance();
- env.getScheduler().cancelQuery(queryId);
+ env.getScheduler().cancelQuery(queryId, "cancel query by rest api");
return ResponseEntityBuilder.ok();
}
}
@@ -221,7 +221,7 @@ protected void executeCancelLogic() {
}
isCanceled.getAndSet(true);
if (null != stmtExecutor) {
- stmtExecutor.cancel();
+ stmtExecutor.cancel("insert task cancelled");
}
}

@@ -258,7 +258,7 @@ public synchronized void onSuccess() throws JobException {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
- executor.cancel();
+ executor.cancel("mtmv task cancelled");
}
after();
}
@@ -162,7 +162,7 @@ public void cancel() throws JobException {
}
isCanceled.getAndSet(true);
if (stmtExecutor != null) {
- stmtExecutor.cancel();
+ stmtExecutor.cancel("export task cancelled");
}
}

@@ -600,7 +600,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
- coordinator.cancel();
+ coordinator.cancel(failMsg.getMsg());
}
}

@@ -143,7 +143,7 @@ protected final void execImpl(StmtExecutor executor, long jobId) throws Exceptio
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
- coordinator.cancel();
+ coordinator.cancel("insert timeout");
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
@@ -935,7 +935,7 @@ public void kill(boolean killConnection) {
closeChannel();
}
// Now, cancel running query.
- cancelQuery();
+ cancelQuery("cancel query by user from " + getRemoteHostPortString());
}

// kill operation with no protect by timeout.
@@ -956,10 +956,10 @@ private void killByTimeout(boolean killConnection) {
}
}

- public void cancelQuery() {
+ public void cancelQuery(String cancelMessage) {
StmtExecutor executorRef = executor;
if (executorRef != null) {
- executorRef.cancel();
+ executorRef.cancel(cancelMessage);
}
}

@@ -990,7 +990,7 @@ public void checkTimeout(long now) {
long timeout = getExecTimeout() * 1000L;
if (delta > timeout) {
LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}",
- timeoutTag, getRemoteHostPortString(), timeout, queryId);
+ timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId));
killFlag = true;
}
}
@@ -145,11 +145,11 @@ public ConnectContext getContext(String flightToken) {
return null;
}

- public void cancelQuery(String queryId) {
+ public void cancelQuery(String queryId, String cancelReason) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
- ctx.cancelQuery();
+ ctx.cancelQuery(cancelReason);
break;
}
}
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java (28 changes: 12 additions & 16 deletions)
@@ -1164,7 +1164,7 @@ private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, Future<
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
- cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
@@ -1259,7 +1259,7 @@ private Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Triple
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
- cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
@@ -1385,9 +1385,9 @@ private void updateStatus(Status status) {

queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
- cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
+ cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, status.getErrorMsg());
} else {
- cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg());
}
} finally {
lock.unlock();
@@ -1426,7 +1426,7 @@ public RowBatch getNext() throws Exception {
throw new RpcException(null, copyStatus.getErrorMsg());
} else {
String errMsg = copyStatus.getErrorMsg();
- LOG.warn("query failed: {}", errMsg);
+ LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg);
throw new UserException(errMsg);
}
}
@@ -1441,7 +1441,7 @@ public RowBatch getNext() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
- cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH);
+ cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit");
}
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
numReceivedRows = 0;
@@ -1528,8 +1528,8 @@ public Status shouldCancel(List<Backend> currentBackends) {
// Cancel execution of query. This includes the execution of the local plan
// fragment,
// if any, as well as all plan fragments on remote nodes.
- public void cancel() {
- cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
+ public void cancel(String errorMsg) {
+ cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg);
if (queueToken != null) {
queueToken.cancel();
}
@@ -1552,8 +1552,8 @@ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg
queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
}
LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
- DebugUtil.printId(queryId), cancelReason.toString());
- cancelInternal(cancelReason);
+ DebugUtil.printId(queryId), errorMsg);
+ cancelInternal(cancelReason, errorMsg);
} finally {
unlock();
}
@@ -1577,9 +1577,9 @@ private void cancelLatch() {
}
}

- private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
+ private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, String cancelMessage) {
if (null != receiver) {
- receiver.cancel(cancelReason);
+ receiver.cancel(cancelReason, cancelMessage);
}
if (null != pointExec) {
pointExec.cancel();
@@ -3307,10 +3307,6 @@ public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
DebugUtil.printId(fragmentInstanceId()), status.toString());
}
}
- LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {},"
- + "fragment instance id={}, reason: {}",
- DebugUtil.printId(queryId), initiated, done, backend.getId(),
- DebugUtil.printId(fragmentInstanceId()), "without status");
}

public void onFailure(Throwable t) {
@@ -111,8 +111,8 @@ public RowBatch getNext(Status status) throws TException {
LOG.warn("Query {} get result timeout, get result duration {} ms",
DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000);
setRunStatus(Status.TIMEOUT);
- status.updateStatus(TStatusCode.TIMEOUT, "");
- updateCancelReason("fetch data timeout");
+ status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
+ updateCancelReason("Query timeout");
return null;
} catch (InterruptedException e) {
// continue to get result
@@ -205,13 +205,14 @@ private void updateCancelReason(String reason) {
}
}

- public void cancel(Types.PPlanFragmentCancelReason reason) {
+ public void cancel(Types.PPlanFragmentCancelReason reason, String cancelMessage) {
if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) {
setRunStatus(Status.TIMEOUT);
} else {
setRunStatus(Status.CANCELLED);
}
- updateCancelReason(reason.toString());
+
+ updateCancelReason(cancelMessage);
synchronized (this) {
if (currentThread != null) {
// TODO(cmy): we cannot interrupt this thread, or we may throw