diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index ffc351e941353..c309f69fd566d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1469,4 +1469,52 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException); } } + + @Test + public void testGetTxnState() throws Exception { + Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS) + .build().get(); + + // test OPEN and TIMEOUT + assertEquals(transaction.getState(), Transaction.State.OPEN); + Transaction timeoutTxn = transaction; + Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT); + + // test abort + transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS) + .build().get(); + transaction.abort().get(); + assertEquals(transaction.getState(), Transaction.State.ABORTED); + + // test commit + transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS) + .build().get(); + transaction.commit().get(); + assertEquals(transaction.getState(), Transaction.State.COMMITTED); + + // test error + transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS) + .build().get(); + pulsarServiceList.get(0).getTransactionMetadataStoreService() + .endTransaction(transaction.getTxnID(), 0, false); + transaction.commit(); + Transaction errorTxn = transaction; + Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR); + + // test committing + transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS) + .build().get(); + ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>()); + transaction.commit(); + Transaction committingTxn = transaction; + Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING); + + // test aborting + transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS) + .build().get(); + ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>()); + transaction.abort(); + Transaction abortingTxn = transaction; + Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index ddb9062454b37..aea77bec13678 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -1052,7 +1052,7 @@ public void testTxnTimeOutInClient() throws Exception{ .build().get(); producer.newMessage().send(); Awaitility.await().untilAsserted(() -> { - Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT); + Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT); }); try { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java index fd4cf0bc1665c..33e96d5c2764d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java @@ -29,6 +29,55 @@ @InterfaceStability.Evolving public interface Transaction { + enum State { + + /** + * When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction. + * + * When a transaction is in the `OPEN` state, it can commit or abort. + */ + OPEN, + + /** + * When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`. + */ + COMMITTING, + + /** + * When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`. + */ + ABORTING, + + /** + * When a client receives a response to a commit, the transaction state is changed from + * `COMMITTING` to `COMMITTED`. + */ + COMMITTED, + + /** + * When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`. + */ + ABORTED, + + /** + * When a client invokes a commit or an abort, but a transaction does not exist in a coordinator, + * then the state is changed to `ERROR`. + * + * When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`, + * then the state is changed to `ERROR`. + * + * When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`, + * then the state is changed to `ERROR`. + */ + ERROR, + + /** + * When a transaction is timed out and the transaction state is `OPEN`, + * then the transaction state is changed from `OPEN` to `TIME_OUT`. + */ + TIME_OUT + } + /** * Commit the transaction. * @@ -48,4 +97,12 @@ public interface Transaction { * @return {@link TxnID} the txnID. */ TxnID getTxnID(); + + /** + * Get transaction state. + * + * @return {@link State} the state of the transaction. + */ + State getState(); + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 55b20438693e3..833b0957d1c8a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask { @Override public void run(Timeout timeout) throws Exception { - STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT); - } - - public enum State { - OPEN, - COMMITTING, - ABORTING, - COMMITTED, - ABORTED, - ERROR, - TIMEOUT + STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT); } TransactionImpl(PulsarClientImpl client, @@ -215,6 +205,11 @@ public TxnID getTxnID() { return new TxnID(txnIdMostBits, txnIdLeastBits); } + @Override + public State getState() { + return state; + } + public boolean checkIfOpen(CompletableFuture completableFuture) { if (state == State.OPEN) { return true;