Skip to content

Commit

Permalink
Enable properties which supported in sdk (Azure#22676)
Browse files Browse the repository at this point in the history
* Support the parameters that are allowed to be configured in the SDK
  • Loading branch information
zhichengliu12581 authored Aug 4, 2021
1 parent 190462f commit 26879b6
Show file tree
Hide file tree
Showing 22 changed files with 579 additions and 48 deletions.
8 changes: 8 additions & 0 deletions sdk/spring/azure-spring-cloud-autoconfigure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.
### Breaking Changes
- Override paritionkey when session id is set. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
- Adjust the order of different partition key header. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
- Deprecate the `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.concurrency` property,
use `maxConcurrentSessions` and `maxConcurrentCalls` to set the properties.
### New Features
- Support configuration of `retryOptions` for ServiceBusClientBuilder with property of `spring.cloud.azure.servicebus.retry-options`, these parameters can be modified: `maxRetries`, `delay`, `maxDelay`, `tryTimeout`, `Mode`.
- Support configuration of `maxConcurrentCalls` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.<queue or topic>.bindings.<channelName>.consumer.maxConcurrentCalls`.
- Support configuration of `maxConcurrentSessions` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.<queue or topic>.bindings.<channelName>.consumer.maxConcurrentSessions`.
- Support configuration of `serviceBusReceiveMode` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.<queue or topic>.bindings.<channelName>.consumer.serviceBusReceiveMode`, supported values are `PEEK_LOCK` and `RECEIVE_AND_DELETE`.
- Support configuration of `enableAutoComplete` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.enableAutoComplete`.

## 2.7.0 (2021-07-20)
### Key Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package com.azure.spring.cloud.autoconfigure.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
Expand All @@ -16,6 +18,8 @@ public class AzureServiceBusProperties {

private String connectionString;

private AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(ServiceBusConstants.OPERATION_TIMEOUT);

private AmqpTransportType transportType = AmqpTransportType.AMQP;

public String getNamespace() {
Expand All @@ -41,4 +45,12 @@ public AmqpTransportType getTransportType() {
public void setTransportType(AmqpTransportType transportType) {
this.transportType = transportType;
}

public AmqpRetryOptions getRetryOptions() {
return retryOptions;
}

public void setRetryOptions(AmqpRetryOptions retryOptions) {
this.retryOptions = retryOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public ServiceBusQueueClientFactory queueClientFactory(
Assert.notNull(connectionString, "Service Bus connection string must not be null");

DefaultServiceBusQueueClientFactory clientFactory = new DefaultServiceBusQueueClientFactory(connectionString, properties.getTransportType());
clientFactory.setRetryOptions(properties.getRetryOptions());
clientFactory.setNamespace(properties.getNamespace());
clientFactory.setServiceBusNamespaceManager(namespaceManager);
clientFactory.setServiceBusQueueManager(queueManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public ServiceBusTopicClientFactory topicClientFactory(
Assert.notNull(connectionString, "Service Bus connection string must not be null");

DefaultServiceBusTopicClientFactory clientFactory = new DefaultServiceBusTopicClientFactory(connectionString, properties.getTransportType());
clientFactory.setRetryOptions(properties.getRetryOptions());
clientFactory.setNamespace(properties.getNamespace());
clientFactory.setServiceBusNamespaceManager(namespaceManager);
clientFactory.setServiceBusTopicManager(topicManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.spring.cloud.autoconfigure.servicebus;

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.resourcemanager.AzureResourceManager;
Expand Down Expand Up @@ -92,9 +93,25 @@ public void testConnectionStringProvided() {
@Test
public void testTransportTypeWithAmqpWebSockets() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "transport-type=AMQP_WEB_SOCKETS")
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getTransportType()).isEqualTo(AmqpTransportType.AMQP_WEB_SOCKETS);
});
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getTransportType()).isEqualTo(AmqpTransportType.AMQP_WEB_SOCKETS);
});
}

@Test
public void testTransportTypeWithRetryOptions() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxRetries=5",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.delay=100S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxDelay=200S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.tryTimeout=300S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.Mode=FIXED")
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxRetries()).isEqualTo(5);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getDelay().getSeconds()).isEqualTo(100L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxDelay().getSeconds()).isEqualTo(200L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getTryTimeout().getSeconds()).isEqualTo(300L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMode()).isEqualTo(AmqpRetryMode.FIXED);
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.azure.spring.cloud.autoconfigure.servicebus;

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.resourcemanager.AzureResourceManager;
import com.azure.spring.cloud.context.core.config.AzureProperties;
Expand Down Expand Up @@ -113,6 +115,7 @@ public void testConnectionStringProvided() {
.run(context -> {
assertThat(context.getBean(ServiceBusConnectionStringProvider.class)
.getConnectionString()).isEqualTo(NAMESPACE_CONNECTION_STRING);
assertThat(context.getBean(AzureServiceBusProperties.class).getTransportType()).isEqualTo(AmqpTransportType.AMQP);
assertThat(context).doesNotHaveBean(ServiceBusNamespaceManager.class);
assertThat(context).doesNotHaveBean(ServiceBusQueueManager.class);
assertThat(context).hasSingleBean(ServiceBusQueueClientFactory.class);
Expand All @@ -121,6 +124,32 @@ public void testConnectionStringProvided() {
});
}

@Test
public void testTransportTypeWithAmqpWebSockets() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "transport-type=AMQP_WEB_SOCKETS")
.withUserConfiguration(AzureServiceBusAutoConfiguration.class)
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getTransportType()).isEqualTo(AmqpTransportType.AMQP_WEB_SOCKETS);
});
}

