Skip to content

Commit

Permalink
[fix](cloud) fix inconsistent reponse between cloud mode and local mo…
Browse files Browse the repository at this point in the history
…de for streamload 2PC (#38076)

fix  the regression test case of flink_connector_response for cloud p0.
  • Loading branch information
liaoxin01 authored and dataroaring committed Jul 19, 2024
1 parent 448611f commit d966c80
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
19 changes: 14 additions & 5 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,12 @@ void commit_txn_immediately(
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
} else {
ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
<< " err=" << err;
}
msg = ss.str();
LOG(WARNING) << msg;
return;
Expand All @@ -845,7 +850,7 @@ void commit_txn_immediately(
DCHECK(txn_info.txn_id() == txn_id);
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id;
ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
Expand Down Expand Up @@ -1868,7 +1873,11 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "transaction [" << txn_id << "] not found";
} else {
ss << "failed to get txn info, txn_id=" << txn_id << " err=" << err;
}
msg = ss.str();
return;
}
Expand Down Expand Up @@ -1911,13 +1920,13 @@ void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
//check state is valid.
if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
code = MetaServiceCode::TXN_ALREADY_ABORTED;
ss << "transaction is already abort db_id=" << db_id << "txn_id=" << txn_id;
ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
msg = ss.str();
return;
}
if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
code = MetaServiceCode::TXN_ALREADY_VISIBLE;
ss << "transaction is already visible db_id=" << db_id << "txn_id=" << txn_id;
ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id;
msg = ss.str();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,10 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo
throw new UserException("commitTxn() failed, errMsg:" + e.getMessage());
}

if (is2PC && (commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE
|| commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_ABORTED)) {
throw new UserException(commitTxnResponse.getStatus().getMsg());
}
if (commitTxnResponse.getStatus().getCode() != MetaServiceCode.OK
&& commitTxnResponse.getStatus().getCode() != MetaServiceCode.TXN_ALREADY_VISIBLE) {
LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, commitTxnResponse:{}",
Expand All @@ -545,9 +549,6 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo
internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
throw new UserException("internal error, " + internalMsgBuilder.toString());
}
if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) {
throw new UserException(commitTxnResponse.getStatus().getMsg());
}

TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
Expand Down

0 comments on commit d966c80

Please sign in to comment.