Skip to content

Commit

Permalink
[AMQ-9554] Add RedeliveryPolicy setting to ignore browsers by default
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Aug 27, 2024
1 parent e45ee4a commit 9f82e3e
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,18 @@ private void poisonAck(MessageDispatch md, String cause) throws JMSException {

private boolean redeliveryExceeded(MessageDispatch md) {
try {
return session.getTransacted()
&& redeliveryPolicy != null
&& redeliveryPolicy.isPreDispatchCheck()
&& redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
&& md.getMessage().getProperty("redeliveryDelay") == null;
if(!session.getTransacted() || redeliveryPolicy == null || !redeliveryPolicy.isPreDispatchCheck()) {
return false;
}

if(info.isBrowser() && redeliveryPolicy.isQueueBrowserIgnored()) {
return false;
}

return redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
&& md.getMessage().getProperty("redeliveryDelay") == null;
} catch (Exception ignored) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
protected double backOffMultiplier = 5.0;
protected long redeliveryDelay = initialRedeliveryDelay;
protected boolean preDispatchCheck = true;
protected boolean queueBrowserIgnored = true;

public RedeliveryPolicy() {
}
Expand Down Expand Up @@ -165,4 +166,12 @@ public void setPreDispatchCheck(boolean preDispatchCheck) {
public boolean isPreDispatchCheck() {
return preDispatchCheck;
}

public void setQueueBrowserIgnore(boolean queueBrowserIgnored) {
this.queueBrowserIgnored = queueBrowserIgnored;
}

public boolean isQueueBrowserIgnored() {
return queueBrowserIgnored;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package org.apache.activemq.usecases;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
Expand All @@ -35,8 +38,14 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -68,6 +77,10 @@ public void startBroker() throws Exception {

connectUri = connector.getConnectUri();
factory = new ActiveMQConnectionFactory(connectUri);
factory.setWatchTopicAdvisories(false);
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setRedeliveryDelay(0l);
factory.getRedeliveryPolicy().setMaximumRedeliveryDelay(0l);
}

public BrokerService createBroker() throws IOException {
Expand Down Expand Up @@ -217,4 +230,195 @@ public void testMemoryLimit() throws Exception {
browser.close();
assertTrue("got at least maxPageSize", received >= maxPageSize);
}

@Test // https://issues.apache.org/jira/browse/AMQ-9554
public void testBrowseRedeliveryMaxRedelivered() throws Exception {
browseRedelivery(0, true);
}

@Test // Ignore https://issues.apache.org/jira/browse/AMQ-9554
public void testBrowseRedeliveryIgnored() throws Exception {
browseRedelivery(1, false);
}

protected void browseRedelivery(int browseExpected, boolean dlqDlqExpected) throws Exception {
IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
individualDeadLetterStrategy.setQueuePrefix("");
individualDeadLetterStrategy.setQueueSuffix(".dlq");
individualDeadLetterStrategy.setUseQueueForQueueMessages(true);
broker.getDestinationPolicy().getDefaultEntry().setDeadLetterStrategy(individualDeadLetterStrategy);
broker.getDestinationPolicy().getDefaultEntry().setPersistJMSRedelivered(true);

if(dlqDlqExpected) {
factory.getRedeliveryPolicy().setQueueBrowserIgnore(false);
}

String messageId = null;

String queueName = "browse.redeliverd.tx";
String dlqQueueName = "browse.redeliverd.tx.dlq";
String dlqDlqQueueName = "browse.redeliverd.tx.dlq.dlq";

ActiveMQQueue queue = new ActiveMQQueue(queueName + "?consumer.prefetchSize=0");
ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName + "?consumer.prefetchSize=0");
ActiveMQQueue queueDLQDLQ = new ActiveMQQueue(dlqDlqQueueName);

broker.getAdminView().addQueue(queueName);
broker.getAdminView().addQueue(dlqQueueName);

DestinationView dlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqQueueName);
DestinationView queueView = broker.getAdminView().getBroker().getQueueView(queueName);

verifyQueueStats(0l, 0l, 0l, dlqQueueView);
verifyQueueStats(0l, 0l, 0l, queueView);

Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);

Message sendMessage = session.createTextMessage("Hello world!");
producer.send(sendMessage);
messageId = sendMessage.getJMSMessageID();
session.commit();
producer.close();

verifyQueueStats(0l, 0l, 0l, dlqQueueView);
verifyQueueStats(1l, 0l, 1l, queueView);

// Redeliver message to DLQ
Message message = null;
MessageConsumer consumer = session.createConsumer(queue);
int rollbackCount = 0;
do {
message = consumer.receive(2000l);
if(message != null) {
session.rollback();
rollbackCount++;
}
} while (message != null);

assertEquals(Integer.valueOf(7), Integer.valueOf(rollbackCount));
verifyQueueStats(1l, 0l, 1l, dlqQueueView);
verifyQueueStats(1l, 1l, 0l, queueView);

session.commit();
consumer.close();

// Increment redelivery counter on the message in the DLQ
// Close the consumer to force broker to dispatch
Message messageDLQ = null;
MessageConsumer consumerDLQ = session.createConsumer(queueDLQ);
int dlqRollbackCount = 0;
int dlqRollbackCountLimit = 5;
do {
messageDLQ = consumerDLQ.receive(2000l);
if(messageDLQ != null) {
session.rollback();
session.close();
consumerDLQ.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumerDLQ = session.createConsumer(queueDLQ);
dlqRollbackCount++;
}
} while (messageDLQ != null && dlqRollbackCount < dlqRollbackCountLimit);
session.commit();
consumerDLQ.close();

// Browse in tx mode works when we are at the edge of maxRedeliveries
// aka browse does not increment redeliverCounter as expected
Queue brokerQueueDLQ = resolveQueue(broker, queueDLQ);

for(int i=0; i<16; i++) {
QueueBrowser browser = session.createBrowser(queueDLQ);
Enumeration<?> enumeration = browser.getEnumeration();
ActiveMQMessage activemqMessage = null;
int received = 0;
while (enumeration.hasMoreElements()) {
activemqMessage = (ActiveMQMessage)enumeration.nextElement();
received++;
}
browser.close();
assertEquals(Integer.valueOf(1), Integer.valueOf(received));
assertEquals(Integer.valueOf(6), Integer.valueOf(activemqMessage.getRedeliveryCounter()));

// Confirm broker-side redeliveryCounter
QueueMessageReference queueMessageReference = brokerQueueDLQ.getMessage(messageId);
assertEquals(Integer.valueOf(6), Integer.valueOf(queueMessageReference.getRedeliveryCounter()));
}

session.close();
connection.close();

// Change redelivery max and the browser will fail
factory.getRedeliveryPolicy().setMaximumRedeliveries(3);
final Connection browseConnection = factory.createConnection();
browseConnection.start();

final AtomicInteger browseCounter = new AtomicInteger(0);
final AtomicInteger jmsExceptionCounter = new AtomicInteger(0);

final Session browseSession = browseConnection.createSession(true, Session.SESSION_TRANSACTED);

Thread browseThread = new Thread() {
public void run() {

QueueBrowser browser = null;
try {
browser = browseSession.createBrowser(queueDLQ);
Enumeration<?> enumeration = browser.getEnumeration();
if(Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}
while (enumeration.hasMoreElements()) {
Message message = (Message)enumeration.nextElement();
if(message != null) {
browseCounter.incrementAndGet();
}
}
} catch (JMSException e) {
jmsExceptionCounter.incrementAndGet();
} finally {
if(browser != null) { try { browser.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
if(browseSession != null) { try { browseSession.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
if(browseConnection != null) { try { browseConnection.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
}
}
};
browseThread.start();
Thread.sleep(2000l);
browseThread.interrupt();

assertEquals(Integer.valueOf(browseExpected), Integer.valueOf(browseCounter.get()));
assertEquals(Integer.valueOf(0), Integer.valueOf(jmsExceptionCounter.get()));

// ActiveMQConsumer sends a poison ack, messages gets moved to .dlq.dlq AND remains on the .dlq
DestinationView dlqDlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqDlqQueueName);
verifyQueueStats(1l, 1l, 0l, queueView);
verifyQueueStats(1l, 0l, 1l, dlqQueueView);

if(dlqDlqExpected) {
verifyQueueStats(1l, 0l, 1l, dlqDlqQueueView);
} else {
assertNull(dlqDlqQueueView);
}
}
protected static void verifyQueueStats(long enqueueCount, long dequeueCount, long queueSize, DestinationView queueView) {
assertEquals(Long.valueOf(enqueueCount), Long.valueOf(queueView.getEnqueueCount()));
assertEquals(Long.valueOf(dequeueCount), Long.valueOf(queueView.getDequeueCount()));
assertEquals(Long.valueOf(queueSize), Long.valueOf(queueView.getQueueSize()));
}

protected static Queue resolveQueue(BrokerService brokerService, ActiveMQQueue activemqQueue) throws Exception {
Set<Destination> destinations = brokerService.getBroker().getDestinations(activemqQueue);
if(destinations == null || destinations.isEmpty()) {
return null;
}

if(destinations.size() > 1) {
fail("Expected one-and-only one queue for: " + activemqQueue);
}

return (Queue)destinations.iterator().next();
}
}

0 comments on commit 9f82e3e

Please sign in to comment.