Skip to content

Commit

Permalink
Merge branch '2.2.x' into 2.3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejwalkowiak committed Jun 19, 2020
2 parents e2143c7 + 96444a2 commit b0a6e42
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.aws.messaging.listener;

import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteMessageResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Callback executed on SQS message deletion.
*
* @author Mete Alpaslan Katircioglu
*/
class DeleteMessageHandler
implements AsyncHandler<DeleteMessageRequest, DeleteMessageResult> {

private static final Logger logger = LoggerFactory
.getLogger(DeleteMessageHandler.class);

private final String receiptHandle;

DeleteMessageHandler(String receiptHandle) {
this.receiptHandle = receiptHandle;
}

@Override
public void onError(Exception exception) {
logger.warn("An exception occurred while deleting '{}' receiptHandle",
receiptHandle, exception);
}

@Override
public void onSuccess(DeleteMessageRequest request,
DeleteMessageResult deleteMessageResult) {
logger.trace("'{}' receiptHandle is deleted successfully",
request.getReceiptHandle());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

/**
* @author Alain Sahli
* @author Mete Alpaslan Katircioglu
* @since 1.1
*/
public class QueueMessageAcknowledgment implements Acknowledgment {
Expand All @@ -43,7 +44,8 @@ public QueueMessageAcknowledgment(AmazonSQSAsync amazonSqsAsync, String queueUrl
@Override
public Future<?> acknowledge() {
return this.amazonSqsAsync.deleteMessageAsync(
new DeleteMessageRequest(this.queueUrl, this.receiptHandle));
new DeleteMessageRequest(this.queueUrl, this.receiptHandle),
new DeleteMessageHandler(this.receiptHandle));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
/**
* @author Agim Emruli
* @author Alain Sahli
* @author Mete Alpaslan Katircioglu
* @since 1.0
*/
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
Expand Down Expand Up @@ -440,7 +441,8 @@ private void applyDeletionPolicyOnError(String receiptHandle) {

private void deleteMessage(String receiptHandle) {
getAmazonSqs().deleteMessageAsync(
new DeleteMessageRequest(this.queueUrl, receiptHandle));
new DeleteMessageRequest(this.queueUrl, receiptHandle),
new DeleteMessageHandler(receiptHandle));
}

private org.springframework.messaging.Message<String> getMessageForExecution() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import com.amazonaws.AmazonClientException;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
Expand Down Expand Up @@ -603,7 +604,7 @@ protected void executeMessage(
container.stop();
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://executeMessage_successfulExecution_shouldRemoveMessageFromQueue.amazonaws.com",
"ReceiptHandle")));
"ReceiptHandle")), any(AsyncHandler.class));
}

@Test
Expand Down Expand Up @@ -666,7 +667,7 @@ protected void executeMessage(
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://executeMessage_executionThrowsExceptionAnd"
+ "QueueHasAllDeletionPolicy_shouldRemoveMessageFromQueue.amazonaws.com",
"ReceiptHandle")));
"ReceiptHandle")), any(AsyncHandler.class));
}

@Test
Expand Down Expand Up @@ -729,7 +730,7 @@ protected void executeMessage(
verify(sqs, never()).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://executeMessage_executionThrowsExceptionAnd"
+ "QueueHasRedrivePolicy_shouldNotRemoveMessageFromQueue.amazonaws.com",
"ReceiptHandle")));
"ReceiptHandle")), any(AsyncHandler.class));
}

@Test
Expand Down Expand Up @@ -844,7 +845,7 @@ protected void executeMessage(
verify(sqs, never()).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://receiveMessage_withMessageListenerMethodAnd"
+ "NeverDeletionPolicy_waitsForAcknowledgmentBeforeDeletion.amazonaws.com",
"ReceiptHandle")));
"ReceiptHandle")), any(AsyncHandler.class));
TestMessageListenerWithManualDeletionPolicy testMessageListenerWithManualDeletionPolicy = applicationContext
.getBean(TestMessageListenerWithManualDeletionPolicy.class);
testMessageListenerWithManualDeletionPolicy.getCountDownLatch().await(1L,
Expand All @@ -853,7 +854,7 @@ protected void executeMessage(
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://receiveMessage_withMessageListenerMethodAnd"
+ "NeverDeletionPolicy_waitsForAcknowledgmentBeforeDeletion.amazonaws.com",
"ReceiptHandle")));
"ReceiptHandle")), any(AsyncHandler.class));
container.stop();
}

Expand Down Expand Up @@ -985,22 +986,38 @@ void executeMessage_withDifferentDeletionPolicies_shouldActAccordingly()
.getBean(TestMessageListenerWithAllPossibleDeletionPolicies.class);
assertThat(bean.getCountdownLatch().await(1L, TimeUnit.SECONDS)).isTrue();
container.stop();
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://alwaysSuccess.amazonaws.com", "alwaysSuccess")));
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://alwaysError.amazonaws.com", "alwaysError")));
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://onSuccessSuccess.amazonaws.com", "onSuccessSuccess")));
verify(sqs, never()).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://onSuccessError.amazonaws.com", "onSuccessError")));
verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://noRedriveSuccess.amazonaws.com", "noRedriveSuccess")));
verify(sqs, never()).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://noRedriveError.amazonaws.com", "noRedriveError")));
verify(sqs, never()).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://neverSuccess.amazonaws.com", "neverSuccess")));
verify(sqs, never()).deleteMessageAsync(eq(new DeleteMessageRequest(
"https://neverError.amazonaws.com", "neverError")));
verify(sqs, times(1)).deleteMessageAsync(
eq(new DeleteMessageRequest("https://alwaysSuccess.amazonaws.com",
"alwaysSuccess")),
any(AsyncHandler.class));
verify(sqs, times(1)).deleteMessageAsync(
eq(new DeleteMessageRequest("https://alwaysError.amazonaws.com",
"alwaysError")),
any(AsyncHandler.class));
verify(sqs, times(1)).deleteMessageAsync(
eq(new DeleteMessageRequest("https://onSuccessSuccess.amazonaws.com",
"onSuccessSuccess")),
any(AsyncHandler.class));
verify(sqs, never()).deleteMessageAsync(
eq(new DeleteMessageRequest("https://onSuccessError.amazonaws.com",
"onSuccessError")),
any(AsyncHandler.class));
verify(sqs, times(1)).deleteMessageAsync(
eq(new DeleteMessageRequest("https://noRedriveSuccess.amazonaws.com",
"noRedriveSuccess")),
any(AsyncHandler.class));
verify(sqs, never()).deleteMessageAsync(
eq(new DeleteMessageRequest("https://noRedriveError.amazonaws.com",
"noRedriveError")),
any(AsyncHandler.class));
verify(sqs, never()).deleteMessageAsync(
eq(new DeleteMessageRequest("https://neverSuccess.amazonaws.com",
"neverSuccess")),
any(AsyncHandler.class));
verify(sqs, never()).deleteMessageAsync(
eq(new DeleteMessageRequest("https://neverError.amazonaws.com",
"neverError")),
any(AsyncHandler.class));

setLogLevel(previous);
}
Expand Down

0 comments on commit b0a6e42

Please sign in to comment.