From d41dd4796f74ddbe7ec77e7701443b110de1e18f Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 20 Sep 2017 14:41:04 -0700 Subject: [PATCH] Fix Intermittent test for dispatcher --- .../api/DispatcherBlockConsumerTest.java | 181 ++++++++---------- 1 file changed, 75 insertions(+), 106 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index d53cab63bd104..a15aabda46c04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -29,6 +29,7 @@ import java.lang.reflect.Field; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -61,6 +62,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import org.testng.collections.Lists; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Maps; @@ -95,7 +97,7 @@ public Object[][] bundleUnloading() { * * @throws Exception */ - @Test + @Test(timeOut=10000) public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Exception { log.info("-- Starting {} test --", methodName); @@ -137,7 +139,6 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti msg = consumers[i].receive(500, TimeUnit.MILLISECONDS); if (msg != null) { messages.put(msg, consumers[i]); - log.info("Received message: " + new String(msg.getData())); } else { break; } @@ -156,33 +157,33 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti fail("ack failed", e); } }); - // wait to start dispatching-async - Thread.sleep(1000); - // try to consume remaining messages - int remainingMessages = totalProducedMsgs - messages.size(); + // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages - for (int retry = 0; retry < 5; retry++) { - for (int i = 0; i < consumers.length; i++) { - for (int j = 0; j < remainingMessages; j++) { - msg = consumers[i].receive(100, TimeUnit.MILLISECONDS); - if (msg != null) { - messages.put(msg, consumers[i]); - log.info("Received message: " + new String(msg.getData())); - } else { - break; + List result = Lists.newArrayList(); + // expecting messages which are not received + int expectedRemainingMessages = totalProducedMsgs - messages.size(); + CountDownLatch latch = new CountDownLatch(expectedRemainingMessages); + for (int i = 0; i < consumers.length; i++) { + final int consumerCount = i; + for (int j = 0; j < totalProducedMsgs; j++) { + consumers[i].receiveAsync().thenAccept(m -> { + result.add(m.getMessageId()); + try { + consumers[consumerCount].acknowledge(m); + } catch (PulsarClientException e) { + fail("failed to ack msg", e); } - } - } - if (messages.size() >= totalProducedMsgs) { - break; - } else { - Thread.sleep(100); + latch.countDown(); + }); } } - // total received-messages should match to produced messages - assertEquals(totalProducedMsgs, messages.size()); + latch.await(); + + // total received-messages should match to produced messages (it may have duplicate messages) + assertTrue(result.size() >= expectedRemainingMessages); + producer.close(); Arrays.asList(consumers).forEach(c -> { try { @@ -205,7 +206,7 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti * * @throws Exception */ - @Test + @Test(timeOut=10000) public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Exception { log.info("-- Starting {} test --", methodName); @@ -252,75 +253,38 @@ public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Except } } - int totalConsumedMsgs = messages.size(); // client must receive number of messages = unAckedMessagesBufferSize rather all produced messages assertNotEquals(messages.size(), totalProducedMsgs); - // trigger redelivery + // (3) trigger redelivery messages.asMap().forEach((c, msgs) -> { c.redeliverUnacknowledgedMessages( msgs.stream().map(m -> (MessageIdImpl) m).collect(Collectors.toSet())); }); - // wait for redelivery to be completed - Thread.sleep(1000); - - // now, broker must have redelivered all unacked messages - messages.clear(); - for (int i = 0; i < 3; i++) { - for (int j = 0; j < totalProducedMsgs; j++) { - msg = consumers[i].receive(500, TimeUnit.MILLISECONDS); - if (msg != null) { - messages.put(consumers[i], msg.getMessageId()); - log.info("Received message: " + new String(msg.getData())); - } else { - break; - } - } - } - - // check all unacked messages have been redelivered: with delta 30: 3 consumers with receiverQueueSize=10 - Set result = Sets.newHashSet(messages.values()); - assertEquals(totalConsumedMsgs, result.size(), 3 * receiverQueueSize); - - // start acknowledging messages - messages.asMap().forEach((c, msgs) -> { - msgs.forEach(m -> { - try { - c.acknowledge(m); - } catch (PulsarClientException e) { - fail("ack failed", e); - } - }); - }); - - // now: dispatcher must be unblocked: wait to start dispatching-async - Thread.sleep(1000); - result = Sets.newHashSet(messages.values()); - // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume + // (4) try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages - for (int retry = 0; retry < 5; retry++) { - for (int i = 0; i < consumers.length; i++) { - for (int j = 0; j < totalProducedMsgs; j++) { - msg = consumers[i].receive(100, TimeUnit.MILLISECONDS); - if (msg != null) { - result.add(msg.getMessageId()); - consumers[i].acknowledge(msg); - log.info("Received message: " + new String(msg.getData())); - } else { - break; + List result = Lists.newArrayList(); + CountDownLatch latch = new CountDownLatch(totalProducedMsgs); + for (int i = 0; i < consumers.length; i++) { + final int consumerCount = i; + for (int j = 0; j < totalProducedMsgs; j++) { + consumers[i].receiveAsync().thenAccept(m -> { + result.add(m.getMessageId()); + try { + consumers[consumerCount].acknowledge(m); + } catch (PulsarClientException e) { + fail("failed to ack msg", e); } - } - } - if (result.size() >= totalProducedMsgs) { - break; - } else { - Thread.sleep(100); + latch.countDown(); + }); } } - // total received-messages should match to produced messages - assertEquals(totalProducedMsgs, result.size()); + latch.await(); + + // total received-messages should match to produced messages (it may have duplicate messages) + assertTrue(result.size() >= totalProducedMsgs); producer.close(); Arrays.asList(consumers).forEach(c -> { try { @@ -389,10 +353,10 @@ public void testCloseConsumerBlockedDispatcher() throws Exception { // client must receive number of messages = unAckedMessagesBufferSize rather all produced messages assertEquals(messages.size(), unackMsgAllowed, receiverQueueSize * 2); - // create consumer2 - Consumer consumer2 = pulsarClient.subscribe(topicName, subscriberName, conf); // close consumer1: all messages of consumer1 must be replayed and received by consumer2 consumer1.close(); + // create consumer2 + Consumer consumer2 = pulsarClient.subscribe(topicName, subscriberName, conf); Map messages2 = Maps.newHashMap(); // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages @@ -431,7 +395,7 @@ public void testCloseConsumerBlockedDispatcher() throws Exception { * * @throws Exception */ - @Test + @Test(timeOut=10000) public void testRedeliveryOnBlockedDistpatcher() throws Exception { log.info("-- Starting {} test --", methodName); @@ -482,7 +446,7 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception { int totalConsumedMsgs = messages.size(); // client must receive number of messages = unAckedMessagesBufferSize rather all produced messages - assertEquals(totalConsumedMsgs, unackMsgAllowed, 2 * receiverQueueSize); + assertEquals(totalConsumedMsgs, unackMsgAllowed, 3 * receiverQueueSize); // trigger redelivery Arrays.asList(consumers).forEach(c -> { @@ -525,36 +489,31 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception { }); }); - // now: dispatcher must be unblocked: wait to start dispatching-async - Thread.sleep(1000); - result.clear(); messages1.values().forEach(s -> result.addAll(s)); // try to consume remaining messages - int remainingMessages = totalProducedMsgs - messages1.size(); + int remainingMessages = totalProducedMsgs - result.size(); // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages - for (int retry = 0; retry < 5; retry++) { - for (int i = 0; i < consumers.length; i++) { - for (int j = 0; j < remainingMessages; j++) { - msg = consumers[i].receive(100, TimeUnit.MILLISECONDS); - if (msg != null) { - result.add(msg.getMessageId()); - consumers[i].acknowledge(msg); - log.info("Received message: " + new String(msg.getData())); - } else { - break; + CountDownLatch latch = new CountDownLatch(remainingMessages); + List consumedMessages = Lists.newArrayList(); + for (int i = 0; i < consumers.length; i++) { + final int counsumerIndex = i; + for (int j = 0; j < remainingMessages; j++) { + consumers[i].receiveAsync().thenAccept(m -> { + consumedMessages.add(m.getMessageId()); + try { + consumers[counsumerIndex].acknowledge(m); + } catch (PulsarClientException e) { + fail("failed to ack", e); } - } - } - if (result.size() >= totalProducedMsgs) { - break; - } else { - Thread.sleep(100); + latch.countDown(); + }); } } - // total received-messages should match to produced messages - assertEquals(totalProducedMsgs, result.size()); + latch.await(); + // total received-messages should match remaining messages excluding duplicate + assertTrue(consumedMessages.size() >= remainingMessages); producer.close(); Arrays.asList(consumers).forEach(c -> { try { @@ -797,6 +756,11 @@ public void testBlockBrokerDispatching() throws Exception { } else { break; } + // once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the + // subscription + if (j == maxUnAckPerBroker) { + Thread.sleep(200); + } } // client must receive number of messages = maxUnAckPerbroker rather all produced messages assertNotEquals(messages1.size(), totalProducedMsgs); @@ -976,6 +940,11 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { } else { break; } + // once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the + // subscription + if (j == maxUnAckPerBroker) { + Thread.sleep(200); + } } // client must receive number of messages = maxUnAckPerbroker rather all produced messages assertNotEquals(messages1.size(), totalProducedMsgs);