diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md index 8d92e8006605a..3d57794e81280 100644 --- a/sdk/spring/CHANGELOG.md +++ b/sdk/spring/CHANGELOG.md @@ -54,6 +54,11 @@ This section includes changes in `spring-cloud-azure-starter-active-directory-b2 #### Dependency Updates - Upgrade spring-security to 5.6.4 to address [CVE-2022-22978](https://spring.io/blog/2022/05/15/cve-2022-22978-authorization-bypass-in-regexrequestmatcher) [#29304](https://github.com/Azure/azure-sdk-for-java/pull/29304). +### Spring Integration Azure Storage Queue +This section includes changes in `spring-integration-azure-storage-queue` module. + +#### Features Added +- Add configurable visibility timeout to `StorageQueueMessageSource` to allow configuring visibility timeout of message source at startup [#29567](https://github.com/Azure/azure-sdk-for-java/pull/29567). ## 4.2.0 (2022-05-26) diff --git a/sdk/spring/spring-integration-azure-storage-queue/src/main/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSource.java b/sdk/spring/spring-integration-azure-storage-queue/src/main/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSource.java index 1dd9001e36875..aaa8ac72a90f7 100644 --- a/sdk/spring/spring-integration-azure-storage-queue/src/main/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSource.java +++ b/sdk/spring/spring-integration-azure-storage-queue/src/main/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSource.java @@ -8,6 +8,8 @@ import org.springframework.messaging.Message; import org.springframework.util.Assert; +import java.time.Duration; + /** * Inbound Message Source to receive messages from Azure Storage Queue. * @@ -16,22 +18,40 @@ public class StorageQueueMessageSource extends AbstractMessageSource> private final StorageQueueTemplate storageQueueTemplate; private final String destination; + private final Duration visibilityTimeout; /** * Construct a {@link StorageQueueMessageSource} with the specified destination and {@link StorageQueueTemplate}. + * Default visibility timeout of 30 seconds would apply. * * @param destination the destination * @param storageQueueTemplate the storage queue operation */ public StorageQueueMessageSource(String destination, StorageQueueTemplate storageQueueTemplate) { + this(destination, storageQueueTemplate, null); + } + + /** + * Construct a {@link StorageQueueMessageSource} with the specified destination, {@link StorageQueueTemplate} + * and visibility timeout. + * + * @param destination the destination + * @param storageQueueTemplate the storage queue operation + * @param visibilityTimeout The timeout period for how long the message is invisible in the queue. + * If left empty the dequeued messages will be invisible for 30 seconds. + * The timeout must be between 1 second and 7 days + */ + public StorageQueueMessageSource(String destination, StorageQueueTemplate storageQueueTemplate, + Duration visibilityTimeout) { Assert.hasText(destination, "destination can't be null or empty"); this.storageQueueTemplate = storageQueueTemplate; this.destination = destination; + this.visibilityTimeout = visibilityTimeout; } @Override public Object doReceive() { - return storageQueueTemplate.receiveAsync(destination, null).block(); + return storageQueueTemplate.receiveAsync(destination, visibilityTimeout).block(); } @Override diff --git a/sdk/spring/spring-integration-azure-storage-queue/src/test/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSourceTests.java b/sdk/spring/spring-integration-azure-storage-queue/src/test/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSourceTests.java index 854a5b0c71736..2dc68d54bf30b 100644 --- a/sdk/spring/spring-integration-azure-storage-queue/src/test/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSourceTests.java +++ b/sdk/spring/spring-integration-azure-storage-queue/src/test/java/com/azure/spring/integration/storage/queue/inbound/StorageQueueMessageSourceTests.java @@ -15,13 +15,14 @@ import org.springframework.messaging.support.GenericMessage; import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.when; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -32,6 +33,7 @@ public class StorageQueueMessageSourceTests { private Message message; private String destination = "test-destination"; + private Duration visibilityTimeout = Duration.ofMinutes(1); private StorageQueueMessageSource messageSource; private AutoCloseable closeable; @@ -49,7 +51,7 @@ public void init() { @BeforeEach public void setup() { - messageSource = new StorageQueueMessageSource(destination, mockTemplate); + messageSource = new StorageQueueMessageSource(destination, mockTemplate, visibilityTimeout); } @AfterAll @@ -59,14 +61,23 @@ public void close() throws Exception { @Test public void testDoReceiveWhenHaveNoMessage() { - when(this.mockTemplate.receiveAsync(eq(destination), any())).thenReturn(Mono.empty()); + when(this.mockTemplate.receiveAsync(eq(destination), eq(visibilityTimeout))).thenReturn(Mono.empty()); assertNull(messageSource.doReceive()); } @Test public void testDoReceiveSuccess() { - when(this.mockTemplate.receiveAsync(eq(destination), any())).thenReturn(Mono.just(message)); + when(this.mockTemplate.receiveAsync(eq(destination), eq(visibilityTimeout))).thenReturn(Mono.just(message)); Message receivedMessage = (Message) messageSource.doReceive(); assertEquals(message, receivedMessage); } + + @Test + public void testDoReceiveSuccessWithDefaultVisibilityTimeout() { + final StorageQueueMessageSource messageSourceWithDefaultTimeout = + new StorageQueueMessageSource(destination, mockTemplate); + when(this.mockTemplate.receiveAsync(eq(destination), isNull())).thenReturn(Mono.just(message)); + Message receivedMessage = (Message) messageSourceWithDefaultTimeout.doReceive(); + assertEquals(message, receivedMessage); + } }