diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index b87cf3adda372..883d53540cbc6 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -196,7 +197,7 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc } @Test - public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException { + public void testConsumeTxnMessage() throws Exception { String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d"; String subName = "sub"; String topic = testTopic + UUID.randomUUID(); @@ -224,11 +225,18 @@ public void testConsumeTxnMessage() throws InterruptedException, PulsarClientExc }); thread.start(); thread.join(); + + Awaitility.await().untilAsserted(() -> { + admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> { + Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0); + }); + }); + Consumer consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) .enableBatchIndexAcknowledgment(false) - .subscribe(); + .subscribe(); for (int i = 0; i < 5; i++) { Message message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNotNull(message);