@Test
public void testTransportTypeWithRetryOptions() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxRetries=5",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.delay=100S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxDelay=200S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.tryTimeout=300S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.Mode=FIXED")
.withUserConfiguration(AzureServiceBusAutoConfiguration.class)
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxRetries()).isEqualTo(5);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getDelay().getSeconds()).isEqualTo(100L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxDelay().getSeconds()).isEqualTo(200L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getTryTimeout().getSeconds()).isEqualTo(300L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMode()).isEqualTo(AmqpRetryMode.FIXED);
});
}

@Test
public void testResourceManagerProvided() {
this.contextRunner.withUserConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.azure.spring.cloud.autoconfigure.servicebus;

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.resourcemanager.AzureResourceManager;
import com.azure.spring.cloud.context.core.config.AzureProperties;
Expand Down Expand Up @@ -92,6 +94,7 @@ public void testConnectionStringProvided() {
.withUserConfiguration(AzureServiceBusAutoConfiguration.class)
.run(context -> {
assertThat(context.getBean(ServiceBusConnectionStringProvider.class).getConnectionString()).isEqualTo(NAMESPACE_CONNECTION_STRING);
assertThat(context.getBean(AzureServiceBusProperties.class).getTransportType()).isEqualTo(AmqpTransportType.AMQP);
assertThat(context).doesNotHaveBean(ServiceBusNamespaceManager.class);
assertThat(context).doesNotHaveBean(ServiceBusTopicManager.class);
assertThat(context).doesNotHaveBean(ServiceBusTopicSubscriptionManager.class);
Expand All @@ -101,6 +104,32 @@ public void testConnectionStringProvided() {
});
}

@Test
public void testTransportTypeWithAmqpWebSockets() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "transport-type=AMQP_WEB_SOCKETS")
.withUserConfiguration(AzureServiceBusAutoConfiguration.class)
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getTransportType()).isEqualTo(AmqpTransportType.AMQP_WEB_SOCKETS);
});
}

@Test
public void testTransportTypeWithRetryOptions() {
this.contextRunner.withPropertyValues(SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxRetries=5",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.delay=100S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.maxDelay=200S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.tryTimeout=300S",
SERVICE_BUS_PROPERTY_PREFIX + "retry-options.Mode=FIXED")
.withUserConfiguration(AzureServiceBusAutoConfiguration.class)
.run(context -> {
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxRetries()).isEqualTo(5);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getDelay().getSeconds()).isEqualTo(100L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMaxDelay().getSeconds()).isEqualTo(200L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getTryTimeout().getSeconds()).isEqualTo(300L);
assertThat(context.getBean(AzureServiceBusProperties.class).getRetryOptions().getMode()).isEqualTo(AmqpRetryMode.FIXED);
});
}

