From 258ec3db385ae82c5b37e27f7c19e993092fb242 Mon Sep 17 00:00:00 2001 From: hui lai <1353307710@qq.com> Date: Mon, 9 Sep 2024 23:47:39 +0800 Subject: [PATCH] [fix](cloud) should do check before abort transaction (#40463) When routine load task transaction is abort, it should do check before abort transaction, otherwise, it may cause concurrent modifications to the `routineLoadTaskInfoList`, which in the Java language may result in elements in the list being null, leading to a loop throwing NullPointerException during scheduling and making it impossible to schedule routine load task to consume Kafka stream. --- .../transaction/CloudGlobalTransactionMgr.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index f51454ad269c51..f224d2929a65c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -990,6 +990,20 @@ public void abortTransaction(Long dbId, Long transactionId, String reason, TxnCommitAttachment txnCommitAttachment, List tableList) throws UserException { LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId); + if (txnCommitAttachment != null) { + if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + TxnStateChangeCallback cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); + if (cb != null) { + // use a temporary transaction state to do before commit check, + // what actually works is the transactionId + TransactionState tmpTxnState = new TransactionState(); + tmpTxnState.setTransactionId(transactionId); + cb.beforeAborted(tmpTxnState); + } + } + } + AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); builder.setDbId(dbId); builder.setTxnId(transactionId);