Skip to content

Commit

Permalink
Add subscription-specific settings for retryableCodes (GoogleCloudPla…
Browse files Browse the repository at this point in the history
…tform#706)

Support subscription-specific settings for retryable codes.

We recently introduced support for overriding retryableCodes in pull settings (GoogleCloudPlatform#670). However, this property can only be set globally. This PR enables subscription-specific configuration.The retryableCodes configuration can be set in the following manner:
Global
`spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL, ABORTED`
Subscription-specific
`spring.cloud.gcp.pubsub.subscription.mySubscription.retryableCodes=INTERNAL, ABORTED`
  • Loading branch information
kateryna216 authored and GitHub committed Nov 18, 2021
1 parent 79d3ea1 commit d8723ef
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ public SubscriberFactory defaultSubscriberFactory(
}
factory.setRetrySettingsMap(this.subscriberRetrySettingsMap);
factory.setGlobalRetrySettings(this.globalRetrySettings);
if (this.gcpPubSubProperties.getSubscriber().getRetryableCodes() != null) {
factory.setRetryableCodes(gcpPubSubProperties.getSubscriber().getRetryableCodes());
}

healthTrackerRegistry.ifAvailable(factory::setHealthTrackerRegistry);

return factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public void retryableCodes_default() {
DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isNull();
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name")).isNull();
});
}

Expand All @@ -134,9 +135,15 @@ public void retryableCodes_empty() {
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=");

contextRunner.run(ctx -> {
GcpPubSubProperties properties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);
DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory",
DefaultSubscriberFactory.class);

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
assertThat(properties.getSubscriber().getRetryableCodes()).isEqualTo(new Code[] {});
assertThat(properties.computeRetryableCodes("subscription-name", projectIdProvider.getProjectId()))
.isEqualTo(new Code[] {});
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name"))
.isEqualTo(new Code[] {});
});
}
Expand All @@ -149,9 +156,16 @@ public void retryableCodes_INTERNAL() {
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL");

contextRunner.run(ctx -> {
GcpPubSubProperties properties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);
DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory",
DefaultSubscriberFactory.class);

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
assertThat(properties.getSubscriber().getRetryableCodes())
.isEqualTo(new Code[] { Code.INTERNAL });
assertThat(properties.computeRetryableCodes("subscription-name", projectIdProvider.getProjectId()))
.isEqualTo(new Code[] { Code.INTERNAL });
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name"))
.isEqualTo(new Code[] { Code.INTERNAL });
});
}
Expand All @@ -161,13 +175,77 @@ public void retryableCodes_many() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=UNKNOWN,ABORTED,UNAVAILABLE,INTERNAL");
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.retryableCodes=UNKNOWN,ABORTED,UNAVAILABLE,INTERNAL");

contextRunner.run(ctx -> {
GcpPubSubProperties properties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);
DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory",
DefaultSubscriberFactory.class);
Code[] expectedRetryableCodes = new Code[] { Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE, Code.INTERNAL };

assertThat(properties.getSubscriber().getRetryableCodes())
.isEqualTo(expectedRetryableCodes);
assertThat(properties.computeRetryableCodes("subscription-name", projectIdProvider.getProjectId()))
.isEqualTo(expectedRetryableCodes);
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name"))
.isEqualTo(expectedRetryableCodes);
});
}

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
@Test
public void retryableCodes_selectiveConfigurationSet() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscription.subscription-name.retryableCodes=UNKNOWN,ABORTED,UNAVAILABLE,INTERNAL");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory",
DefaultSubscriberFactory.class);
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name"))
.isEqualTo(new Code[] { Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE, Code.INTERNAL });
assertThat(defaultSubscriberFactory.getRetryableCodes("other")).isNull();
});
}