@Test
public void testResourceManagerProvided() {
this.contextRunner.withUserConfiguration(TestConfigWithAzureResourceManager.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public class Constants {

public static final String SPRING_EVENT_HUB_APPLICATION_ID =
String.join("-", AZURE, SPRING_CLOUD, EVENT_HUB) + "/" + SPRING_CLOUD_VERSION;
public static final String SPRING_SERVICE_BUS_APPLICATION_ID =
String.join("-", AZURE, SPRING_CLOUD, SERVICE_BUS) + "/" + SPRING_CLOUD_VERSION;
}
8 changes: 8 additions & 0 deletions sdk/spring/azure-spring-cloud-starter-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.
### Breaking Changes
- Override paritionkey when session id is set. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
- Adjust the order of different partition key header. ([#23135](https://github.com/Azure/azure-sdk-for-java/pull/23135))
- Deprecate the `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.concurrency` property,
use `maxConcurrentSessions` and `maxConcurrentCalls` to set the properties.
### New Features
- Support configuration of `retryOptions` for ServiceBusClientBuilder with property of `spring.cloud.azure.servicebus.retry-options`, these parameters can be modified: `maxRetries`, `delay`, `maxDelay`, `tryTimeout`, `Mode`.
- Support configuration of `maxConcurrentCalls` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.topic.bindings.<channelName>.consumer.maxConcurrentCalls`.
- Support configuration of `maxConcurrentSessions` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.topic.bindings.<channelName>.consumer.maxConcurrentSessions`.
- Support configuration of `serviceBusReceiveMode` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.serviceBusReceiveMode`, supported values are `PEEK_LOCK` and `RECEIVE_AND_DELETE`.
- Support configuration of `enableAutoComplete` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.enableAutoComplete`.

## 2.7.0 (2021-07-20)
### Key Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
This release is compatible with Spring Boot 2.5.0 - 2.5.3 and Spring Cloud 2020.0.3.
### Dependency Upgrades
- Upgrade to [spring-boot-dependencies:2.5.3](https://repo.maven.apache.org/maven2/org/springframework/boot/spring-boot-dependencies/2.5.3/spring-boot-dependencies-2.5.3.pom).

### New Features
- Support configuration of `retryOptions` for ServiceBusClientBuilder with property of `spring.cloud.azure.servicebus.retry-options`, these parameters can be modified: `maxRetries`, `delay`, `maxDelay`, `tryTimeout`, `Mode`.
- Support configuration of `maxConcurrentCalls` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.topic.bindings.<channelName>.consumer.maxConcurrentCalls`.
- Support configuration of `maxConcurrentSessions` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.topic.bindings.<channelName>.consumer.maxConcurrentSessions`.
- Support configuration of `serviceBusReceiveMode` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.serviceBusReceiveMode`, supported values are `PEEK_LOCK` and `RECEIVE_AND_DELETE`.
- Support configuration of `enableAutoComplete` for ServiceBusClientConfig with property of `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.enableAutoComplete`.
### Breaking Changes
- Deprecate the `spring.cloud.stream.servicebus.queue.bindings.<channelName>.consumer.concurrency` property,
use `maxConcurrentSessions` and `maxConcurrentCalls` to set the properties.

## 2.7.0 (2021-07-20)
### Key Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Optional;

/**
* @author Warren Zhu
* @author Eduardo Sciullo
Expand Down Expand Up @@ -112,6 +114,16 @@ protected ServiceBusClientConfig buildClientConfig(
.setPrefetchCount(consumerProperties.getPrefetchCount())
.setConcurrency(consumerProperties.getConcurrency())
.setSessionsEnabled(consumerProperties.isSessionsEnabled())
// When session disabled, if user don't set maxConcurrentCalls, we should use concurrency
.setMaxConcurrentCalls(Optional.ofNullable(consumerProperties.getMaxConcurrentCalls())
.orElse(consumerProperties.isSessionsEnabled()
? 1 : consumerProperties.getConcurrency()))
// When session enabled, if user don't set maxConcurrentSessions, we should use concurrency
.setMaxConcurrentSessions(Optional.ofNullable(consumerProperties.getMaxConcurrentSessions())
.orElse(consumerProperties.isSessionsEnabled()
? consumerProperties.getConcurrency() : 1))
.setServiceBusReceiveMode(consumerProperties.getServiceBusReceiveMode())
.setEnableAutoComplete(consumerProperties.isEnableAutoComplete())
.build();
}

Expand Down
Loading

0 comments on commit 26879b6

Please sign in to comment.