Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward porting changes from 2.0.4.patch branch #677

Merged
merged 6 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ https://spring.io/projects/spring-cloud-gcp[Spring Cloud GCP] is a set of integr
This document provides a high-level overview of the changes introduced in Spring Cloud GCP by release.
For a detailed view of what has changed, refer to the https://github.com/GoogleCloudPlatform/spring-cloud-gcp/commits/main[commit history] on GitHub.

== 2.0.5.BUILD-SNAPSHOT
== 2.0.5 (2021-10-25)

== 2.0.4
=== Pub/Sub

* Fixed: Allow overriding Pub/Sub retryableCodes in pull settings (https://github.com/GoogleCloudPlatform/spring-cloud-gcp/pull/670[#670]).

== 2.0.4 (2021-08-11)

=== General

Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ They do *not* control message redelivery; only message acknowledgement deadline
|===
| Name | Description | Required | Default value
| `spring.cloud.gcp.pubsub.keepAliveIntervalMinutes` | Determines frequency of keepalive gRPC ping | No | `5 minutes`
| `spring.cloud.gcp.pubsub.subscriber.retryableCodes` | RPC status codes that should be retried when pulling messages. | No | UNKNOWN,ABORTED,UNAVAILABLE
| `spring.cloud.gcp.pubsub.[subscriber,publisher].retry.total-timeout-seconds`|
TotalTimeout has ultimate control over how long the logic should keep trying the remote call until it gives up completely.
The higher the total timeout, the more retries can be attempted. | No | 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ public SubscriberFactory defaultSubscriberFactory(
"The subscriberRetrySettings bean is being deprecated. Please use application.properties to configure properties");
factory.setSubscriberStubRetrySettings(retrySettings.getIfAvailable());
}
if (this.gcpPubSubProperties.getSubscriber().getRetryableCodes() != null) {
factory.setRetryableCodes(gcpPubSubProperties.getSubscriber().getRetryableCodes());
}
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.cloud.spring.core.GcpProjectIdProvider;
Expand Down Expand Up @@ -109,6 +110,65 @@ public void maxInboundMessageSize_default() {
});
}

@Test
public void retryableCodes_default() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class);

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isNull();
});
}

@Test
public void retryableCodes_empty() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isEqualTo(new Code[] {});
});
}

@Test
public void retryableCodes_INTERNAL() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isEqualTo(new Code[] { Code.INTERNAL });
});
}

@Test
public void retryableCodes_many() {
ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class)
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.retryableCodes=UNKNOWN,ABORTED,UNAVAILABLE,INTERNAL");

contextRunner.run(ctx -> {

DefaultSubscriberFactory defaultSubscriberFactory = ctx.getBean("defaultSubscriberFactory", DefaultSubscriberFactory.class);
assertThat(FieldUtils.readField(defaultSubscriberFactory, "retryableCodes", true))
.isEqualTo(new Code[] { Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE, Code.INTERNAL });
});
}

