Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Fixes SQS Queues shutting down gracefully
Browse files Browse the repository at this point in the history
* Allow queueStopTimeout can be configurable via SimpleMessageListenerContainerFactory
* Increase queueStopTimeout default value in milliseconds to be equal to waitTimeOut

Fixes gh-504
Fixes gh-507
Closes gh-620
  • Loading branch information
mkatircioglu authored and maciejwalkowiak committed Jul 7, 2020
1 parent 2344e3a commit e11c184
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

/**
* @author Alain Sahli
* @author Mete Alpaslan Katırcıoğlu
* @since 1.0
*/
public class SimpleMessageListenerContainerFactory {
Expand All @@ -41,6 +42,8 @@ public class SimpleMessageListenerContainerFactory {

private Integer waitTimeOut;

private Long queueStopTimeout;

private boolean autoStartup = true;

private AmazonSQSAsync amazonSqs;
Expand Down Expand Up @@ -96,6 +99,15 @@ public void setWaitTimeOut(Integer waitTimeOut) {
this.waitTimeOut = waitTimeOut;
}

/**
* Configures the queue stop timeout that waits for a queue to stop before
* interrupting the running thread.
* @param queueStopTimeout in milliseconds
*/
public void setQueueStopTimeout(Long queueStopTimeout) {
this.queueStopTimeout = queueStopTimeout;
}

/**
* Configures if this container should be automatically started. The default value is
* true.
Expand Down Expand Up @@ -204,6 +216,9 @@ public SimpleMessageListenerContainer createSimpleMessageListenerContainer() {
if (this.waitTimeOut != null) {
simpleMessageListenerContainer.setWaitTimeOut(this.waitTimeOut);
}
if (this.queueStopTimeout != null) {
simpleMessageListenerContainer.setQueueStopTimeout(this.queueStopTimeout);
}
if (this.resourceIdResolver != null) {
simpleMessageListenerContainer.setResourceIdResolver(this.resourceIdResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Callback executed on SQS message deletion.
*
* @author Mete Alpaslan Katircioglu
* @author Mete Alpaslan Katırcıoğlu
*/
class DeleteMessageHandler
implements AsyncHandler<DeleteMessageRequest, DeleteMessageResult> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/**
* @author Alain Sahli
* @author Mete Alpaslan Katircioglu
* @author Mete Alpaslan Katırcıoğlu
* @since 1.1
*/
public class QueueMessageAcknowledgment implements Acknowledgment {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* @author Agim Emruli
* @author Alain Sahli
* @author Mete Alpaslan Katircioglu
* @author Mete Alpaslan Katırcıoğlu
* @since 1.0
*/
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
Expand All @@ -54,7 +54,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta

private long backOffTime = 10000;

private long queueStopTimeout = 10000;
private long queueStopTimeout = 20000;

private AsyncTaskExecutor taskExecutor;

Expand Down Expand Up @@ -100,7 +100,7 @@ public long getQueueStopTimeout() {
/**
* The number of milliseconds the {@link SimpleMessageListenerContainer#stop(String)}
* method waits for a queue to stop before interrupting the current thread. Default
* value is 10000 milliseconds (10 seconds).
* value is 20000 milliseconds (20 seconds).
* @param queueStopTimeout in milliseconds
*/
public void setQueueStopTimeout(long queueStopTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
/**
* @author Alain Sahli
* @author Maciej Walkowiak
* @author Mete Alpaslan Katırcıoğlu
*/
class SqsConfigurationTest {

Expand Down Expand Up @@ -160,6 +161,8 @@ void configuration_withCustomConfigurationFactory_shouldBeUsedToCreateTheContain
.isEqualTo(ConfigurationWithCustomContainerFactory.VISIBILITY_TIMEOUT);
assertThat(ReflectionTestUtils.getField(container, "waitTimeOut"))
.isEqualTo(ConfigurationWithCustomContainerFactory.WAIT_TIME_OUT);
assertThat(ReflectionTestUtils.getField(container, "queueStopTimeout"))
.isEqualTo(ConfigurationWithCustomContainerFactory.QUEUE_STOP_TIME_OUT);
assertThat(
ConfigurationWithCustomContainerFactory.DESTINATION_RESOLVER == ReflectionTestUtils
.getField(container, "destinationResolver")).isTrue();
Expand Down Expand Up @@ -334,6 +337,8 @@ static class ConfigurationWithCustomContainerFactory {

static final int WAIT_TIME_OUT = 12;

static final long QUEUE_STOP_TIME_OUT = 12;

static final DestinationResolver<String> DESTINATION_RESOLVER = new DynamicQueueUrlDestinationResolver(
mock(AmazonSQSAsync.class, withSettings().stubOnly()));

Expand All @@ -356,6 +361,7 @@ SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
factory.setTaskExecutor(TASK_EXECUTOR);
factory.setVisibilityTimeout(VISIBILITY_TIMEOUT);
factory.setWaitTimeOut(WAIT_TIME_OUT);
factory.setQueueStopTimeout(QUEUE_STOP_TIME_OUT);
factory.setDestinationResolver(DESTINATION_RESOLVER);
factory.setBackOffTime(BACK_OFF_TIME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
/**
* @author Agim Emruli
* @author Alain Sahli
* @author Mete Alpaslan Katırcıoğlu
* @since 1.0
*/
class SimpleMessageListenerContainerTest {
Expand Down Expand Up @@ -1270,7 +1271,7 @@ void setQueueStopTimeout_withNotDefaultTimeout_mustBeUsedWhenStoppingAQueue()
.isTrue();
assertThat(stopWatch
.getTotalTimeMillis() < LongRunningListenerMethod.LISTENER_METHOD_WAIT_TIME)
.as("stop must last less than the listener method (< 10000ms)")
.as("stop must last less than the listener method (< 20000ms)")
.isTrue();
container.stop();
}
Expand Down Expand Up @@ -1621,7 +1622,7 @@ private void swallowExceptions() {

private static class LongRunningListenerMethod {

private static final int LISTENER_METHOD_WAIT_TIME = 10000;
private static final int LISTENER_METHOD_WAIT_TIME = 20000;

private final CountDownLatch countDownLatch = new CountDownLatch(1);

Expand Down

0 comments on commit e11c184

Please sign in to comment.