Skip to content

Commit

Permalink
[ISSUE-29566] Add configuration for visibility timeout in StorageQueu…
Browse files Browse the repository at this point in the history
…eMessageSource (#29567)

* [ISSUE-29566] Add configuration for visibility timeout in StorageQueueMessageSource

* [ISSUE-29566] Adding documentation to sdk/spring/CHANGELOG

Co-authored-by: Soumabrata Chakraborty <[email protected]>
  • Loading branch information
soumabrata-chakraborty and Soumabrata Chakraborty authored Jun 24, 2022
1 parent 2fe45b3 commit 7996578
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
5 changes: 5 additions & 0 deletions sdk/spring/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ This section includes changes in `spring-cloud-azure-starter-active-directory-b2
#### Dependency Updates
- Upgrade spring-security to 5.6.4 to address [CVE-2022-22978](https://spring.io/blog/2022/05/15/cve-2022-22978-authorization-bypass-in-regexrequestmatcher) [#29304](https://github.com/Azure/azure-sdk-for-java/pull/29304).

### Spring Integration Azure Storage Queue
This section includes changes in `spring-integration-azure-storage-queue` module.

#### Features Added
- Add configurable visibility timeout to `StorageQueueMessageSource` to allow configuring visibility timeout of message source at startup [#29567](https://github.com/Azure/azure-sdk-for-java/pull/29567).

## 4.2.0 (2022-05-26)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import java.time.Duration;

/**
* Inbound Message Source to receive messages from Azure Storage Queue.
*
Expand All @@ -16,22 +18,40 @@ public class StorageQueueMessageSource extends AbstractMessageSource<Message<?>>

private final StorageQueueTemplate storageQueueTemplate;
private final String destination;
private final Duration visibilityTimeout;

/**
* Construct a {@link StorageQueueMessageSource} with the specified destination and {@link StorageQueueTemplate}.
* Default visibility timeout of 30 seconds would apply.
*
* @param destination the destination
* @param storageQueueTemplate the storage queue operation
*/
public StorageQueueMessageSource(String destination, StorageQueueTemplate storageQueueTemplate) {
this(destination, storageQueueTemplate, null);
}

/**
* Construct a {@link StorageQueueMessageSource} with the specified destination, {@link StorageQueueTemplate}
* and visibility timeout.
*
* @param destination the destination
* @param storageQueueTemplate the storage queue operation
* @param visibilityTimeout The timeout period for how long the message is invisible in the queue.
* If left empty the dequeued messages will be invisible for 30 seconds.
* The timeout must be between 1 second and 7 days
*/
public StorageQueueMessageSource(String destination, StorageQueueTemplate storageQueueTemplate,
Duration visibilityTimeout) {
Assert.hasText(destination, "destination can't be null or empty");
this.storageQueueTemplate = storageQueueTemplate;
this.destination = destination;
this.visibilityTimeout = visibilityTimeout;
}

@Override
public Object doReceive() {
return storageQueueTemplate.receiveAsync(destination, null).block();
return storageQueueTemplate.receiveAsync(destination, visibilityTimeout).block();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
import org.springframework.messaging.support.GenericMessage;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Expand All @@ -32,6 +33,7 @@ public class StorageQueueMessageSourceTests {
private Message<?> message;

private String destination = "test-destination";
private Duration visibilityTimeout = Duration.ofMinutes(1);
private StorageQueueMessageSource messageSource;
private AutoCloseable closeable;

Expand All @@ -49,7 +51,7 @@ public void init() {

@BeforeEach
public void setup() {
messageSource = new StorageQueueMessageSource(destination, mockTemplate);
messageSource = new StorageQueueMessageSource(destination, mockTemplate, visibilityTimeout);
}

@AfterAll
Expand All @@ -59,14 +61,23 @@ public void close() throws Exception {

@Test
public void testDoReceiveWhenHaveNoMessage() {
when(this.mockTemplate.receiveAsync(eq(destination), any())).thenReturn(Mono.empty());
when(this.mockTemplate.receiveAsync(eq(destination), eq(visibilityTimeout))).thenReturn(Mono.empty());
assertNull(messageSource.doReceive());
}

@Test
public void testDoReceiveSuccess() {
when(this.mockTemplate.receiveAsync(eq(destination), any())).thenReturn(Mono.just(message));
when(this.mockTemplate.receiveAsync(eq(destination), eq(visibilityTimeout))).thenReturn(Mono.just(message));
Message<?> receivedMessage = (Message<?>) messageSource.doReceive();
assertEquals(message, receivedMessage);
}

@Test
public void testDoReceiveSuccessWithDefaultVisibilityTimeout() {
final StorageQueueMessageSource messageSourceWithDefaultTimeout =
new StorageQueueMessageSource(destination, mockTemplate);
when(this.mockTemplate.receiveAsync(eq(destination), isNull())).thenReturn(Mono.just(message));
Message<?> receivedMessage = (Message<?>) messageSourceWithDefaultTimeout.doReceive();
assertEquals(message, receivedMessage);
}
}

0 comments on commit 7996578

Please sign in to comment.