Skip to content

Commit

Permalink
Global deletion policy for sqs listeners.
Browse files Browse the repository at this point in the history
  • Loading branch information
MatejNedic authored and maciejwalkowiak committed Jun 30, 2020
1 parent 29784df commit d13b5e0
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.SendToHandlerMethodReturnValueHandler;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.DestinationResolvingMessageSendingOperations;
Expand All @@ -38,6 +39,7 @@
/**
* @author Alain Sahli
* @author Maciej Walkowiak
* @author Matej Nedic
* @since 1.0
*/
public class QueueMessageHandlerFactory {
Expand All @@ -52,6 +54,8 @@ public class QueueMessageHandlerFactory {

private ResourceIdResolver resourceIdResolver;

private SqsMessageDeletionPolicy sqsMessageDeletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE;

private BeanFactory beanFactory;

private List<MessageConverter> messageConverters;
Expand Down Expand Up @@ -104,6 +108,17 @@ public void setAmazonSqs(AmazonSQSAsync amazonSqs) {
this.amazonSqs = amazonSqs;
}

/**
* Configures global deletion Policy.
* @param sqsMessageDeletionPolicy if set it will use SqsMessageDeletionPolicy param
* as global default value only if SqsMessageDeletionPolicy is omitted
* from @SqsListener annotation. Should not be null.
*/
public void setSqsMessageDeletionPolicy(
final SqsMessageDeletionPolicy sqsMessageDeletionPolicy) {
this.sqsMessageDeletionPolicy = sqsMessageDeletionPolicy;
}

/**
* This value is only used if no {@code sendToMessagingTemplate} has been set.
* @param resourceIdResolver the resourceIdResolver to use for resolving logical to
Expand Down Expand Up @@ -140,7 +155,8 @@ public QueueMessageHandler createQueueMessageHandler() {
QueueMessageHandler queueMessageHandler = new QueueMessageHandler(
CollectionUtils.isEmpty(this.messageConverters) ? Arrays.asList(
getDefaultMappingJackson2MessageConverter(this.objectMapper))
: this.messageConverters);
: this.messageConverters,
this.sqsMessageDeletionPolicy);

if (!CollectionUtils.isEmpty(this.argumentResolvers)) {
queueMessageHandler.getCustomArgumentResolvers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* @author Alain Sahli
* @author Maciej Walkowiak
* @author Wojciech Mąka
* @author Matej Nedic
* @since 1.0
*/
public class QueueMessageHandler
Expand All @@ -74,14 +75,19 @@ public class QueueMessageHandler
static final String ACKNOWLEDGMENT = "Acknowledgment";
static final String VISIBILITY = "Visibility";

private final SqsMessageDeletionPolicy sqsMessageDeletionPolicy;

private final List<MessageConverter> messageConverters;

public QueueMessageHandler(List<MessageConverter> messageConverters) {
public QueueMessageHandler(List<MessageConverter> messageConverters,
SqsMessageDeletionPolicy sqsMessageDeletionPolicy) {
this.messageConverters = messageConverters;
this.sqsMessageDeletionPolicy = sqsMessageDeletionPolicy;
}

public QueueMessageHandler() {
this.messageConverters = Collections.emptyList();
this.sqsMessageDeletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE;
}

private static String[] wrapInStringArray(Object valueToWrap) {
Expand Down Expand Up @@ -127,15 +133,19 @@ protected MappingInformation getMappingForMethod(Method method,
SqsListener sqsListenerAnnotation = AnnotationUtils.findAnnotation(method,
SqsListener.class);
if (sqsListenerAnnotation != null && sqsListenerAnnotation.value().length > 0) {
if (sqsListenerAnnotation.deletionPolicy() == SqsMessageDeletionPolicy.NEVER
SqsMessageDeletionPolicy tempDeletionPolicy = sqsListenerAnnotation
.deletionPolicy() == SqsMessageDeletionPolicy.DEFAULT
? sqsMessageDeletionPolicy
: sqsListenerAnnotation.deletionPolicy();
if (tempDeletionPolicy == SqsMessageDeletionPolicy.NEVER
&& hasNoAcknowledgmentParameter(method.getParameterTypes())) {
this.logger.warn("Listener method '" + method.getName() + "' in type '"
+ method.getDeclaringClass().getName()
+ "' has deletion policy 'NEVER' but does not have a parameter of type Acknowledgment.");
}
return new MappingInformation(
resolveDestinationNames(sqsListenerAnnotation.value()),
sqsListenerAnnotation.deletionPolicy());
tempDeletionPolicy);
}

MessageMapping messageMappingAnnotation = AnnotationUtils.findAnnotation(method,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* </ul>
*
* @author Alain Sahli
* @author Matej Nedic
* @since 1.1
* @see org.springframework.cloud.aws.messaging.listener.annotation.SqsListener
*/
Expand Down Expand Up @@ -72,6 +73,12 @@ public enum SqsMessageDeletionPolicy {
* Deletes message when successfully executed by the listener method. If an exception
* is thrown by the listener method, the message will not be deleted.
*/
ON_SUCCESS
ON_SUCCESS,

/**
* If this is set in SqsListener, it will use default value set for specific
* QueueMessageHandlerFactory. Default, if not changed is set to NO_REDRIVE.
*/
DEFAULT

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* annotation.
*
* @author Alain Sahli
* @author Matej Nedic
* @since 1.1
*/
@Target(ElementType.METHOD)
Expand All @@ -85,6 +86,6 @@
* called.
* @return deletion policy
*/
SqsMessageDeletionPolicy deletionPolicy() default SqsMessageDeletionPolicy.NO_REDRIVE;
SqsMessageDeletionPolicy deletionPolicy() default SqsMessageDeletionPolicy.DEFAULT;

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
Expand All @@ -57,6 +58,7 @@
/**
* @author Alain Sahli
* @author Maciej Walkowiak
* @author Matej Nedic
* @author Mete Alpaslan Katırcıoğlu
*/
class SqsConfigurationTest {
Expand Down Expand Up @@ -120,6 +122,11 @@ void messageHandler_withFactoryConfiguration_shouldUseCustomValues()
assertThat(messageHandler.getCustomReturnValueHandlers().get(0)).isEqualTo(
ConfigurationWithCustomizedMessageHandler.CUSTOM_RETURN_VALUE_HANDLER);

Object sqsMessageDeletionPolicy = ReflectionTestUtils.getField(messageHandler,
"sqsMessageDeletionPolicy");
assertThat(sqsMessageDeletionPolicy)
.isEqualTo(SqsMessageDeletionPolicy.NO_REDRIVE);

Object sendToMessageTemplate = ReflectionTestUtils.getField(
messageHandler.getReturnValueHandlers().get(1), "messageTemplate");
assertThat(ReflectionTestUtils.getField(sendToMessageTemplate, "amazonSqs"))
Expand All @@ -134,6 +141,44 @@ void messageHandler_withFactoryConfiguration_shouldUseCustomValues()
ConfigurationWithCustomizedMessageHandler.CUSTOM_RESOURCE_ID_RESOLVER);
}

@Test
void messageHandler_withFactoryConfiguration_shouldUseGlobalDeletionPolicy()
throws Exception {
// Arrange & Act
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(
ConfigurationWithCustomizedMessageHandlerGlobalDeletionPolicy.class);
QueueMessageHandler messageHandler = applicationContext
.getBean(QueueMessageHandler.class);

// Assert
assertThat(messageHandler.getCustomArgumentResolvers().size()).isEqualTo(1);
assertThat(messageHandler.getCustomArgumentResolvers().get(0)).isEqualTo(
ConfigurationWithCustomizedMessageHandlerGlobalDeletionPolicy.CUSTOM_ARGUMENT_RESOLVER);

assertThat(messageHandler.getCustomReturnValueHandlers().size()).isEqualTo(2);
assertThat(messageHandler.getCustomReturnValueHandlers().get(0)).isEqualTo(
ConfigurationWithCustomizedMessageHandlerGlobalDeletionPolicy.CUSTOM_RETURN_VALUE_HANDLER);

Object sqsMessageDeletionPolicy = ReflectionTestUtils.getField(messageHandler,
"sqsMessageDeletionPolicy");
assertThat(sqsMessageDeletionPolicy)
.isEqualTo(SqsMessageDeletionPolicy.ON_SUCCESS);

Object sendToMessageTemplate = ReflectionTestUtils.getField(
messageHandler.getReturnValueHandlers().get(1), "messageTemplate");
assertThat(ReflectionTestUtils.getField(sendToMessageTemplate, "amazonSqs"))
.isEqualTo(
ConfigurationWithCustomizedMessageHandlerGlobalDeletionPolicy.CUSTOM_AMAZON_SQS);

Object destinationResolver = ReflectionTestUtils.getField(sendToMessageTemplate,
"destinationResolver");
Object targetDestinationResolver = ReflectionTestUtils
.getField(destinationResolver, "targetDestinationResolver");
assertThat(ReflectionTestUtils.getField(targetDestinationResolver,
"resourceIdResolver")).isEqualTo(
ConfigurationWithCustomizedMessageHandlerGlobalDeletionPolicy.CUSTOM_RESOURCE_ID_RESOLVER);
}

@Test
void configuration_withCustomConfigurationFactory_shouldBeUsedToCreateTheContainer()
throws Exception {
Expand Down Expand Up @@ -315,6 +360,39 @@ QueueMessageHandlerFactory queueMessageHandlerFactory() {

}

@EnableSqs
@Configuration(proxyBeanMethods = false)
static class ConfigurationWithCustomizedMessageHandlerGlobalDeletionPolicy
extends MinimalConfiguration {

static final HandlerMethodReturnValueHandler CUSTOM_RETURN_VALUE_HANDLER = mock(
HandlerMethodReturnValueHandler.class);

static final HandlerMethodArgumentResolver CUSTOM_ARGUMENT_RESOLVER = mock(
HandlerMethodArgumentResolver.class);

static final AmazonSQSAsync CUSTOM_AMAZON_SQS = mock(AmazonSQSAsync.class,
withSettings().stubOnly());

static final ResourceIdResolver CUSTOM_RESOURCE_ID_RESOLVER = mock(
ResourceIdResolver.class);

@Bean
QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setArgumentResolvers(
Collections.singletonList(CUSTOM_ARGUMENT_RESOLVER));
factory.setReturnValueHandlers(
Collections.singletonList(CUSTOM_RETURN_VALUE_HANDLER));
factory.setSqsMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
factory.setAmazonSqs(CUSTOM_AMAZON_SQS);
factory.setResourceIdResolver(CUSTOM_RESOURCE_ID_RESOLVER);

return factory;
}

}

@EnableSqs
@Configuration(proxyBeanMethods = false)
static class ConfigurationWithCustomContainerFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,8 @@ static class QueueMessageHandlerWithJacksonMappingConfiguration {
@Bean
QueueMessageHandler queueMessageHandler() {
return new QueueMessageHandler(
Arrays.asList(mappingJackson2MessageConverter()));
Arrays.asList(mappingJackson2MessageConverter()),
SqsMessageDeletionPolicy.NO_REDRIVE);
}

@Bean
Expand Down

0 comments on commit d13b5e0

Please sign in to comment.