Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward porting changes from 2.0.4.patch branch #677

Merged
merged 6 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ https://spring.io/projects/spring-cloud-gcp[Spring Cloud GCP] is a set of integr
This document provides a high-level overview of the changes introduced in Spring Cloud GCP by release.
For a detailed view of what has changed, refer to the https://github.com/GoogleCloudPlatform/spring-cloud-gcp/commits/main[commit history] on GitHub.

== 2.0.5.BUILD-SNAPSHOT
== 2.0.5 (2021-10-25)

== 2.0.4
=== Pub/Sub

* Fixed: Allow overriding Pub/Sub retryableCodes in pull settings (https://github.com/GoogleCloudPlatform/spring-cloud-gcp/pull/670[#670]).

== 2.0.4 (2021-08-11)

=== General

Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ They do *not* control message redelivery; only message acknowledgement deadline
|===
| Name | Description | Required | Default value
| `spring.cloud.gcp.pubsub.keepAliveIntervalMinutes` | Determines frequency of keepalive gRPC ping | No | `5 minutes`
| `spring.cloud.gcp.pubsub.subscriber.retryableCodes` | RPC status codes that should be retried when pulling messages. | No | UNKNOWN,ABORTED,UNAVAILABLE
| `spring.cloud.gcp.pubsub.[subscriber,publisher].retry.total-timeout-seconds`|
TotalTimeout has ultimate control over how long the logic should keep trying the remote call until it gives up completely.
The higher the total timeout, the more retries can be attempted. | No | 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ public SubscriberFactory defaultSubscriberFactory(
"The subscriberRetrySettings bean is being deprecated. Please use application.properties to configure properties");
factory.setSubscriberStubRetrySettings(retrySettings.getIfAvailable());
}
if (this.gcpPubSubProperties.getSubscriber().getRetryableCodes() != null) {
factory.setRetryableCodes(gcpPubSubProperties.getSubscriber().getRetryableCodes());
}
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.cloud.spring.core.GcpProjectIdProvider;
Expand Down Expand Up @@ -109,6 +110,65 @@ public void maxInboundMessageSize_default() {
});
}

@Test
public void retryableCodes_default() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class);

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isNull();
});
}

@Test
public void retryableCodes_empty() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isEqualTo(new Code[] {});
});
}

@Test
public void retryableCodes_INTERNAL() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isEqualTo(new Code[] { Code.INTERNAL });
});
}

@Test
public void retryableCodes_many() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=UNKNOWN,ABORTED,UNAVAILABLE,INTERNAL");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isEqualTo(new Code[] { Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE, Code.INTERNAL });
});
}

@Test
public void customExecutorProviderUsedWhenProvided() {
ExecutorProvider executorProvider = mock(ExecutorProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentMap;

import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.spring.pubsub.support.PubSubSubscriptionUtils;
import com.google.pubsub.v1.ProjectSubscriptionName;

Expand Down Expand Up @@ -302,6 +303,10 @@ public static class Subscriber {
* Flow control settings for subscriber factory.
*/
private final FlowControl flowControl = new FlowControl();
/**
* RPC status codes that should be retried when pulling messages.
*/
private Code[] retryableCodes = null;

public boolean isGlobal() {
return global;
Expand All @@ -311,6 +316,14 @@ public Retry getRetry() {
return this.retry;
}

public Code[] getRetryableCodes() {
return retryableCodes;
}

public void setRetryableCodes(Code[] retryableCodes) {
this.retryableCodes = retryableCodes;
}

public FlowControl getFlowControl() {
return this.flowControl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
Expand Down Expand Up @@ -88,6 +89,8 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

private ExecutorProvider defaultExecutorProvider;

private Code[] retryableCodes;

/**
* Default {@link DefaultSubscriberFactory} constructor.
* @param projectIdProvider provides the default GCP project ID for selecting the
Expand Down Expand Up @@ -209,6 +212,14 @@ public void setSubscriberStubRetrySettings(RetrySettings subscriberStubRetrySett
this.subscriberStubRetrySettings = subscriberStubRetrySettings;
}

/**
* Set the retryable codes for subscriber pull settings.
* @param retryableCodes pull RPC response codes that should be retried.
*/
public void setRetryableCodes(Code[] retryableCodes) {
this.retryableCodes = retryableCodes;
}

@Override
public Subscriber createSubscriber(String subscriptionName, MessageReceiver receiver) {
Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(
Expand Down Expand Up @@ -314,6 +325,11 @@ public SubscriberStub createSubscriberStub(String subscriptionName) {
subscriberStubSettings.pullSettings().setRetrySettings(retrySettings);
}

if (this.retryableCodes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(
this.retryableCodes);
}

try {
return GrpcSubscriberStub.create(subscriberStubSettings.build());
}
Expand Down