From 433e3aff9b2cbdf690ee9de07dad12e5e738ac50 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Tue, 9 Aug 2022 10:45:48 +0800 Subject: [PATCH] [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981) * [Fix][Flaky-test] Fix testConsumeTxnMessage Master https://github.com/apache/pulsar/issues/14109 ## Motivation The transaction commit is async, so the consumer can still receive message when the consumer rebuilds. ## Modification Add Awaitility.await() for check-ing whether the ongoingTxns = 0. --- .../testclient/PerformanceTransactionTest.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index b87cf3adda372..883d53540cbc6 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -196,7 +197,7 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc } @Test - public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException { + public void testConsumeTxnMessage() throws Exception { String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d"; String subName = "sub"; String topic = testTopic + UUID.randomUUID(); @@ -224,11 +225,18 @@ public void testConsumeTxnMessage() throws InterruptedException, PulsarClientExc }); thread.start(); thread.join(); + + Awaitility.await().untilAsserted(() -> { + admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> { + Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0); + }); + }); + Consumer consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) .enableBatchIndexAcknowledgment(false) - .subscribe(); + .subscribe(); for (int i = 0; i < 5; i++) { Message message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNotNull(message);