Skip to content

Commit

Permalink
[fix][txn] Always send correct transaction id in end txn response (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Jan 10, 2023
1 parent e829672 commit 90764e7
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscript

void sendNewTxnResponse(long requestId, TxnID txnID, long tcID);

void sendNewTxnErrorResponse(long requestId, long txnID, ServerError error, String message);
void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, String message);

void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ public void sendNewTxnResponse(long requestId, TxnID txnID, long tcID) {
}

@Override
public void sendNewTxnErrorResponse(long requestId, long txnID, ServerError error, String message) {
BaseCommand command = Commands.newTxnResponse(requestId, txnID, error, message);
public void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, String message) {
BaseCommand command = Commands.newTxnResponse(requestId, tcID, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
Expand All @@ -352,7 +352,8 @@ public void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction) {

@Override
public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError error, String message) {
BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), error, message);
BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(), error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2406,7 +2406,9 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
} else {
ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);

ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
Expand Down Expand Up @@ -2451,7 +2453,7 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final long requestId = command.getRequestId();
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2487,7 +2489,7 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
ServerError.ServiceNotReady,
"The topic " + topic + " does not exist in broker.",
txnID.getMostSigBits(), txnID.getLeastSigBits()));
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, "
+ "txnId: [{}], txnAction: [{}]",
Expand All @@ -2496,13 +2498,13 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {}, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
return null;
log.error("handleEndTxnOnPartition fail ! topic {}, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
return null;
});
}
}).exceptionally(e -> {
Expand Down Expand Up @@ -2556,7 +2558,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed."));
"Handle end txn on subscription failed: " + e.getMessage()));
return;
}
ctx.writeAndFlush(
Expand All @@ -2569,7 +2571,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
if (b) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
+ "subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
Expand All @@ -2582,13 +2584,13 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady, e.getMessage()));
return null;
log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady, e.getMessage()));
return null;
});
}
}).exceptionally(e -> {
Expand All @@ -2598,7 +2600,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Handle end txn on subscription failed."));
"Handle end txn on subscription failed: " + e.getMessage()));
return null;
});
}
Expand Down Expand Up @@ -2655,7 +2657,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
} else {
ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);

ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
Expand Down
Loading

0 comments on commit 90764e7

Please sign in to comment.