Skip to content

Commit

Permalink
Fix Intermittent test for dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Sep 20, 2017
1 parent b53a5dc commit d41dd47
Showing 1 changed file with 75 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -95,7 +97,7 @@ public Object[][] bundleUnloading() {
*
* @throws Exception
*/
@Test
@Test(timeOut=10000)
public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Exception {
log.info("-- Starting {} test --", methodName);

Expand Down Expand Up @@ -137,7 +139,6 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti
msg = consumers[i].receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(msg, consumers[i]);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
Expand All @@ -156,33 +157,33 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti
fail("ack failed", e);
}
});
// wait to start dispatching-async
Thread.sleep(1000);
// try to consume remaining messages
int remainingMessages = totalProducedMsgs - messages.size();

// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < remainingMessages; j++) {
msg = consumers[i].receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(msg, consumers[i]);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
List<MessageId> result = Lists.newArrayList();
// expecting messages which are not received
int expectedRemainingMessages = totalProducedMsgs - messages.size();
CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
for (int i = 0; i < consumers.length; i++) {
final int consumerCount = i;
for (int j = 0; j < totalProducedMsgs; j++) {
consumers[i].receiveAsync().thenAccept(m -> {
result.add(m.getMessageId());
try {
consumers[consumerCount].acknowledge(m);
} catch (PulsarClientException e) {
fail("failed to ack msg", e);
}
}
}
if (messages.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
latch.countDown();
});
}
}

// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, messages.size());
latch.await();

// total received-messages should match to produced messages (it may have duplicate messages)
assertTrue(result.size() >= expectedRemainingMessages);

producer.close();
Arrays.asList(consumers).forEach(c -> {
try {
Expand All @@ -205,7 +206,7 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti
*
* @throws Exception
*/
@Test
@Test(timeOut=10000)
public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Exception {
log.info("-- Starting {} test --", methodName);

Expand Down Expand Up @@ -252,75 +253,38 @@ public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Except
}
}

int totalConsumedMsgs = messages.size();
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages
assertNotEquals(messages.size(), totalProducedMsgs);

// trigger redelivery
// (3) trigger redelivery
messages.asMap().forEach((c, msgs) -> {
c.redeliverUnacknowledgedMessages(
msgs.stream().map(m -> (MessageIdImpl) m).collect(Collectors.toSet()));
});

// wait for redelivery to be completed
Thread.sleep(1000);

// now, broker must have redelivered all unacked messages
messages.clear();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers[i].receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.put(consumers[i], msg.getMessageId());
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}
}

// check all unacked messages have been redelivered: with delta 30: 3 consumers with receiverQueueSize=10
Set<MessageId> result = Sets.newHashSet(messages.values());
assertEquals(totalConsumedMsgs, result.size(), 3 * receiverQueueSize);

// start acknowledging messages
messages.asMap().forEach((c, msgs) -> {
msgs.forEach(m -> {
try {
c.acknowledge(m);
} catch (PulsarClientException e) {
fail("ack failed", e);
}
});
});

// now: dispatcher must be unblocked: wait to start dispatching-async
Thread.sleep(1000);
result = Sets.newHashSet(messages.values());
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// (4) try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumers[i].receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
result.add(msg.getMessageId());
consumers[i].acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
List<MessageId> result = Lists.newArrayList();
CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
for (int i = 0; i < consumers.length; i++) {
final int consumerCount = i;
for (int j = 0; j < totalProducedMsgs; j++) {
consumers[i].receiveAsync().thenAccept(m -> {
result.add(m.getMessageId());
try {
consumers[consumerCount].acknowledge(m);
} catch (PulsarClientException e) {
fail("failed to ack msg", e);
}
}
}
if (result.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
latch.countDown();
});
}
}

// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, result.size());
latch.await();

// total received-messages should match to produced messages (it may have duplicate messages)
assertTrue(result.size() >= totalProducedMsgs);
producer.close();
Arrays.asList(consumers).forEach(c -> {
try {
Expand Down Expand Up @@ -389,10 +353,10 @@ public void testCloseConsumerBlockedDispatcher() throws Exception {
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages
assertEquals(messages.size(), unackMsgAllowed, receiverQueueSize * 2);

// create consumer2
Consumer consumer2 = pulsarClient.subscribe(topicName, subscriberName, conf);
// close consumer1: all messages of consumer1 must be replayed and received by consumer2
consumer1.close();
// create consumer2
Consumer consumer2 = pulsarClient.subscribe(topicName, subscriberName, conf);
Map<Message, Consumer> messages2 = Maps.newHashMap();
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
Expand Down Expand Up @@ -431,7 +395,7 @@ public void testCloseConsumerBlockedDispatcher() throws Exception {
*
* @throws Exception
*/
@Test
@Test(timeOut=10000)
public void testRedeliveryOnBlockedDistpatcher() throws Exception {
log.info("-- Starting {} test --", methodName);

Expand Down Expand Up @@ -482,7 +446,7 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception {

int totalConsumedMsgs = messages.size();
// client must receive number of messages = unAckedMessagesBufferSize rather all produced messages
assertEquals(totalConsumedMsgs, unackMsgAllowed, 2 * receiverQueueSize);
assertEquals(totalConsumedMsgs, unackMsgAllowed, 3 * receiverQueueSize);

// trigger redelivery
Arrays.asList(consumers).forEach(c -> {
Expand Down Expand Up @@ -525,36 +489,31 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception {
});
});

// now: dispatcher must be unblocked: wait to start dispatching-async
Thread.sleep(1000);
result.clear();
messages1.values().forEach(s -> result.addAll(s));
// try to consume remaining messages
int remainingMessages = totalProducedMsgs - messages1.size();
int remainingMessages = totalProducedMsgs - result.size();
// try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
// all messages
for (int retry = 0; retry < 5; retry++) {
for (int i = 0; i < consumers.length; i++) {
for (int j = 0; j < remainingMessages; j++) {
msg = consumers[i].receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
result.add(msg.getMessageId());
consumers[i].acknowledge(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
CountDownLatch latch = new CountDownLatch(remainingMessages);
List<MessageId> consumedMessages = Lists.newArrayList();
for (int i = 0; i < consumers.length; i++) {
final int counsumerIndex = i;
for (int j = 0; j < remainingMessages; j++) {
consumers[i].receiveAsync().thenAccept(m -> {
consumedMessages.add(m.getMessageId());
try {
consumers[counsumerIndex].acknowledge(m);
} catch (PulsarClientException e) {
fail("failed to ack", e);
}
}
}
if (result.size() >= totalProducedMsgs) {
break;
} else {
Thread.sleep(100);
latch.countDown();
});
}
}

// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, result.size());
latch.await();
// total received-messages should match remaining messages excluding duplicate
assertTrue(consumedMessages.size() >= remainingMessages);
producer.close();
Arrays.asList(consumers).forEach(c -> {
try {
Expand Down Expand Up @@ -797,6 +756,11 @@ public void testBlockBrokerDispatching() throws Exception {
} else {
break;
}
// once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the
// subscription
if (j == maxUnAckPerBroker) {
Thread.sleep(200);
}
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
assertNotEquals(messages1.size(), totalProducedMsgs);
Expand Down Expand Up @@ -976,6 +940,11 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
} else {
break;
}
// once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the
// subscription
if (j == maxUnAckPerBroker) {
Thread.sleep(200);
}
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
assertNotEquals(messages1.size(), totalProducedMsgs);
Expand Down

0 comments on commit d41dd47

Please sign in to comment.