Skip to content

Commit

Permalink
fix: pull-endpoint setting for async subscribers (#1883)
Browse files Browse the repository at this point in the history
* fix: pull-endpoint setting for async subscribers

* add integration test and fix fqn subscription usage

Fixes: #1882.
  • Loading branch information
meltsufin authored May 23, 2023
1 parent 219adb3 commit f0c5d4c
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 27 deletions.
4 changes: 2 additions & 2 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ However, if a per-subscription configuration is not set then the global or defau
| `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscriber.min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscriber.max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscriber.pull-endpoint` | The endpoint for synchronous pulling messages | No | pubsub.googleapis.com:443
| `spring.cloud.gcp.pubsub.subscriber.pull-endpoint` | The endpoint for pulling messages | No | pubsub.googleapis.com:443
| `spring.cloud.gcp.pubsub.[subscriber,publisher].executor-threads` | Number of threads used by `Subscriber` instances created by `SubscriberFactory` | No | 4
| `spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count`|
Maximum number of outstanding elements to keep in memory before enforcing flow control. | No | unlimited
Expand Down Expand Up @@ -103,7 +103,7 @@ This is useful in conjunction with enabling message ordering because sending mes
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | 0
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].pull-endpoint` | The endpoint for synchronous pulling messages. | No | pubsub.googleapis.com:443
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].pull-endpoint` | The endpoint for pulling messages. | No | pubsub.googleapis.com:443
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].executor-threads` | Number of threads used by `Subscriber` instances created by `SubscriberFactory`. Note that configuring per-subscription `executor-threads` will result in the creation of thread pools for both global/default **and** per-subscription configurations. | No | 4
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].flow-control.max-outstanding-element-count`|
Maximum number of outstanding elements to keep in memory before enforcing flow control. | No | unlimited
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/md/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ is not set then the global or default configuration will be used.
| `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | 0 |
| `spring.cloud.gcp.pubsub.subscriber.min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds. | No | 0 |
| `spring.cloud.gcp.pubsub.subscriber.max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds. | No | 0 |
| `spring.cloud.gcp.pubsub.subscriber.pull-endpoint` | The endpoint for synchronous pulling messages. | No | pubsub.googleapis.com:443 |
| `spring.cloud.gcp.pubsub.subscriber.pull-endpoint` | The endpoint for pulling messages. | No | pubsub.googleapis.com:443 |
| `spring.cloud.gcp.pubsub.[subscriber,publisher].executor-threads` | Number of threads used by `Subscriber` instances created by `SubscriberFactory`. | No | 4 |
| `spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count` | Maximum number of outstanding elements to keep in memory before enforcing flow control. | No | unlimited |
| `spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-request-bytes` | Maximum number of outstanding bytes to keep in memory before enforcing flow control. | No | unlimited |
Expand All @@ -95,7 +95,7 @@ is not set then the global or default configuration will be used.
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | 0 |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds. | No | 0 |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds. | No | 0 |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].pull-endpoint` | The endpoint for synchronous pulling messages. | No | pubsub.googleapis.com:443 |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].pull-endpoint` | The endpoint for pulling messages. | No | pubsub.googleapis.com:443 |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].executor-threads` | Number of threads used by `Subscriber` instances created by `SubscriberFactory`. Note that configuring per-subscription `executor-threads` will result in the creation of thread pools for both global/default **and** per-subscription configurations. | No | 4 |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].flow-control.max-outstanding-element-count` | Maximum number of outstanding elements to keep in memory before enforcing flow control. | No | unlimited |
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].flow-control.max-outstanding-request-bytes` | Maximum number of outstanding bytes to keep in memory before enforcing flow control. | No | unlimited |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ class PubSubAutoConfigurationIntegrationTests {
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.retry.initial-rpc-timeout-seconds=600",
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.retry.rpc-timeout-multiplier=1",
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.retry.max-rpc-timeout-seconds=600",
"spring.cloud.gcp.pubsub.subscription.fully-qualified-test-sub-1-with-project-abc.pull-endpoint=northamerica-northeast2-pubsub.googleapis.com:443",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.executor-threads=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.max-ack-extension-period=0",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.min-duration-per-ack-extension=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.max-duration-per-ack-extension=2",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.parallel-pull-count=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.max-outstanding-element-Count=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.max-outstanding-request-Bytes=1",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.limit-exceeded-behavior=Ignore")
"spring.cloud.gcp.pubsub.subscription.test-sub-2.flow-control.limit-exceeded-behavior=Ignore",
"spring.cloud.gcp.pubsub.subscription.test-sub-2.pull-endpoint=bad.endpoint")
.withConfiguration(
AutoConfigurations.of(
GcpContextAutoConfiguration.class, GcpPubSubAutoConfiguration.class));
Expand Down Expand Up @@ -147,10 +149,12 @@ void testPull() {
.isNotNull();
assertThat((ExecutorProvider) context.getBean("globalSubscriberExecutorProvider"))
.isNotNull();
assertThat(
gcpPubSubProperties.computeRetryableCodes(
"test-sub-1", projectId))
assertThat(gcpPubSubProperties.computeRetryableCodes(subscriptionName, projectId))
.isEqualTo(new Code[] {Code.INTERNAL});
assertThat(gcpPubSubProperties.computePullEndpoint(fullSubscriptionNameSub1, projectId))
.isEqualTo("northamerica-northeast2-pubsub.googleapis.com:443");
assertThat(gcpPubSubProperties.computePullEndpoint("test-sub-2", projectId))
.isEqualTo("bad.endpoint");
assertThat((RetrySettings) context.getBean("subscriberRetrySettings-" + fullSubscriptionNameSub1))
.isEqualTo(expectedRetrySettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public FlowControl computeSubscriberFlowControlSettings(ProjectSubscriptionName
*/
public Integer computeParallelPullCount(String subscriptionName, String projectId) {
Integer parallelPullCount =
getSubscriptionProperties(ProjectSubscriptionName.of(projectId, subscriptionName))
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getParallelPullCount();

return parallelPullCount != null
Expand All @@ -203,7 +203,7 @@ public Integer computeParallelPullCount(String subscriptionName, String projectI
*/
public Code[] computeRetryableCodes(String subscriptionName, String projectId) {
Code[] retryableCodes =
getSubscriptionProperties(ProjectSubscriptionName.of(projectId, subscriptionName))
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getRetryableCodes();
return retryableCodes != null ? retryableCodes : this.globalSubscriber.getRetryableCodes();
}
Expand All @@ -219,7 +219,7 @@ public Code[] computeRetryableCodes(String subscriptionName, String projectId) {
*/
public Long computeMaxAckExtensionPeriod(String subscriptionName, String projectId) {
Long maxAckExtensionPeriod =
getSubscriptionProperties(ProjectSubscriptionName.of(projectId, subscriptionName))
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getMaxAckExtensionPeriod();

if (maxAckExtensionPeriod != null) {
Expand All @@ -243,7 +243,7 @@ public Long computeMaxAckExtensionPeriod(String subscriptionName, String project
@Nullable
public Long computeMinDurationPerAckExtension(String subscriptionName, String projectId) {
Long minDurationPerAckExtension =
getSubscriptionProperties(ProjectSubscriptionName.of(projectId, subscriptionName))
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getMinDurationPerAckExtension();

if (minDurationPerAckExtension != null) {
Expand All @@ -265,7 +265,7 @@ public Long computeMinDurationPerAckExtension(String subscriptionName, String pr
@Nullable
public Long computeMaxDurationPerAckExtension(String subscriptionName, String projectId) {
Long maxDurationPerAckExtension =
getSubscriptionProperties(ProjectSubscriptionName.of(projectId, subscriptionName))
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getMaxDurationPerAckExtension();

if (maxDurationPerAckExtension != null) {
Expand All @@ -286,7 +286,7 @@ public Long computeMaxDurationPerAckExtension(String subscriptionName, String pr
*/
public String computePullEndpoint(String subscriptionName, String projectId) {
String pullEndpoint =
getSubscriptionProperties(ProjectSubscriptionName.of(projectId, subscriptionName))
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getPullEndpoint();
return pullEndpoint != null ? pullEndpoint : this.globalSubscriber.getPullEndpoint();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void setParallelPullCount(Integer parallelPullCount) {
}

/**
* Set the endpoint for synchronous pulling messages.
* Set the endpoint for pulling messages.
*
* @param pullEndpoint the pull endpoint to set
*/
Expand Down Expand Up @@ -269,6 +269,11 @@ public Subscriber createSubscriber(String subscriptionName, MessageReceiver rece
subscriberBuilder.setSystemExecutorProvider(this.systemExecutorProvider);
}

String endpoint = getPullEndpoint(subscriptionName);
if (endpoint != null) {
subscriberBuilder.setEndpoint(endpoint);
}

FlowControlSettings flowControl = getFlowControlSettings(subscriptionName);
if (flowControl != null) {
subscriberBuilder.setFlowControlSettings(flowControl);
Expand Down Expand Up @@ -341,10 +346,12 @@ SubscriberStubSettings buildGlobalSubscriberStubSettings() throws IOException {
SubscriberStubSettings.Builder subscriberStubSettings =
buildStubSettingsWithoutConfigurations();

if (this.pullEndpoint != null) {
subscriberStubSettings.setEndpoint(this.pullEndpoint);
} else {
applyGlobalPullEndpoint(subscriberStubSettings);
String endpoint =
this.pullEndpoint != null
? this.pullEndpoint
: this.pubSubConfiguration.getSubscriber().getPullEndpoint();
if (endpoint != null) {
subscriberStubSettings.setEndpoint(endpoint);
}

ExecutorProvider executor =
Expand Down Expand Up @@ -372,13 +379,6 @@ SubscriberStubSettings buildGlobalSubscriberStubSettings() throws IOException {
return subscriberStubSettings.build();
}

private void applyGlobalPullEndpoint(SubscriberStubSettings.Builder subscriberStubSettings) {
String endpoint = this.pubSubConfiguration.getSubscriber().getPullEndpoint();
if (endpoint != null) {
subscriberStubSettings.setEndpoint(endpoint);
}
}

SubscriberStubSettings buildSubscriberStubSettings(String subscriptionName) throws IOException {
SubscriberStubSettings.Builder subscriberStubSettings =
buildStubSettingsWithoutConfigurations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ void testCreateSubscriber_validateSetProperties() {
when(mockPubSubConfiguration.computeParallelPullCount(
"defaultSubscription", projectIdProvider.getProjectId()))
.thenReturn(2);
when(mockPubSubConfiguration.computePullEndpoint(
"defaultSubscription", projectIdProvider.getProjectId()))
.thenReturn("test.endpoint");

Subscriber expectedSubscriber =
factory.createSubscriber("defaultSubscription", (message, consumer) -> {});
Expand All @@ -381,7 +384,8 @@ void testCreateSubscriber_validateSetProperties() {
.hasFieldOrPropertyWithValue("maxAckExtensionPeriod", Duration.ofSeconds(2L))
.hasFieldOrPropertyWithValue("minDurationPerAckExtension", Duration.ofSeconds(3L))
.hasFieldOrPropertyWithValue("maxDurationPerAckExtension", Duration.ofSeconds(4L))
.hasFieldOrPropertyWithValue("numPullers", 2);
.hasFieldOrPropertyWithValue("numPullers", 2)
.hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint");
}

@Test
Expand Down

0 comments on commit f0c5d4c

Please sign in to comment.