@Test
public void retryableCodes_globalAndSelectiveConfigurationSet_selectiveTakesPrecedence() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL",
"spring.cloud.gcp.pubsub.subscription.subscription-name.retryableCodes=UNKNOWN,ABORTED,UNAVAILABLE");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory",
DefaultSubscriberFactory.class);
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name"))
.isEqualTo(new Code[] { Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE });
assertThat(defaultSubscriberFactory.getRetryableCodes("other")).isEqualTo(new Code[] { Code.INTERNAL });
});
}

@Test
public void retryableCodes_globalAndDifferentSelectiveConfigurationSet_pickGlobal() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL",
"spring.cloud.gcp.pubsub.subscription.subscription-name.pull-counts=2");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory",
DefaultSubscriberFactory.class);
assertThat(defaultSubscriberFactory.getRetryableCodes("subscription-name"))
.isEqualTo(new Code[] { Code.INTERNAL });
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testPull() {
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
assertThat(gcpPubSubProperties.getSubscriber().getRetryableCodes())
assertThat(gcpPubSubProperties.computeRetryableCodes("test-sub-1", projectIdProvider.getProjectId()))
.isEqualTo(new Code[] { Code.INTERNAL });
assertThat((RetrySettings) context.getBean("subscriberRetrySettings-test-sub-1"))
.isEqualTo(expectedRetrySettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ public Integer computeParallelPullCount(String subscriptionName, String projectI
return parallelPullCount != null ? parallelPullCount : this.globalSubscriber.getParallelPullCount();
}


/**
* Retrieves collection of retryable codes from configuration. The subscription-specific
* property takes precedence if both global and subscription-specific properties are set.
* If subscription-specific configuration is not set then the global configuration is
* picked.
* @param subscriptionName subscription name
* @param projectId project id
* @return retryable codes
*/
public Code[] computeRetryableCodes(String subscriptionName, String projectId) {
Code[] retryableCodes = getSubscriber(subscriptionName, projectId).getRetryableCodes();
return retryableCodes != null ? retryableCodes : this.globalSubscriber.getRetryableCodes();
}

/**
* Computes the max extension period. The subscription-specific property takes precedence
* if both global and subscription-specific properties are set. If none are set then the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ SubscriberStubSettings buildGlobalSubscriberStubSettings() throws IOException {
subscriberStubSettings.pullSettings().setRetrySettings(retrySettings);
}

if (this.retryableCodes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(
this.retryableCodes);
Code[] codes = this.retryableCodes != null ? this.retryableCodes
: this.pubSubConfiguration.getSubscriber().getRetryableCodes();
if (codes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(codes);
}

return subscriberStubSettings.build();
Expand Down Expand Up @@ -392,9 +393,9 @@ SubscriberStubSettings buildSubscriberStubSettings(String subscriptionName) thro
subscriberStubSettings.pullSettings().setRetrySettings(retrySettings);
}

if (this.retryableCodes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(
this.retryableCodes);
Code[] codes = getRetryableCodes(subscriptionName);
if (codes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(codes);
}

return subscriberStubSettings.build();
Expand All @@ -419,12 +420,6 @@ SubscriberStubSettings.Builder buildStubSettingsWithoutConfigurations() {
if (this.apiClock != null) {
subscriberStubSettings.setClock(this.apiClock);
}

if (this.retryableCodes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(
this.retryableCodes);
}

return subscriberStubSettings;
}

Expand Down Expand Up @@ -511,6 +506,13 @@ String getPullEndpoint(String subscriptionName) {
return this.pubSubConfiguration.computePullEndpoint(subscriptionName, projectId);
}

public Code[] getRetryableCodes(String subscriptionName) {
if (this.retryableCodes != null) {
return this.retryableCodes;
}
return this.pubSubConfiguration.computeRetryableCodes(subscriptionName, projectId);
}

public void setExecutorProviderMap(
ConcurrentMap<String, ExecutorProvider> executorProviderMap) {
this.executorProviderMap = executorProviderMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,29 @@ public void testComputeSubscriberRetrySettings_returnGlobal() {
assertThat(result.getMaxRpcTimeoutSeconds()).isEqualTo(9L);
}

@Test
public void testComputeRetryableCodes_returnsGlobal() {
PubSubConfiguration pubSubConfiguration = new PubSubConfiguration();
PubSubConfiguration.Subscriber globalSubscriber = pubSubConfiguration.getSubscriber();

globalSubscriber.setRetryableCodes(new Code[] { Code.INTERNAL });

assertThat(pubSubConfiguration.computeRetryableCodes("subscription-name", "projectId"))
.containsExactly(Code.INTERNAL);
}

@Test
public void testComputeRetryableCodes_returnCustom() {
PubSubConfiguration pubSubConfiguration = new PubSubConfiguration();
PubSubConfiguration.Subscriber subscriber = new PubSubConfiguration.Subscriber();
subscriber.setRetryableCodes(new Code[] { Code.INTERNAL });
pubSubConfiguration.getSubscription().put("projects/projectId/subscriptions/subscription-name",
subscriber);

assertThat(pubSubConfiguration.computeRetryableCodes("subscription-name", "projectId"))
.containsExactly(Code.INTERNAL);
}

@Test
public void testSubscriberMapProperties_defaultOrGlobal_addToMap() {
PubSubConfiguration pubSubConfiguration = new PubSubConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ public void testBuildGlobalSubscriberStubSettings_pullEndpoint_pickGlobalConfigu
}

@Test
public void testBuildSubscriberStubSettings_retryableCodes() throws IllegalAccessException, IOException {
public void testBuildSubscriberStubSettings_retryableCodes_pickUserProvidedValue() throws IllegalAccessException, IOException {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, new PubSubConfiguration());
factory.setRetryableCodes(new Code[] { Code.INTERNAL });
Expand All @@ -536,7 +536,22 @@ public void testBuildSubscriberStubSettings_retryableCodes() throws IllegalAcces
}

@Test
public void testBuildGlobalSubscriberStubSettings_retryableCodes() throws IOException, IllegalAccessException {
public void testBuildSubscriberStubSettings_retryableCodes_pickConfiguration() throws IllegalAccessException, IOException {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, mockPubSubConfiguration);
when(mockPubSubConfiguration.computeRetryableCodes("someSubscription", projectIdProvider.getProjectId()))
.thenReturn(new Code[] { Code.INTERNAL });

assertThat(FieldUtils.readField(factory, "retryableCodes", true)).isNull();

SubscriberStubSettings settings = factory.buildSubscriberStubSettings("someSubscription");
assertThat(settings.pullSettings().getRetryableCodes())
.containsExactly(Code.INTERNAL);
}

@Test
public void testBuildGlobalSubscriberStubSettings_retryableCodes_userProvidedValue()
throws IOException, IllegalAccessException {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, new PubSubConfiguration());
factory.setRetryableCodes(new Code[] { Code.INTERNAL });
Expand All @@ -549,6 +564,18 @@ public void testBuildGlobalSubscriberStubSettings_retryableCodes() throws IOExce
.containsExactly(Code.INTERNAL);
}

@Test
public void testBuildGlobalSubscriberStubSettings_retryableCodes_pickConfiguration() throws IOException {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, mockPubSubConfiguration);
when(mockPubSubConfiguration.getSubscriber()).thenReturn(mockSubscriber);
when(mockSubscriber.getRetryableCodes()).thenReturn(new Code[] { Code.INTERNAL });

SubscriberStubSettings settings = factory.buildGlobalSubscriberStubSettings();
assertThat(settings.pullSettings().getRetryableCodes())
.containsExactly(Code.INTERNAL);
}

@Test
public void createSubscriberStubSucceeds_noSubscriptionNameAndNewConfiguration() {
GcpProjectIdProvider projectIdProvider = () -> "project";
Expand Down

0 comments on commit d8723ef

Please sign in to comment.