Skip to content

Commit

Permalink
[Fix][Flaky-test] Fix testConsumeTxnMessage (apache#16981)
Browse files Browse the repository at this point in the history
* [Fix][Flaky-test] Fix testConsumeTxnMessage
Master apache#14109
## Motivation
The transaction commit is async, so the consumer can still receive message when the consumer rebuilds.
## Modification
Add  Awaitility.await() for check-ing whether the ongoingTxns = 0.

(cherry picked from commit c29503e)
(cherry picked from commit 6db6679)
  • Loading branch information
liangyepianzhou authored and nicoloboschi committed Sep 2, 2022
1 parent 3275c82 commit dbf3b59
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -194,7 +195,7 @@ public void testProduceTxnMessage() throws InterruptedException, PulsarClientExc
}

@Test
public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
public void testConsumeTxnMessage() throws Exception {
String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
String subName = "sub";
String topic = testTopic + UUID.randomUUID();
Expand Down Expand Up @@ -222,11 +223,18 @@ public void testConsumeTxnMessage() throws InterruptedException, PulsarClientExc
});
thread.start();
thread.join();

Awaitility.await().untilAsserted(() -> {
admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> {
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
});
});

Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
.enableBatchIndexAcknowledgment(false)
.subscribe();
.subscribe();
for (int i = 0; i < 5; i++) {
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Expand Down

0 comments on commit dbf3b59

Please sign in to comment.