From c4d0309197bff00a0ec3c288606f5251608a7c27 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Thu, 18 Jul 2024 08:33:36 +0000 Subject: [PATCH] [fix](cloud) fix inconsistent reponse between cloud mode and local mode for streamload 2PC --- cloud/src/meta-service/meta_service_txn.cpp | 19 ++++++++++++++----- .../CloudGlobalTransactionMgr.java | 7 ++++--- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 4d48d7c9df4f0e..0a3439e94f77db 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -826,7 +826,12 @@ void commit_txn_immediately( if (err != TxnErrorCode::TXN_OK) { code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND : cast_as(err); - ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "transaction [" << txn_id << "] not found, db_id=" << db_id; + } else { + ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id + << " err=" << err; + } msg = ss.str(); LOG(WARNING) << msg; return; @@ -845,7 +850,7 @@ void commit_txn_immediately( DCHECK(txn_info.txn_id() == txn_id); if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { code = MetaServiceCode::TXN_ALREADY_ABORTED; - ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id; + ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; msg = ss.str(); LOG(WARNING) << msg; return; @@ -1868,7 +1873,11 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, if (err != TxnErrorCode::TXN_OK) { code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND : cast_as(err); - ss << "failed to get db id, txn_id=" << txn_id << " err=" << err; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "transaction [" << txn_id << "] not found"; + } else { + ss << "failed to get txn info, txn_id=" << txn_id << " err=" << err; + } msg = ss.str(); return; } @@ -1911,13 +1920,13 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller, //check state is valid. if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) { code = MetaServiceCode::TXN_ALREADY_ABORTED; - ss << "transaction is already abort db_id=" << db_id << "txn_id=" << txn_id; + ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id; msg = ss.str(); return; } if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) { code = MetaServiceCode::TXN_ALREADY_VISIBLE; - ss << "transaction is already visible db_id=" << db_id << "txn_id=" << txn_id; + ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id; msg = ss.str(); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 727192b4e572aa..a4d0e58247141e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -529,6 +529,10 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo throw new UserException("commitTxn() failed, errMsg:" + e.getMessage()); } + if (is2PC && (commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE + || commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_ABORTED)) { + throw new UserException(commitTxnResponse.getStatus().getMsg()); + } if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK && commitTxnResponse.getStatus().getCode() != MetaServiceCode.TXN_ALREADY_VISIBLE) { LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, commitTxnResponse:{}", @@ -545,9 +549,6 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo internalMsgBuilder.append(commitTxnResponse.getStatus().getCode()); throw new UserException("internal error, " + internalMsgBuilder.toString()); } - if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) { - throw new UserException(commitTxnResponse.getStatus().getMsg()); - } TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());