@Test
public void customExecutorProviderUsedWhenProvided() {
ExecutorProvider executorProvider = mock(ExecutorProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spring.autoconfigure.pubsub.it;

import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration;
import com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration;
import com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubProperties;
Expand All @@ -39,12 +40,13 @@

public class PubSubAutoConfigurationIntegrationTests {

private static final Log LOGGER = LogFactory.getLog(PubSubTemplateIntegrationTests.class);
private static final Log LOGGER = LogFactory.getLog(PubSubAutoConfigurationIntegrationTests.class);

private static GcpProjectIdProvider projectIdProvider;

private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.retryableCodes=INTERNAL",
"spring.cloud.gcp.pubsub.subscription.test-sub-1.executor-threads=3",
"spring.cloud.gcp.pubsub.subscription.test-sub-1.retry.total-timeout-seconds=600",
"spring.cloud.gcp.pubsub.subscription.test-sub-1.retry.initial-retry-delay-seconds=100",
Expand Down Expand Up @@ -110,6 +112,8 @@ public void testPull() {
assertThat(scheduler.isDaemon()).isTrue();
assertThat((ThreadPoolTaskScheduler) context.getBean("globalPubSubSubscriberThreadPoolScheduler"))
.isNotNull();
assertThat(gcpPubSubProperties.getSubscriber().getRetryableCodes())
.isEqualTo(new Code[] { Code.INTERNAL });

pubSubAdmin.deleteSubscription(subscriptionName);
pubSubAdmin.deleteTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentMap;

import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.spring.pubsub.support.PubSubSubscriptionUtils;
import com.google.pubsub.v1.ProjectSubscriptionName;

Expand Down Expand Up @@ -303,6 +304,11 @@ public static class Subscriber {
*/
private final FlowControl flowControl = new FlowControl();

/**
* RPC status codes that should be retried when pulling messages.
*/
private Code[] retryableCodes = null;

public boolean isGlobal() {
return global;
}
Expand All @@ -311,6 +317,14 @@ public Retry getRetry() {
return this.retry;
}

public Code[] getRetryableCodes() {
return retryableCodes;
}

public void setRetryableCodes(Code[] retryableCodes) {
this.retryableCodes = retryableCodes;
}

public FlowControl getFlowControl() {
return this.flowControl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
Expand Down Expand Up @@ -88,6 +89,8 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

private ExecutorProvider defaultExecutorProvider;

private Code[] retryableCodes;

/**
* Default {@link DefaultSubscriberFactory} constructor.
* @param projectIdProvider provides the default GCP project ID for selecting the
Expand Down Expand Up @@ -209,6 +212,14 @@ public void setSubscriberStubRetrySettings(RetrySettings subscriberStubRetrySett
this.subscriberStubRetrySettings = subscriberStubRetrySettings;
}

/**
* Set the retryable codes for subscriber pull settings.
* @param retryableCodes pull RPC response codes that should be retried.
*/
public void setRetryableCodes(Code[] retryableCodes) {
this.retryableCodes = retryableCodes;
}

@Override
public Subscriber createSubscriber(String subscriptionName, MessageReceiver receiver) {
Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(
Expand Down Expand Up @@ -280,6 +291,16 @@ public PullRequest createPullRequest(String subscriptionName, Integer maxMessage

@Override
public SubscriberStub createSubscriberStub(String subscriptionName) {
try {
SubscriberStubSettings subscriberStubSettings = buildSubscriberStubSettings(subscriptionName);
return GrpcSubscriberStub.create(subscriberStubSettings);
}
catch (IOException ex) {
throw new RuntimeException("Error creating the SubscriberStub", ex);
}
}

SubscriberStubSettings buildSubscriberStubSettings(String subscriptionName) throws IOException {
SubscriberStubSettings.Builder subscriberStubSettings = SubscriberStubSettings.newBuilder();

if (this.credentialsProvider != null) {
Expand Down Expand Up @@ -314,12 +335,12 @@ public SubscriberStub createSubscriberStub(String subscriptionName) {
subscriberStubSettings.pullSettings().setRetrySettings(retrySettings);
}

try {
return GrpcSubscriberStub.create(subscriberStubSettings.build());
}
catch (IOException ex) {
throw new RuntimeException("Error creating the SubscriberStub", ex);
if (this.retryableCodes != null) {
subscriberStubSettings.pullSettings().setRetryableCodes(
this.retryableCodes);
}

return subscriberStubSettings.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spring.pubsub.core;

import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.StatusCode.Code;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -48,6 +49,7 @@ public void testDefaultSubscriberProperties() {
assertThat(retrySettings.getInitialRpcTimeoutSeconds()).isNull();
assertThat(retrySettings.getRpcTimeoutMultiplier()).isNull();
assertThat(retrySettings.getMaxRpcTimeoutSeconds()).isNull();
assertThat(subscriber.getRetryableCodes()).isNull();
}

@Test
Expand All @@ -60,12 +62,15 @@ public void testSubscriberProperties() {
subscriber.setParallelPullCount(1);
subscriber.setMaxAckExtensionPeriod(1L);
subscriber.setPullEndpoint("fake-endpoint");
subscriber.setRetryableCodes(new Code[] { Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE, Code.INTERNAL });

assertThat(subscriber.getExecutorThreads()).isEqualTo(1);
assertThat(subscriber.getMaxAcknowledgementThreads()).isEqualTo(3);
assertThat(subscriber.getParallelPullCount()).isEqualTo(1);
assertThat(subscriber.getMaxAckExtensionPeriod()).isEqualTo(1L);
assertThat(subscriber.getPullEndpoint()).isEqualTo("fake-endpoint");
assertThat(subscriber.getRetryableCodes()).containsExactly(Code.UNKNOWN, Code.ABORTED, Code.UNAVAILABLE,
Code.INTERNAL);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@

package com.google.cloud.spring.pubsub.support;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.TransportChannel;
import com.google.cloud.NoCredentials;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.cloud.spring.core.GcpProjectIdProvider;
import com.google.cloud.spring.pubsub.core.PubSubConfiguration;
import com.google.pubsub.v1.PullRequest;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -38,6 +48,9 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -68,12 +81,24 @@ public class DefaultSubscriberFactoryTests {
@Mock
private ThreadPoolTaskScheduler mockGlobalScheduler;

@Mock
TransportChannel mockTransportChannel;
@Mock
ApiCallContext mockApiCallContext;

/**
* used to check exception messages and types.
*/
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Before
public void setUp() {
when(this.mockTransportChannel.getEmptyCallContext()).thenReturn(this.mockApiCallContext);
when(this.mockApiCallContext.withCredentials(any())).thenReturn(this.mockApiCallContext);
when(this.mockApiCallContext.withTransportChannel(any())).thenReturn(this.mockApiCallContext);
}

@Test
public void testNewSubscriber() {
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(() -> "angeldust");
Expand Down Expand Up @@ -519,4 +544,46 @@ public void testGetPullEndpoint_configurationIsNull() {

assertThat(factory.getPullEndpoint("subscription-name")).isNull();
}

@Test
public void testSetRetryableCodes() throws IllegalAccessException, IOException {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, null);
factory.setRetryableCodes(new Code[] { Code.INTERNAL });

assertThat(FieldUtils.readField(factory, "retryableCodes", true))
.isEqualTo(new Code[] { Code.INTERNAL });

SubscriberStubSettings settings = factory.buildSubscriberStubSettings("someSubscription");
assertThat(settings.pullSettings().getRetryableCodes())
.containsExactly(Code.INTERNAL);

}

@Test
public void createSubscriberStubSucceeds() {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, null);
factory.setChannelProvider(FixedTransportChannelProvider.create(this.mockTransportChannel));
factory.setCredentialsProvider(() -> NoCredentials.getInstance());

SubscriberStub stub = factory.createSubscriberStub("unusedSubscription");
assertThat(stub.isShutdown()).isFalse();
}

@Test
public void createSubscriberStubFailsOnBadCredentials() throws IOException {
GcpProjectIdProvider projectIdProvider = () -> "project";
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(projectIdProvider, null);
factory.setChannelProvider(FixedTransportChannelProvider.create(this.mockTransportChannel));

CredentialsProvider mockCredentialsProvider = mock(CredentialsProvider.class);
factory.setCredentialsProvider(mockCredentialsProvider);

when(mockCredentialsProvider.getCredentials()).thenThrow(new IOException("boom"));

assertThatThrownBy(() -> factory.createSubscriberStub("unusedSubscription"))
.isInstanceOf(RuntimeException.class)
.hasMessage("Error creating the SubscriberStub");
}
}