Skip to content

Commit

Permalink
ARTEMIS-4312 dupes w/redistribution and multicast
Browse files Browse the repository at this point in the history
Multiple multicast queues on the same address can lead to duplicate
messages during redistribution in a cluster.
  • Loading branch information
jbertram authored and clebertsuconic committed Jun 14, 2023
1 parent 582a689 commit 3ff8419
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ public Message redistribute(final Message message,
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}

copyRedistribute.setAddress(message.getAddress());
copyRedistribute.clearInternalProperties();

if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,17 @@ protected void addConsumer(final int consumerID,
boolean autoCommitAcks,
final String user,
final String password) throws Exception {
addConsumer(consumerID, node, queueName, filterVal, autoCommitAcks, user, password, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
}

protected void addConsumer(final int consumerID,
final int node,
final String queueName,
final String filterVal,
boolean autoCommitAcks,
final String user,
final String password,
final int ackBatchSize) throws Exception {
try {
if (consumers[consumerID] != null) {
throw new IllegalArgumentException("Already a consumer at " + node);
Expand All @@ -641,7 +652,7 @@ protected void addConsumer(final int consumerID,
throw new IllegalArgumentException("No sf at " + node);
}

ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE));
ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ackBatchSize));

String filterString = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,85 @@ public void testRedistributionWithMessageGroups() throws Exception {
logger.debug("Test done");
}

@Test
public void testRedistributionWithMultipleQueuesOnTheSameAddress() throws Exception {
final int MESSAGE_COUNT = 10;
final String ADDRESS = "myAddress";
final String QUEUE0 = "queue0";
final String QUEUE1 = "queue1";

getServer(0).getConfiguration().addAddressSetting(ADDRESS, new AddressSettings().setRedistributionDelay(0));
getServer(1).getConfiguration().addAddressSetting(ADDRESS, new AddressSettings().setRedistributionDelay(0));

setupClusterConnection("cluster0", ADDRESS, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", ADDRESS, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

startServers(0, 1);

waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

createQueue(0, ADDRESS, QUEUE0, null, false);
createQueue(0, ADDRESS, QUEUE1, null, false);

createQueue(1, ADDRESS, QUEUE0, null, false);
createQueue(1, ADDRESS, QUEUE1, null, false);

addConsumer(0, 0, QUEUE0, null, true, null, null, 0);
addConsumer(1, 1, QUEUE0, null, true, null, null, 0);

waitForBindings(0, ADDRESS, 2, 1, true);
waitForBindings(0, ADDRESS, 2, 1, false);

waitForBindings(1, ADDRESS, 2, 1, true);
waitForBindings(1, ADDRESS, 2, 1, false);

send(0, ADDRESS, MESSAGE_COUNT, true, null, RoutingType.MULTICAST, null);

{ // make sure all the messages were delivered to the proper queues & nodes
Wait.assertEquals(5L, () -> servers[0].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100);
Wait.assertEquals(5L, () -> servers[1].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100);

Wait.assertEquals(10L, () -> servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
}

for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
{
ClientMessage m = consumers[0].getConsumer().receive(1000);
assertNotNull(m);
m.acknowledge();
}
{
ClientMessage m = consumers[1].getConsumer().receive(1000);
assertNotNull(m);
m.acknowledge();
}
}

{ // make sure all the messages were consumed propertly
Wait.assertEquals(5L, () -> servers[0].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100);
Wait.assertEquals(0L, () -> servers[0].locateQueue(QUEUE0).getMessageCount(), 2000, 100);

Wait.assertEquals(5L, () -> servers[1].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100);
Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
}

// add consumer to force redistribution of messages to node 1
addConsumer(2, 1, QUEUE1, null);
waitForBindings(1, ADDRESS, 2, 2, true);
waitForBindings(0, ADDRESS, 2, 2, false);

Wait.assertEquals(10L, () -> servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
Wait.assertEquals(0L, () -> servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100);

// ensure no messages were inadvertently redistributed to the wrong queue (i.e. the main point of this test)
Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
}

//https://issues.jboss.org/browse/HORNETQ-1057
@Test
public void testRedistributionStopsWhenConsumerAdded() throws Exception {
Expand Down

0 comments on commit 3ff8419

Please sign in to comment.