From dbf3b5923f48d77f6f6b7b1dc7798f98f71217d3 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Tue, 9 Aug 2022 10:45:48 +0800 Subject: [PATCH] [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981) * [Fix][Flaky-test] Fix testConsumeTxnMessage Master https://github.com/apache/pulsar/issues/14109 ## Motivation The transaction commit is async, so the consumer can still receive message when the consumer rebuilds. ## Modification Add Awaitility.await() for check-ing whether the ongoingTxns = 0. (cherry picked from commit c29503e7c8704132f13ae8021a76735b065940b9) (cherry picked from commit 6db6679b419fc0951f42f43331e13cabe90c6130) --- .../testclient/PerformanceTransactionTest.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index 936f70bedf360..1fcb925a5e273 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -36,6 +36,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -194,7 +195,7 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc } @Test - public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException { + public void testConsumeTxnMessage() throws Exception { String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d"; String subName = "sub"; String topic = testTopic + UUID.randomUUID(); @@ -222,11 +223,18 @@ public void testConsumeTxnMessage() throws InterruptedException, PulsarClientExc }); thread.start(); thread.join(); + + Awaitility.await().untilAsserted(() -> { + admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> { + Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0); + }); + }); + Consumer consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) .enableBatchIndexAcknowledgment(false) - .subscribe(); + .subscribe(); for (int i = 0; i < 5; i++) { Message message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNotNull(message);