Skip to content

Commit

Permalink
Register subscriber retry settings as beans (GoogleCloudPlatform#704)
Browse files Browse the repository at this point in the history
This PR registers global and subscription-specific retry settings as beans in GcpPubSubAutoConfiguration. However, note that if a custom bean is provided then the global and subscription-specific beans won't be created.
  • Loading branch information
kateryna216 authored and GitHub committed Nov 18, 2021
1 parent aa17412 commit 79d3ea1
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public class GcpPubSubAutoConfiguration {

private final ConcurrentHashMap<String, FlowControlSettings> subscriberFlowControlSettingsMap = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, RetrySettings> subscriberRetrySettingsMap = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, ExecutorProvider> executorProviderMap = new ConcurrentHashMap<>();

private final ApplicationContext applicationContext;
Expand All @@ -117,6 +119,8 @@ public class GcpPubSubAutoConfiguration {

private FlowControlSettings globalFlowControlSettings;

private RetrySettings globalRetrySettings;

private ExecutorProvider globalExecutorProvider;

public GcpPubSubAutoConfiguration(GcpPubSubProperties gcpPubSubProperties,
Expand Down Expand Up @@ -247,6 +251,8 @@ public SubscriberFactory defaultSubscriberFactory(
"The subscriberRetrySettings bean is being deprecated. Please use application.properties to configure properties");
factory.setSubscriberStubRetrySettings(retrySettings.getIfAvailable());
}
factory.setRetrySettingsMap(this.subscriberRetrySettingsMap);
factory.setGlobalRetrySettings(this.globalRetrySettings);
if (this.gcpPubSubProperties.getSubscriber().getRetryableCodes() != null) {
factory.setRetryableCodes(gcpPubSubProperties.getSubscriber().getRetryableCodes());
}
Expand Down Expand Up @@ -401,6 +407,7 @@ public void registerSubscriberSettings() {
registerSubscriberThreadPoolSchedulerBeans(context);
registerExecutorProviderBeans(context);
registerSubscriberFlowControlSettingsBeans(context);
registerSubscriberRetrySettingsBeans(context);
}

private void registerSubscriberThreadPoolSchedulerBeans(GenericApplicationContext context) {
Expand Down Expand Up @@ -436,6 +443,21 @@ private void registerExecutorProviderBeans(GenericApplicationContext context) {
createAndRegisterSelectiveExecutorProvider(context);
}

private void registerSubscriberRetrySettingsBeans(GenericApplicationContext context) {
if (context.containsBean("subscriberRetrySettings")) {
return;
}
this.globalRetrySettings = buildRetrySettings(
this.gcpPubSubProperties.getSubscriber().getRetry());
if (this.globalRetrySettings != null) {
context.registerBeanDefinition("globalSubscriberRetrySettings",
BeanDefinitionBuilder
.genericBeanDefinition(RetrySettings.class, () -> this.globalRetrySettings)
.getBeanDefinition());
}
createAndRegisterSelectiveRetrySettings(context);
}

/**
* Creates and registers {@link ThreadPoolTaskScheduler} for subscription-specific
* configurations.
Expand Down Expand Up @@ -533,6 +555,25 @@ private ExecutorProvider createAndRegisterExecutorProvider(String beanName, Thre
return executor;
}

private void createAndRegisterSelectiveRetrySettings(GenericApplicationContext context) {
Map<String, PubSubConfiguration.Subscriber> subscriberMap = this.gcpPubSubProperties.getSubscription();
for (Map.Entry<String, PubSubConfiguration.Subscriber> subscription : subscriberMap.entrySet()) {
ProjectSubscriptionName fullyQualifiedName = getFullSubscriptionName(subscription.getKey());
String subscriptionName = fullyQualifiedName.getSubscription();
PubSubConfiguration.Retry retry = this.gcpPubSubProperties.computeSubscriberRetrySettings(
subscriptionName,
this.finalProjectIdProvider.getProjectId());
RetrySettings retrySettings = buildRetrySettings(retry);
if (retrySettings != null && !retrySettings.equals(this.globalRetrySettings)) {
this.subscriberRetrySettingsMap.putIfAbsent(fullyQualifiedName.toString(), retrySettings);
String beanName = "subscriberRetrySettings-" + subscriptionName;
context.registerBeanDefinition(beanName,
BeanDefinitionBuilder.genericBeanDefinition(RetrySettings.class, () -> retrySettings)
.getBeanDefinition());
}
}
}

private Integer getGlobalExecutorThreads() {
Integer numThreads = this.gcpPubSubProperties.getSubscriber().getExecutorThreads();
return numThreads != null ? numThreads : PubSubConfiguration.DEFAULT_EXECUTOR_THREADS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.Test;
import org.threeten.bp.Duration;

import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand Down Expand Up @@ -187,7 +188,6 @@ public void customExecutorProviderUsedWhenProvided() {
});
}


@Test
public void threadPoolScheduler_noConfigurationSet_globalCreated() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
Expand Down Expand Up @@ -481,12 +481,10 @@ public void pullConfig_globalAndDifferentSelectiveConfigurationSet_pickGlobal()
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withPropertyValues(
"spring.cloud.gcp.projectId=fake project",
"spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=5",
"spring.cloud.gcp.pubsub.subscriber.parallel-pull-count=10",
"spring.cloud.gcp.pubsub.subscriber.pull-endpoint=other-endpoint",
"spring.cloud.gcp.pubsub.subscription.subscription-name.executor-threads=4"
)
"spring.cloud.gcp.pubsub.subscription.subscription-name.executor-threads=4")
.withUserConfiguration(TestConfig.class);

contextRunner.run(ctx -> {
Expand Down Expand Up @@ -553,6 +551,21 @@ public void retrySettings_globalConfigurationSet() {
assertThat(retrySettings.getInitialRpcTimeoutSeconds()).isEqualTo(6);
assertThat(retrySettings.getRpcTimeoutMultiplier()).isEqualTo(7);
assertThat(retrySettings.getMaxRpcTimeoutSeconds()).isEqualTo(8);

DefaultSubscriberFactory subscriberFactory = ctx
.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
RetrySettings expectedRetrySettings = RetrySettings.newBuilder().setTotalTimeout(Duration.ofSeconds(1L))
.setInitialRetryDelay(Duration.ofSeconds(2L))
.setRetryDelayMultiplier(3)
.setMaxRetryDelay(Duration.ofSeconds(4L))
.setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(6L))
.setRpcTimeoutMultiplier(7)
.setMaxRpcTimeout(Duration.ofSeconds(8))
.build();
assertThat(subscriberFactory.getRetrySettings("name")).isEqualTo(expectedRetrySettings);
assertThat(ctx.getBean("globalSubscriberRetrySettings", RetrySettings.class))
.isEqualTo(expectedRetrySettings);
});
}

Expand All @@ -578,6 +591,7 @@ public void retrySettings_selectiveConfigurationSet() {
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);
PubSubConfiguration.Retry retrySettings = gcpPubSubProperties
.computeSubscriberRetrySettings("subscription-name", projectIdProvider.getProjectId());

assertThat(retrySettings.getTotalTimeoutSeconds()).isEqualTo(1L);
assertThat(retrySettings.getInitialRetryDelaySeconds()).isEqualTo(2L);
assertThat(retrySettings.getRetryDelayMultiplier()).isEqualTo(3);
Expand All @@ -587,11 +601,25 @@ public void retrySettings_selectiveConfigurationSet() {
assertThat(retrySettings.getInitialRpcTimeoutSeconds()).isEqualTo(6);
assertThat(retrySettings.getRpcTimeoutMultiplier()).isEqualTo(7);
assertThat(retrySettings.getMaxRpcTimeoutSeconds()).isEqualTo(8);

assertThat(gcpPubSubProperties.getSubscription())
.hasSize(1);
assertThat(gcpPubSubProperties.getSubscription())
.containsKey("projects/fake project/subscriptions/subscription-name");

DefaultSubscriberFactory subscriberFactory = ctx
.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
RetrySettings expectedRetrySettings = RetrySettings.newBuilder().setTotalTimeout(Duration.ofSeconds(1L))
.setInitialRetryDelay(Duration.ofSeconds(2L))
.setRetryDelayMultiplier(3)
.setMaxRetryDelay(Duration.ofSeconds(4L))
.setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(6L))
.setRpcTimeoutMultiplier(7)
.setMaxRpcTimeout(Duration.ofSeconds(8L))
.build();
assertThat(subscriberFactory.getRetrySettings("subscription-name")).isEqualTo(expectedRetrySettings);
assertThat(ctx.getBean("subscriberRetrySettings-subscription-name", RetrySettings.class))
.isEqualTo(expectedRetrySettings);
});
}

Expand Down Expand Up @@ -638,7 +666,7 @@ public void retrySettings_globalAndSelectiveConfigurationSet_selectiveTakesPrece
assertThat(retrySettings.getRpcTimeoutMultiplier()).isEqualTo(7);
assertThat(retrySettings.getMaxRpcTimeoutSeconds()).isEqualTo(8);

// Validate settings for subscribers that do not have subscription-specific retry settings
// Validate settings for subscribers that do **not** have subscription-specific retry settings
// property set
PubSubConfiguration.Retry retrySettingsForOtherSubscriber = gcpPubSubProperties
.getSubscriber("other", projectIdProvider.getProjectId())
Expand All @@ -651,13 +679,44 @@ public void retrySettings_globalAndSelectiveConfigurationSet_selectiveTakesPrece
assertThat(retrySettingsForOtherSubscriber.getInitialRpcTimeoutSeconds()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getRpcTimeoutMultiplier()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getMaxRpcTimeoutSeconds()).isEqualTo(10);

assertThat(gcpPubSubProperties.getSubscription())
.hasSize(2);
assertThat(gcpPubSubProperties.getSubscription())
.containsKey("projects/fake project/subscriptions/subscription-name");
assertThat(gcpPubSubProperties.getSubscription())
.containsKey("projects/fake project/subscriptions/other");

// Verify that beans for selective and global retry settings are created. Also
// verify that selective retry setting takes precedence.
DefaultSubscriberFactory subscriberFactory = ctx
.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
RetrySettings expectedRetrySettingsForSubscriptionName = RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(1L))
.setInitialRetryDelay(Duration.ofSeconds(2L))
.setRetryDelayMultiplier(3)
.setMaxRetryDelay(Duration.ofSeconds(4L))
.setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(6L))
.setRpcTimeoutMultiplier(7)
.setMaxRpcTimeout(Duration.ofSeconds(8))
.build();
RetrySettings expectedRetrySettingsForOther = RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(10L))
.setInitialRetryDelay(Duration.ofSeconds(10L))
.setRetryDelayMultiplier(10)
.setMaxRetryDelay(Duration.ofSeconds(10L))
.setMaxAttempts(10)
.setInitialRpcTimeout(Duration.ofSeconds(10L))
.setRpcTimeoutMultiplier(10)
.setMaxRpcTimeout(Duration.ofSeconds(10))
.build();
assertThat(subscriberFactory.getRetrySettings("subscription-name"))
.isEqualTo(expectedRetrySettingsForSubscriptionName);
assertThat(ctx.getBean("subscriberRetrySettings-subscription-name", RetrySettings.class))
.isEqualTo(expectedRetrySettingsForSubscriptionName);
assertThat(subscriberFactory.getRetrySettings("other")).isEqualTo(expectedRetrySettingsForOther);
assertThat(ctx.getBean("globalSubscriberRetrySettings", RetrySettings.class))
.isEqualTo(expectedRetrySettingsForOther);
});
}

Expand All @@ -666,7 +725,6 @@ public void retrySettings_globalAndDifferentSelectiveConfigurationSet_pickGlobal
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withPropertyValues(
"spring.cloud.gcp.projectId=fake project",
"spring.cloud.gcp.pubsub.subscriber.retry.total-timeout-seconds=10",
"spring.cloud.gcp.pubsub.subscriber.retry.initial-retry-delay-seconds=10",
"spring.cloud.gcp.pubsub.subscriber.retry.retry-delay-multiplier=10",
Expand All @@ -683,16 +741,88 @@ public void retrySettings_globalAndDifferentSelectiveConfigurationSet_pickGlobal
.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);

PubSubConfiguration.Retry retrySettingsForOtherSubscriber = gcpPubSubProperties
PubSubConfiguration.Retry retrySettings = gcpPubSubProperties
.computeSubscriberRetrySettings("subscription-name", projectIdProvider.getProjectId());
assertThat(retrySettingsForOtherSubscriber.getTotalTimeoutSeconds()).isEqualTo(10L);
assertThat(retrySettingsForOtherSubscriber.getInitialRetryDelaySeconds()).isEqualTo(10L);
assertThat(retrySettingsForOtherSubscriber.getRetryDelayMultiplier()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getMaxRetryDelaySeconds()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getMaxAttempts()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getInitialRpcTimeoutSeconds()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getRpcTimeoutMultiplier()).isEqualTo(10);
assertThat(retrySettingsForOtherSubscriber.getMaxRpcTimeoutSeconds()).isEqualTo(10);

assertThat(retrySettings.getTotalTimeoutSeconds()).isEqualTo(10L);
assertThat(retrySettings.getInitialRetryDelaySeconds()).isEqualTo(10L);
assertThat(retrySettings.getRetryDelayMultiplier()).isEqualTo(10);
assertThat(retrySettings.getMaxRetryDelaySeconds()).isEqualTo(10);
assertThat(retrySettings.getMaxAttempts()).isEqualTo(10);
assertThat(retrySettings.getInitialRpcTimeoutSeconds()).isEqualTo(10);
assertThat(retrySettings.getRpcTimeoutMultiplier()).isEqualTo(10);
assertThat(retrySettings.getMaxRpcTimeoutSeconds()).isEqualTo(10);

// Verify that bean for global retry settings is created.
RetrySettings expectedRetrySettings = RetrySettings.newBuilder().setTotalTimeout(Duration.ofSeconds(10L))
.setInitialRetryDelay(Duration.ofSeconds(10L))
.setRetryDelayMultiplier(10)
.setMaxRetryDelay(Duration.ofSeconds(10L))
.setMaxAttempts(10)
.setInitialRpcTimeout(Duration.ofSeconds(10L))
.setRpcTimeoutMultiplier(10)
.setMaxRpcTimeout(Duration.ofSeconds(10))
.build();
DefaultSubscriberFactory subscriberFactory = ctx
.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(subscriberFactory.getRetrySettings("subscription-name"))
.isEqualTo(expectedRetrySettings);
assertThat(ctx.getBean("globalSubscriberRetrySettings", RetrySettings.class))
.isEqualTo(expectedRetrySettings);
});
}

@Test
public void retrySettings_subsetOfProperties_pickGlobalWhenSelectiveNotSpecified() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.retry.total-timeout-seconds=10",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.initial-retry-delay-seconds=2",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.retry-delay-multiplier=3",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.max-retry-delay-seconds=4",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.max-attempts=5",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.initial-rpc-timeout-seconds=6",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.rpc-timeout-multiplier=7",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retry.max-rpc-timeout-seconds=8")
.withUserConfiguration(TestConfig.class);

contextRunner.run(ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx
.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);
PubSubConfiguration.Retry retry = gcpPubSubProperties
.computeSubscriberRetrySettings("subscription-name", projectIdProvider.getProjectId());
assertThat(retry.getTotalTimeoutSeconds()).isEqualTo(10L);
assertThat(retry.getInitialRetryDelaySeconds()).isEqualTo(2L);
assertThat(retry.getRetryDelayMultiplier()).isEqualTo(3);
assertThat(retry.getMaxRetryDelaySeconds()).isEqualTo(4L);
assertThat(retry.getMaxAttempts()).isEqualTo(5);
assertThat(retry.getInitialRpcTimeoutSeconds()).isEqualTo(6L);
assertThat(retry.getRpcTimeoutMultiplier()).isEqualTo(7);
assertThat(retry.getMaxRpcTimeoutSeconds()).isEqualTo(8L);

// Verify that beans for selective and global retry settings are created.
DefaultSubscriberFactory subscriberFactory = ctx
.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
RetrySettings expectedRetrySettingsForSubscriptionName = RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(10L))
.setInitialRetryDelay(Duration.ofSeconds(2L))
.setRetryDelayMultiplier(3)
.setMaxRetryDelay(Duration.ofSeconds(4L))
.setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(6L))
.setRpcTimeoutMultiplier(7)
.setMaxRpcTimeout(Duration.ofSeconds(8L))
.build();
RetrySettings expectedGlobalRetrySettings = RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(10L)).build();
assertThat(subscriberFactory.getRetrySettings("subscription-name"))
.isEqualTo(expectedRetrySettingsForSubscriptionName);
assertThat(ctx.getBean("subscriberRetrySettings-subscription-name", RetrySettings.class))
.isEqualTo(expectedRetrySettingsForSubscriptionName);
assertThat(ctx.getBean("globalSubscriberRetrySettings", RetrySettings.class))
.isEqualTo(expectedGlobalRetrySettings);
});
}

Expand Down Expand Up @@ -775,7 +905,8 @@ public void flowControlSettings_selectiveConfigurationSet() {
.setMaxOutstandingElementCount(11L)
.setMaxOutstandingRequestBytes(12L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore).build();
assertThat(subscriberFactory.getFlowControlSettings("subscription-name")).isEqualTo(expectedFlowControlForSubscriptionName);
assertThat(subscriberFactory.getFlowControlSettings("subscription-name"))
.isEqualTo(expectedFlowControlForSubscriptionName);
assertThat(ctx.getBean("subscriberFlowControlSettings-subscription-name", FlowControlSettings.class))
.isEqualTo(expectedFlowControlForSubscriptionName);
});
Expand Down Expand Up @@ -884,7 +1015,7 @@ public void flowControlSettings_globalAndDifferentSelectiveConfigurationSet_pick
}

@Test
public void flowControlSettings_subProperties_pickGlobalWhenSelectiveNotSpecified() {
public void flowControlSettings_subsetOfProperties_pickGlobalWhenSelectiveNotSpecified() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withPropertyValues(
Expand Down
Loading

0 comments on commit 79d3ea1

Please sign in to comment.