From 37d8084fdf76c1f12c9a12212e448902e461a4a2 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Mon, 2 Mar 2020 08:05:45 +0100 Subject: [PATCH 01/26] Added a future returning pull method to PubSubSubscriberOperations, and its implementation, allowing for asynchronous and non-blocking pull of messages --- .../checkstyle/checkstyle-suppressions.xml | 9 ++++ .../cloud/gcp/pubsub/core/PubSubTemplate.java | 6 +++ .../PubSubSubscriberOperations.java | 13 ++++++ .../subscriber/PubSubSubscriberTemplate.java | 45 ++++++++++++++++--- 4 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml diff --git a/spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml b/spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml new file mode 100644 index 0000000000..f3f859eeb8 --- /dev/null +++ b/spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml @@ -0,0 +1,9 @@ + + + + + + diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index 80b1d397f1..4a0735fc4c 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -158,6 +158,12 @@ public List pull(String subscription, Integer maxM return this.pubSubSubscriberTemplate.pull(subscription, maxMessages, returnImmediately); } + @Override + public ListenableFuture> pullFuture(String subscription, Integer maxMessages, + Boolean returnImmediately) { + return this.pubSubSubscriberTemplate.pullFuture(subscription, maxMessages, returnImmediately); + } + @Override public List> pullAndConvert(String subscription, Integer maxMessages, Boolean returnImmediately, Class payloadType) { diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index fcb5acda36..4c47d4a628 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -103,6 +103,19 @@ Subscriber subscribeAndConvert(String subscription, */ List pull(String subscription, Integer maxMessages, Boolean returnImmediately); + /** + * Pull a number of messages from a Google Cloud Pub/Sub subscription. + * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified + * subscription name in the {@code projects//subscriptions/} format + * @param maxMessages the maximum number of pulled messages + * @param returnImmediately returns immediately even if subscription doesn't contain enough + * messages to satisfy {@code maxMessages} + * @return the ListenableFuture for the asynchronous execution, returning the list of + * received acknowledgeable messages + * @since 1.3 + */ + ListenableFuture> pullFuture(String subscription, Integer maxMessages, Boolean returnImmediately); + /** * Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with * the desired payload type. diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index dad2aedd8d..2aa500f86c 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -35,13 +35,9 @@ import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Empty; -import com.google.pubsub.v1.AcknowledgeRequest; -import com.google.pubsub.v1.ModifyAckDeadlineRequest; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.PullRequest; -import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.*; import org.springframework.beans.factory.DisposableBean; import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage; @@ -182,6 +178,38 @@ private List pull(PullRequest pullRequest) { .collect(Collectors.toList()); } + private ListenableFuture> pullFuture(PullRequest pullRequest) { + Assert.notNull(pullRequest, "The pull request can't be null."); + + ApiFuture pullFuture = this.subscriberStub.pullCallable().futureCall(pullRequest); + + final String projectId = this.subscriberFactory.getProjectId(); + + final SettableListenableFuture> settableFuture = new SettableListenableFuture<>(); + ApiFutures.addCallback(pullFuture, new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + settableFuture.setException(throwable); + } + + @Override + public void onSuccess(PullResponse pullResponse) { + List result = pullResponse.getReceivedMessagesList().stream() + .map((message) -> new PulledAcknowledgeablePubsubMessage( + PubSubSubscriptionUtils.toProjectSubscriptionName(pullRequest.getSubscription(), projectId), + message.getMessage(), + message.getAckId())) + .collect(Collectors.toList()); + + settableFuture.set(result); + } + + }, MoreExecutors.directExecutor()); + + return settableFuture; + } + @Override public List pull( String subscription, Integer maxMessages, Boolean returnImmediately) { @@ -189,6 +217,11 @@ public List pull( returnImmediately)); } + @Override + public ListenableFuture> pullFuture(String subscription, Integer maxMessages, Boolean returnImmediately) { + return pullFuture(this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately)); + } + @Override public List> pullAndConvert(String subscription, Integer maxMessages, Boolean returnImmediately, Class payloadType) { From 39d6bc68a500e533eecd5fc6f805a24abe475dd2 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Mon, 2 Mar 2020 08:07:49 +0100 Subject: [PATCH 02/26] Reimplemented the PubSubReactiveFactory to utilize the asynchronous pullFuture method of the PubSubReactiveFactory, making it non-blocking --- .../GcpPubSubReactiveAutoConfiguration.java | 20 +-- ...cpPubSubReactiveAutoConfigurationTest.java | 23 ++- .../reactive/PubSubReactiveFactory.java | 132 ++++++------------ .../reactive/PubSubReactiveFactoryTests.java | 63 +++++++-- 4 files changed, 103 insertions(+), 135 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java index 327312e613..6408121c5b 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java @@ -18,8 +18,6 @@ import java.util.Optional; -import javax.annotation.PreDestroy; - import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -49,30 +47,14 @@ matchIfMissing = true) public class GcpPubSubReactiveAutoConfiguration { - private Scheduler defaultPubSubReactiveScheduler; - @Bean @ConditionalOnMissingBean public PubSubReactiveFactory pubSubReactiveFactory( PubSubSubscriberTemplate subscriberTemplate, @Qualifier("pubSubReactiveScheduler") Optional userProvidedScheduler) { - Scheduler scheduler = null; - if (userProvidedScheduler.isPresent()) { - scheduler = userProvidedScheduler.get(); - } - else { - this.defaultPubSubReactiveScheduler = Schedulers.newElastic("pubSubReactiveScheduler"); - scheduler = this.defaultPubSubReactiveScheduler; - } + Scheduler scheduler = userProvidedScheduler.orElseGet(() -> Schedulers.parallel()); return new PubSubReactiveFactory(subscriberTemplate, scheduler); } - @PreDestroy - public void closeScheduler() { - if (this.defaultPubSubReactiveScheduler != null) { - this.defaultPubSubReactiveScheduler.dispose(); - } - } - } diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java index 33ec813621..cfe08bae73 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java @@ -42,6 +42,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncResult; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -105,17 +106,13 @@ public void reactiveConfigDisabledWhenReactivePubSubDisabled() { @Test public void defaultSchedulerUsedWhenNoneProvided() { - - setUpThreadPrefixVerification("pubSubReactiveScheduler"); + setUpThreadPrefixVerification("parallel"); ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withBean(PubSubSubscriberOperations.class, () -> mockSubscriberTemplate) .withConfiguration(AutoConfigurations.of(GcpPubSubReactiveAutoConfiguration.class)); - contextRunner.run(ctx -> { - pollAndVerify(ctx); - }); - + contextRunner.run(this::pollAndVerify); } @Test @@ -128,17 +125,15 @@ public void customSchedulerUsedWhenAvailable() { .withConfiguration(AutoConfigurations.of(GcpPubSubReactiveAutoConfiguration.class)) .withUserConfiguration(TestConfigWithOverriddenScheduler.class); - contextRunner.run(ctx -> { - pollAndVerify(ctx); - }); + contextRunner.run(this::pollAndVerify); } private void setUpThreadPrefixVerification(String threadPrefix) { - when(mockSubscriberTemplate.pull("testSubscription", 3, false)) + when(mockSubscriberTemplate.pullFuture("testSubscription", Integer.MAX_VALUE, true)) .then(arg -> { assertThat(Thread.currentThread().getName()).startsWith(threadPrefix); - return Arrays.asList(mockMessage, mockMessage, mockMessage); + return AsyncResult.forValue(Arrays.asList(mockMessage, mockMessage, mockMessage)); }); } @@ -146,10 +141,10 @@ private void pollAndVerify(ApplicationContext ctx) { PubSubReactiveFactory reactiveFactory = ctx.getBean(PubSubReactiveFactory.class); StepVerifier.create( - reactiveFactory.poll("testSubscription", 2) - .limitRequest(3)) + reactiveFactory.poll("testSubscription", 2)) .expectNext(mockMessage, mockMessage, mockMessage) - .verifyComplete(); + .thenCancel() + .verify(); } diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java index a06fc1c5d1..9ceab6cc72 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java @@ -16,14 +16,17 @@ package org.springframework.cloud.gcp.pubsub.reactive; +import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; import com.google.api.gax.rpc.DeadlineExceededException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations; @@ -74,106 +77,61 @@ public PubSubReactiveFactory(PubSubSubscriberOperations subscriberOperations, Sc public Flux poll(String subscriptionName, long pollingPeriodMs) { return Flux.create(sink -> { - - Scheduler.Worker subscriptionWorker = this.scheduler.createWorker(); - sink.onRequest((numRequested) -> { if (numRequested == Long.MAX_VALUE) { - // unlimited demand - subscriptionWorker.schedulePeriodically( - new NonBlockingUnlimitedDemandPullTask(subscriptionName, sink), 0, pollingPeriodMs, TimeUnit.MILLISECONDS); + Disposable disposable = infinitePull(subscriptionName, pollingPeriodMs, sink); + sink.onCancel(disposable); } else { - subscriptionWorker.schedule(new BlockingLimitedDemandPullTask(subscriptionName, numRequested, sink)); + backpressurePull(subscriptionName, numRequested, sink); } }); - - sink.onCancel(subscriptionWorker); - }); } - private abstract class PubSubPullTask implements Runnable { - - protected final String subscriptionName; - - protected final FluxSink sink; - - PubSubPullTask(String subscriptionName, FluxSink sink) { - this.subscriptionName = subscriptionName; - this.sink = sink; - } - - /** - * Retrieve up to a specified number of messages, sending them to the subscription. - * @param demand maximum number of messages to retrieve - * @param block whether to wait for the first message to become available - * @return number of messages retrieved - */ - protected int pullToSink(int demand, boolean block) { - - List messages = - PubSubReactiveFactory.this.subscriberOperations.pull(this.subscriptionName, demand, !block); - - if (!this.sink.isCancelled()) { - messages.forEach(sink::next); - } - - return messages.size(); - } - + private Disposable infinitePull(String subscriptionName, long pollingPeriodMs, + FluxSink sink) { + return Flux + .interval(Duration.ZERO, Duration.ofMillis(pollingPeriodMs), scheduler) + .flatMap(ignore -> pullAll(subscriptionName)) + .subscribe(sink::next, sink::error); } - /** - * Runnable task issuing blocking Pub/Sub Pull requests until the specified number of - * messages has been retrieved. - */ - private class BlockingLimitedDemandPullTask extends PubSubPullTask { - - private final long initialDemand; - - BlockingLimitedDemandPullTask(String subscriptionName, long initialDemand, FluxSink sink) { - super(subscriptionName, sink); - this.initialDemand = initialDemand; - } - - @Override - public void run() { - long demand = this.initialDemand; - List messages; - - while (demand > 0 && !this.sink.isCancelled()) { - try { - int intDemand = demand > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) demand; - demand -= pullToSink(intDemand, true); - } - catch (DeadlineExceededException e) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Blocking pull timed out due to empty subscription " - + this.subscriptionName - + "; retrying."); - } - } - } - } + private Flux pullAll(String subscriptionName) { + CompletableFuture> pullResponseFuture = this.subscriberOperations + .pullFuture(subscriptionName, Integer.MAX_VALUE, true).completable(); + return Mono.fromFuture(pullResponseFuture).flatMapMany(Flux::fromIterable); } - /** - * Runnable task issuing a single Pub/Sub Pull request for all available messages. - * Terminates immediately if no messages are available. - */ - private class NonBlockingUnlimitedDemandPullTask extends PubSubPullTask { - - NonBlockingUnlimitedDemandPullTask(String subscriptionName, FluxSink sink) { - super(subscriptionName, sink); - } - - @Override - public void run() { - pullToSink(Integer.MAX_VALUE, false); - } + private void backpressurePull(String subscriptionName, long numRequested, + FluxSink sink) { + int intDemand = numRequested > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numRequested; + this.subscriberOperations.pullFuture(subscriptionName, intDemand, false).addCallback( + messages -> { + if (!sink.isCancelled()) { + messages.forEach(sink::next); + } + if (!sink.isCancelled()) { + long numToPull = numRequested - messages.size(); + if (numToPull > 0) { + backpressurePull(subscriptionName, numToPull, sink); + } + } + }, + exception -> { + if (exception instanceof DeadlineExceededException) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Blocking pull timed out due to empty subscription " + + subscriptionName + + "; retrying."); + } + backpressurePull(subscriptionName, numRequested, sink); + } + else { + sink.error(exception); + } + }); } - } diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java index 884f8220bf..ceb51b3a09 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java @@ -39,13 +39,12 @@ import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations; import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage; +import org.springframework.scheduling.annotation.AsyncResult; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * Tests for streams generated by PubSubReactiveFactory. @@ -81,8 +80,8 @@ public void testSequentialRequests() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations).pull("sub1", 1, false); - methodOrder.verify(this.subscriberOperations).pull("sub1", 3, false); + methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 1, false); + methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 3, false); methodOrder.verifyNoMoreInteractions(); } @@ -98,14 +97,14 @@ public void testSequentialRequestWithInsufficientDemandGetsSplitIntoTwoRequests( .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations).pull("sub1", 4, false); - methodOrder.verify(this.subscriberOperations).pull("sub1", 3, false); + methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 4, false); + methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 3, false); methodOrder.verifyNoMoreInteractions(); } @Test public void testDeadlineExceededCausesRetry() throws InterruptedException { - setUpMessages("throw", "msg1", "msg2"); + setUpMessages("timeout", "msg1", "msg2"); StepVerifier.withVirtualTime(() -> factory.poll("sub1", 10).map(this::messageToString), 2) .expectSubscription() @@ -115,7 +114,23 @@ public void testDeadlineExceededCausesRetry() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations, times(2)).pull("sub1", 2, false); + methodOrder.verify(this.subscriberOperations, times(2)).pullFuture("sub1", 2, false); + methodOrder.verifyNoMoreInteractions(); + } + + @Test + public void testExceptionThrownByPubSubClientResultingInErrorStream() throws InterruptedException { + setUpMessages("msg1", "msg2", "throw"); + + StepVerifier.withVirtualTime(() -> factory.poll("sub1", 10).map(this::messageToString), 2) + .expectSubscription() + .expectNext("msg1", "msg2") + .thenRequest(2) + .expectError(RuntimeException.class) + .verify(); + + InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); + methodOrder.verify(this.subscriberOperations, times(2)).pullFuture("sub1", 2, false); methodOrder.verifyNoMoreInteractions(); } @@ -134,7 +149,23 @@ public void testUnlimitedDemand() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations, times(3)).pull("sub1", Integer.MAX_VALUE, true); + methodOrder.verify(this.subscriberOperations, times(3)).pullFuture("sub1", Integer.MAX_VALUE, true); + methodOrder.verifyNoMoreInteractions(); + } + + @Test + public void testUnlimitedDemandWithException() throws InterruptedException { + setUpMessages("msg1", "msg2", "stop", "throw"); + + StepVerifier.withVirtualTime(() -> factory.poll("sub1", 10).map(this::messageToString)) + .expectSubscription() + .expectNext("msg1", "msg2") + .expectNoEvent(Duration.ofMillis(10)) + .expectError(RuntimeException.class) + .verify(); + + InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); + methodOrder.verify(this.subscriberOperations, times(2)).pullFuture("sub1", Integer.MAX_VALUE, true); methodOrder.verifyNoMoreInteractions(); } @@ -152,7 +183,7 @@ private String messageToString(AcknowledgeablePubsubMessage message) { private void setUpMessages(String... messages) { List msgList = new ArrayList<>(Arrays.asList(messages)); - when(subscriberOperations.pull(eq("sub1"), any(Integer.class), any(Boolean.class))).then(invocationOnMock -> { + when(subscriberOperations.pullFuture(eq("sub1"), any(Integer.class), any(Boolean.class))).then(invocationOnMock -> { List result = new ArrayList<>(); for (int i = 0; i < (Integer) invocationOnMock.getArgument(1); i++) { if (msgList.isEmpty()) { @@ -162,12 +193,14 @@ private void setUpMessages(String... messages) { String nextPayload = msgList.remove(0); switch (nextPayload) { case "stop": - return result; - case "throw": + return AsyncResult.forValue(result); + case "timeout": if (!result.isEmpty()) { fail("Bad setup -- 'throw' should be the first event in batch"); } - throw new DeadlineExceededException("this is a noop", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); + return AsyncResult.forExecutionException(new DeadlineExceededException("this is a noop", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true)); + case "throw": + return AsyncResult.forExecutionException(new RuntimeException("exception during pull of messages")); } AcknowledgeablePubsubMessage msg = mock(AcknowledgeablePubsubMessage.class); @@ -177,7 +210,7 @@ private void setUpMessages(String... messages) { when(msg.getPubsubMessage()).thenReturn(pubsubMessage); result.add(msg); } - return result; + return AsyncResult.forValue(result); }); } From bd4247507e38df57128e3974bdb46808b6604e4e Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Mon, 2 Mar 2020 10:46:58 +0100 Subject: [PATCH 03/26] Added unit test for the pullFuture method of PubSubSubscriberTemplate --- .../PubSubSubscriberTemplateTests.java | 86 +++++++++++++------ 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 793ea334de..28ff7ee655 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -20,11 +20,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.function.Consumer; import com.google.api.core.ApiFuture; @@ -34,12 +30,7 @@ import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.protobuf.Empty; -import com.google.pubsub.v1.AcknowledgeRequest; -import com.google.pubsub.v1.ModifyAckDeadlineRequest; -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.PullRequest; -import com.google.pubsub.v1.PullResponse; -import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.*; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,14 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * Unit tests for {@link PubSubSubscriberTemplate}. @@ -124,15 +108,19 @@ public class PubSubSubscriberTemplateTests { private UnaryCallable modifyAckDeadlineCallable; @Mock - private ApiFuture apiFuture; + private ApiFuture pullApiFuture; + + @Mock + private ApiFuture ackApiFuture; @Before - public void setUp() { + public void setUp() throws ExecutionException, InterruptedException { reset(this.subscriberFactory); reset(this.subscriberStub); reset(this.subscriber); reset(this.messageReceiver); - reset(this.apiFuture); + reset(this.pullApiFuture); + reset(this.ackApiFuture); when(this.subscriberFactory.getProjectId()).thenReturn("testProject"); @@ -158,21 +146,33 @@ public void setUp() { when(this.subscriberStub.acknowledgeCallable()).thenReturn(this.ackCallable); when(this.subscriberStub.modifyAckDeadlineCallable()).thenReturn(this.modifyAckDeadlineCallable); - when(this.ackCallable.futureCall(any(AcknowledgeRequest.class))).thenReturn(this.apiFuture); + when(this.ackCallable.futureCall(any(AcknowledgeRequest.class))).thenReturn(this.ackApiFuture); - when(this.modifyAckDeadlineCallable.futureCall(any(ModifyAckDeadlineRequest.class))).thenReturn(this.apiFuture); + when(this.modifyAckDeadlineCallable.futureCall(any(ModifyAckDeadlineRequest.class))).thenReturn(this.ackApiFuture); doAnswer((invocation) -> { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(this.apiFuture).addListener(any(Runnable.class), any(Executor.class)); + }).when(this.ackApiFuture).addListener(any(Runnable.class), any(Executor.class)); - when(this.apiFuture.isDone()).thenReturn(true); + when(this.ackApiFuture.isDone()).thenReturn(true); doNothing().when(this.ackReplyConsumer).ack(); doNothing().when(this.ackReplyConsumer).nack(); + // for pull future + when(this.pullCallable.futureCall(any(PullRequest.class))).thenReturn(this.pullApiFuture); + + doAnswer((invocation) -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(this.pullApiFuture).addListener(any(Runnable.class), any(Executor.class)); + when(this.pullApiFuture.isDone()).thenReturn(true); + when(this.pullApiFuture.get()).thenReturn(PullResponse.newBuilder() + .addReceivedMessages(ReceivedMessage.newBuilder().setMessage(this.pubsubMessage).build()).build()); + // create objects under test when(this.subscriberFactory.createSubscriberStub()).thenReturn(this.subscriberStub); when(this.subscriberStub.pullCallable()).thenReturn(this.pullCallable); @@ -369,7 +369,7 @@ public void testPull_AndManualMultiSubscriptionAck() assertThat(listenableFuture.isDone()).isTrue(); assertThat(testListenableFutureCallback.getThrowable()).isNull(); verify(this.ackCallable, times(2)).futureCall(any(AcknowledgeRequest.class)); - verify(this.apiFuture, times(2)).addListener(any(), same(mockExecutor)); + verify(this.ackApiFuture, times(2)).addListener(any(), same(mockExecutor)); } @Test @@ -410,6 +410,38 @@ public void testPullAndConvert() { assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub2"); } + @Test + public void testPullFuture_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { + + ListenableFuture> pullListenableFuture = this.pubSubSubscriberTemplate + .pullFuture("sub", 1, true); + + List result = pullListenableFuture.get(10L, TimeUnit.SECONDS); + + assertThat(pullListenableFuture.isDone()).isTrue(); + + assertThat(result.size()).isEqualTo(1); + assertThat(result.get(0).getPubsubMessage()).isSameAs(this.pubsubMessage); + assertThat(result.get(0).getProjectSubscriptionName().getProject()).isEqualTo("testProject"); + assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub"); + + AcknowledgeablePubsubMessage acknowledgeablePubsubMessage = result.get(0); + assertThat(acknowledgeablePubsubMessage.getAckId()).isNotNull(); + + TestListenableFutureCallback ackTestListenableFutureCallback = new TestListenableFutureCallback(); + + ListenableFuture ackListenableFuture = this.pubSubSubscriberTemplate.ack(result); + + assertThat(ackListenableFuture).isNotNull(); + + ackListenableFuture.addCallback(ackTestListenableFutureCallback); + ackListenableFuture.get(10L, TimeUnit.SECONDS); + + assertThat(ackListenableFuture.isDone()).isTrue(); + + assertThat(ackTestListenableFutureCallback.getThrowable()).isNull(); + } + private class TestListenableFutureCallback implements ListenableFutureCallback { private Throwable throwable; From 8e7e9d480fc0da1ff0f5f68d13ead378247f07b7 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Mon, 2 Mar 2020 10:54:07 +0100 Subject: [PATCH 04/26] Bumped copyright year to 2020, for those files this change touches --- .../pubsub/GcpPubSubReactiveAutoConfiguration.java | 2 +- .../pubsub/GcpPubSubReactiveAutoConfigurationTest.java | 2 +- .../springframework/cloud/gcp/pubsub/core/PubSubTemplate.java | 2 +- .../gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java | 2 +- .../gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java | 2 +- .../cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java | 2 +- .../pubsub/core/subscriber/PubSubSubscriberTemplateTests.java | 2 +- .../cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java index 6408121c5b..cfec35a623 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java index cfe08bae73..bd8e17e15e 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index 4a0735fc4c..0a5811b1f9 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2018 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index 4c47d4a628..8e780157c1 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2018 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 2aa500f86c..e4a8a12f22 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java index 9ceab6cc72..f50077b3bb 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 28ff7ee655..4fb4afb28e 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java index ceb51b3a09..afb8fa36c6 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 7c2a162571457c3df186437dff02a3e613d9f9b8 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Mon, 2 Mar 2020 12:02:10 +0100 Subject: [PATCH 05/26] Fixed imports according to code style --- .../subscriber/PubSubSubscriberTemplate.java | 7 +++++- .../PubSubSubscriberTemplateTests.java | 22 ++++++++++++++++--- .../reactive/PubSubReactiveFactoryTests.java | 4 +++- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index e4a8a12f22..ebca638901 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -37,7 +37,12 @@ import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Empty; -import com.google.pubsub.v1.*; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; import org.springframework.beans.factory.DisposableBean; import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage; diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 4fb4afb28e..91d21bbca3 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -20,7 +20,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import com.google.api.core.ApiFuture; @@ -30,7 +34,12 @@ import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.protobuf.Empty; -import com.google.pubsub.v1.*; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,7 +61,14 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Unit tests for {@link PubSubSubscriberTemplate}. diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java index afb8fa36c6..dbe19a4c98 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java @@ -44,7 +44,9 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; /** * Tests for streams generated by PubSubReactiveFactory. From 12554df1578c3d1433a8b22ba4f620e353570454 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 3 Mar 2020 15:36:40 +0100 Subject: [PATCH 06/26] Renamed `PubSubSubscriberOperations.pullFuture` to `PubSubSubscriberOperations.pullAsync` --- ...GcpPubSubReactiveAutoConfigurationTest.java | 2 +- .../cloud/gcp/pubsub/core/PubSubTemplate.java | 6 +++--- .../subscriber/PubSubSubscriberOperations.java | 4 ++-- .../subscriber/PubSubSubscriberTemplate.java | 6 +++--- .../pubsub/reactive/PubSubReactiveFactory.java | 4 ++-- .../PubSubSubscriberTemplateTests.java | 2 +- .../reactive/PubSubReactiveFactoryTests.java | 18 +++++++++--------- 7 files changed, 21 insertions(+), 21 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java index bd8e17e15e..f505024ad1 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java @@ -129,7 +129,7 @@ public void customSchedulerUsedWhenAvailable() { } private void setUpThreadPrefixVerification(String threadPrefix) { - when(mockSubscriberTemplate.pullFuture("testSubscription", Integer.MAX_VALUE, true)) + when(mockSubscriberTemplate.pullAsync("testSubscription", Integer.MAX_VALUE, true)) .then(arg -> { assertThat(Thread.currentThread().getName()).startsWith(threadPrefix); diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index 0a5811b1f9..5f37a938e3 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -159,9 +159,9 @@ public List pull(String subscription, Integer maxM } @Override - public ListenableFuture> pullFuture(String subscription, Integer maxMessages, - Boolean returnImmediately) { - return this.pubSubSubscriberTemplate.pullFuture(subscription, maxMessages, returnImmediately); + public ListenableFuture> pullAsync(String subscription, Integer maxMessages, + Boolean returnImmediately) { + return this.pubSubSubscriberTemplate.pullAsync(subscription, maxMessages, returnImmediately); } @Override diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index 8e780157c1..5d29ca6d4f 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -104,7 +104,7 @@ Subscriber subscribeAndConvert(String subscription, List pull(String subscription, Integer maxMessages, Boolean returnImmediately); /** - * Pull a number of messages from a Google Cloud Pub/Sub subscription. + * Asynchronously pull a number of messages from a Google Cloud Pub/Sub subscription. * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified * subscription name in the {@code projects//subscriptions/} format * @param maxMessages the maximum number of pulled messages @@ -114,7 +114,7 @@ Subscriber subscribeAndConvert(String subscription, * received acknowledgeable messages * @since 1.3 */ - ListenableFuture> pullFuture(String subscription, Integer maxMessages, Boolean returnImmediately); + ListenableFuture> pullAsync(String subscription, Integer maxMessages, Boolean returnImmediately); /** * Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index ebca638901..d7c4bb1290 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -183,7 +183,7 @@ private List pull(PullRequest pullRequest) { .collect(Collectors.toList()); } - private ListenableFuture> pullFuture(PullRequest pullRequest) { + private ListenableFuture> pullAsync(PullRequest pullRequest) { Assert.notNull(pullRequest, "The pull request can't be null."); ApiFuture pullFuture = this.subscriberStub.pullCallable().futureCall(pullRequest); @@ -223,8 +223,8 @@ public List pull( } @Override - public ListenableFuture> pullFuture(String subscription, Integer maxMessages, Boolean returnImmediately) { - return pullFuture(this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately)); + public ListenableFuture> pullAsync(String subscription, Integer maxMessages, Boolean returnImmediately) { + return pullAsync(this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately)); } @Override diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java index fbdb706516..2e4149c544 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java @@ -102,7 +102,7 @@ private Disposable infinitePull(String subscriptionName, long pollingPeriodMs, private Flux pullAll(String subscriptionName) { CompletableFuture> pullResponseFuture = this.subscriberOperations - .pullFuture(subscriptionName, Integer.MAX_VALUE, true).completable(); + .pullAsync(subscriptionName, Integer.MAX_VALUE, true).completable(); return Mono.fromFuture(pullResponseFuture).flatMapMany(Flux::fromIterable); } @@ -110,7 +110,7 @@ private Flux pullAll(String subscriptionName) { private void backpressurePull(String subscriptionName, long numRequested, FluxSink sink) { int intDemand = numRequested > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numRequested; - this.subscriberOperations.pullFuture(subscriptionName, intDemand, false).addCallback( + this.subscriberOperations.pullAsync(subscriptionName, intDemand, false).addCallback( messages -> { if (!sink.isCancelled()) { messages.forEach(sink::next); diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 91d21bbca3..be4ebf091b 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -430,7 +430,7 @@ public void testPullAndConvert() { public void testPullFuture_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { ListenableFuture> pullListenableFuture = this.pubSubSubscriberTemplate - .pullFuture("sub", 1, true); + .pullAsync("sub", 1, true); List result = pullListenableFuture.get(10L, TimeUnit.SECONDS); diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java index b951b6d42d..fc779e17cd 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java @@ -82,8 +82,8 @@ public void testSequentialRequests() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 1, false); - methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 3, false); + methodOrder.verify(this.subscriberOperations).pullAsync("sub1", 1, false); + methodOrder.verify(this.subscriberOperations).pullAsync("sub1", 3, false); methodOrder.verifyNoMoreInteractions(); } @@ -99,8 +99,8 @@ public void testSequentialRequestWithInsufficientDemandGetsSplitIntoTwoRequests( .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 4, false); - methodOrder.verify(this.subscriberOperations).pullFuture("sub1", 3, false); + methodOrder.verify(this.subscriberOperations).pullAsync("sub1", 4, false); + methodOrder.verify(this.subscriberOperations).pullAsync("sub1", 3, false); methodOrder.verifyNoMoreInteractions(); } @@ -116,7 +116,7 @@ public void testDeadlineExceededCausesRetry() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations, times(2)).pullFuture("sub1", 2, false); + methodOrder.verify(this.subscriberOperations, times(2)).pullAsync("sub1", 2, false); methodOrder.verifyNoMoreInteractions(); } @@ -132,7 +132,7 @@ public void testExceptionThrownByPubSubClientResultingInErrorStream() throws Int .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations, times(2)).pullFuture("sub1", 2, false); + methodOrder.verify(this.subscriberOperations, times(2)).pullAsync("sub1", 2, false); methodOrder.verifyNoMoreInteractions(); } @@ -151,7 +151,7 @@ public void testUnlimitedDemand() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations, times(3)).pullFuture("sub1", Integer.MAX_VALUE, true); + methodOrder.verify(this.subscriberOperations, times(3)).pullAsync("sub1", Integer.MAX_VALUE, true); methodOrder.verifyNoMoreInteractions(); } @@ -167,7 +167,7 @@ public void testUnlimitedDemandWithException() throws InterruptedException { .verify(); InOrder methodOrder = Mockito.inOrder(this.subscriberOperations); - methodOrder.verify(this.subscriberOperations, times(2)).pullFuture("sub1", Integer.MAX_VALUE, true); + methodOrder.verify(this.subscriberOperations, times(2)).pullAsync("sub1", Integer.MAX_VALUE, true); methodOrder.verifyNoMoreInteractions(); } @@ -186,7 +186,7 @@ private String messageToString(AcknowledgeablePubsubMessage message) { private void setUpMessages(String... messages) { List msgList = new ArrayList<>(Arrays.asList(messages)); - when(subscriberOperations.pullFuture(eq("sub1"), any(Integer.class), any(Boolean.class))).then(invocationOnMock -> { + when(subscriberOperations.pullAsync(eq("sub1"), any(Integer.class), any(Boolean.class))).then(invocationOnMock -> { List result = new ArrayList<>(); for (int i = 0; i < (Integer) invocationOnMock.getArgument(1); i++) { if (msgList.isEmpty()) { From e43a90a84cccb93bc086fc22f311d24fe85a4a23 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 08:37:30 +0100 Subject: [PATCH 07/26] Made the executor for the async pull configurable, and added javadoc to its new setter and the class. Also added some javadoc to other setter methods that had no javadoc yet. --- .../checkstyle/checkstyle-suppressions.xml | 9 ---- .../subscriber/PubSubSubscriberTemplate.java | 48 ++++++++++++++++++- 2 files changed, 47 insertions(+), 10 deletions(-) delete mode 100644 spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml diff --git a/spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml b/spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml deleted file mode 100644 index f3f859eeb8..0000000000 --- a/spring-cloud-gcp-pubsub/src/checkstyle/checkstyle-suppressions.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index d7c4bb1290..3390e47b2a 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -44,6 +44,7 @@ import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; import org.springframework.beans.factory.DisposableBean; import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage; import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage; @@ -65,6 +66,12 @@ * * A custom {@link Executor} can be injected to control per-subscription batch * parallelization in acknowledgement and deadline operations. + * By default, this is a single thread executor, + * created per instance of the {@link PubSubSubscriberOperations}. + * + * A custom {@link Executor} can be injected to control the threads that process + * the responses of the asynchronous pull callback operations. + * By default, this is executed on the same thread that executes the callback. * * @author Vinicius Carvalho * @author João André Martins @@ -72,6 +79,7 @@ * @author Chengyuan Zhao * @author Doug Hoard * @author Elena Felder + * @author Maurice Zeijen * * @since 1.1 */ @@ -88,6 +96,8 @@ public class PubSubSubscriberTemplate private Executor ackExecutor = this.defaultAckExecutor; + private Executor asyncPullExecutor = Runnable::run; + /** * Default {@link PubSubSubscriberTemplate} constructor. * @@ -101,21 +111,48 @@ public PubSubSubscriberTemplate(SubscriberFactory subscriberFactory) { this.subscriberStub = this.subscriberFactory.createSubscriberStub(); } + /** + * Get the converter used to convert a message payload to the desired type. + * + * @return the converter to set + */ public PubSubMessageConverter getMessageConverter() { return this.pubSubMessageConverter; } + /** + * Set the converter used to convert a message payload to the desired type. + * + * @param pubSubMessageConverter the converter to set + */ public void setMessageConverter(PubSubMessageConverter pubSubMessageConverter) { Assert.notNull(pubSubMessageConverter, "The pubSubMessageConverter can't be null."); this.pubSubMessageConverter = pubSubMessageConverter; } + /** + * Sets the {@link Executor} to control per-subscription batch + * parallelization in acknowledgement and deadline operations. + * + * @param ackExecutor the executor to set + */ public void setAckExecutor(Executor ackExecutor) { Assert.notNull(ackExecutor, "ackExecutor can't be null."); this.ackExecutor = ackExecutor; } + /** + * Sets a custom {@link Executor} can be injected to control the threads that process + * the responses of the asynchronous pull callback operations. + * + * @param asyncPullExecutor the executor to set + */ + public void setAsyncPullExecutor(Executor asyncPullExecutor) { + Assert.notNull(asyncPullExecutor, "asyncPullExecutor can't be null."); + this.asyncPullExecutor = asyncPullExecutor; + } + @Override @Deprecated public Subscriber subscribe(String subscription, MessageReceiver messageReceiver) { @@ -183,6 +220,15 @@ private List pull(PullRequest pullRequest) { .collect(Collectors.toList()); } + + /** + * Pulls messages asynchronously, on demand, using the pull request in argument. + * + * @param pullRequest pull request containing the subscription name + * @return the ListenableFuture for the asynchronous execution, returning + * the list of {@link AcknowledgeablePubsubMessage} containing the ack ID, subscription + * and acknowledger + */ private ListenableFuture> pullAsync(PullRequest pullRequest) { Assert.notNull(pullRequest, "The pull request can't be null."); @@ -210,7 +256,7 @@ public void onSuccess(PullResponse pullResponse) { settableFuture.set(result); } - }, MoreExecutors.directExecutor()); + }, asyncPullExecutor); return settableFuture; } From 418b5fc068dd6008e13b5f4de82dda58a349330c Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 08:38:20 +0100 Subject: [PATCH 08/26] Deduplicated code that creates a list of PulledAcknowledgeablePubsubMessage --- .../subscriber/PubSubSubscriberTemplate.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 3390e47b2a..6f935cd42f 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -211,13 +211,9 @@ private List pull(PullRequest pullRequest) { Assert.notNull(pullRequest, "The pull request can't be null."); PullResponse pullResponse = this.subscriberStub.pullCallable().call(pullRequest); - return pullResponse.getReceivedMessagesList().stream() - .map((message) -> new PulledAcknowledgeablePubsubMessage( - PubSubSubscriptionUtils.toProjectSubscriptionName(pullRequest.getSubscription(), - this.subscriberFactory.getProjectId()), - message.getMessage(), - message.getAckId())) - .collect(Collectors.toList()); + return toAcknowledgeablePubsubMessageList( + pullResponse.getReceivedMessagesList(), this.subscriberFactory.getProjectId(), + pullRequest.getSubscription()); } @@ -246,12 +242,8 @@ public void onFailure(Throwable throwable) { @Override public void onSuccess(PullResponse pullResponse) { - List result = pullResponse.getReceivedMessagesList().stream() - .map((message) -> new PulledAcknowledgeablePubsubMessage( - PubSubSubscriptionUtils.toProjectSubscriptionName(pullRequest.getSubscription(), projectId), - message.getMessage(), - message.getAckId())) - .collect(Collectors.toList()); + List result = toAcknowledgeablePubsubMessageList( + pullResponse.getReceivedMessagesList(), projectId, pullRequest.getSubscription()); settableFuture.set(result); } @@ -260,6 +252,17 @@ public void onSuccess(PullResponse pullResponse) { return settableFuture; } + + private List toAcknowledgeablePubsubMessageList(List messages, + String projectId, String subscriptionId) { + return messages.stream() + .map((message) -> new PulledAcknowledgeablePubsubMessage( + PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionId, + projectId), + message.getMessage(), + message.getAckId())) + .collect(Collectors.toList()); + } @Override public List pull( From a4b9b0707eb5f5e23154cee7d07ed84c5afe1895 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 09:14:59 +0100 Subject: [PATCH 09/26] Added and implemented PubSubSubscriberOperations.pullAndAckAsync --- .../cloud/gcp/pubsub/core/PubSubTemplate.java | 5 +++ .../PubSubSubscriberOperations.java | 14 +++++++ .../subscriber/PubSubSubscriberTemplate.java | 33 ++++++++++++++++- .../PubSubSubscriberTemplateTests.java | 37 +++++++++++++++++-- 4 files changed, 84 insertions(+), 5 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index 5f37a938e3..a6c4c7404e 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -176,6 +176,11 @@ public List pullAndAck(String subscription, Integer maxMessages, return this.pubSubSubscriberTemplate.pullAndAck(subscription, maxMessages, returnImmediately); } + @Override + public ListenableFuture> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately) { + return this.pubSubSubscriberTemplate.pullAndAckAsync(subscription, maxMessages, returnImmediately); + } + @Override public PubsubMessage pullNext(String subscription) { return this.pubSubSubscriberTemplate.pullNext(subscription); diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index 5d29ca6d4f..5da6f92180 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -38,6 +38,7 @@ * @author Mike Eltsufin * @author Chengyuan Zhao * @author Doug Hoard + * @author Maurice Zeijen * * @since 1.1 */ @@ -92,6 +93,19 @@ Subscriber subscribeAndConvert(String subscription, */ List pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately); + /** + * Asynchronously pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription. + * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified + * subscription name in the {@code projects//subscriptions/} format + * @param maxMessages the maximum number of pulled messages + * @param returnImmediately returns immediately even if subscription doesn't contain enough + * messages to satisfy {@code maxMessages} + * @return the ListenableFuture for the asynchronous execution, returning the list of + * received acknowledgeable messages + * @since 1.3 + */ + ListenableFuture> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately); + /** * Pull a number of messages from a Google Cloud Pub/Sub subscription. * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 6f935cd42f..54891f7f3a 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -216,7 +216,6 @@ private List pull(PullRequest pullRequest) { pullRequest.getSubscription()); } - /** * Pulls messages asynchronously, on demand, using the pull request in argument. * @@ -301,7 +300,7 @@ public List pullAndAck(String subscription, Integer maxMessages, List ackableMessages = pull(pullRequest); - if (ackableMessages.size() > 0) { + if (!ackableMessages.isEmpty()) { ack(ackableMessages); } @@ -309,6 +308,36 @@ public List pullAndAck(String subscription, Integer maxMessages, .collect(Collectors.toList()); } + @Override + public ListenableFuture> pullAndAckAsync(String subscription, Integer maxMessages, + Boolean returnImmediately) { + Assert.hasText(subscription, "The subscription can't be null or empty."); + + if (maxMessages != null) { + Assert.isTrue(maxMessages > 0, "The maxMessages must be greater than 0."); + } + + PullRequest pullRequest = this.subscriberFactory.createPullRequest( + subscription, maxMessages, returnImmediately); + + final SettableListenableFuture> settableFuture = new SettableListenableFuture<>(); + + pullAsync(pullRequest).addCallback( + ackableMessages -> { + if (!ackableMessages.isEmpty()) { + ack(ackableMessages); + } + List messages = ackableMessages.stream() + .map(AcknowledgeablePubsubMessage::getPubsubMessage) + .collect(Collectors.toList()); + + settableFuture.set(messages); + }, + settableFuture::setException); + + return settableFuture; + } + @Override public PubsubMessage pullNext(String subscription) { List receivedMessageList = pullAndAck(subscription, 1, true); diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index be4ebf091b..989ee25d4d 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -413,6 +413,37 @@ public void testPullAndAck_NoMessages() { verify(this.pubSubSubscriberTemplate, never()).ack(any()); } + @Test + public void testPullAndAckAsync() throws InterruptedException, ExecutionException, TimeoutException { + ListenableFuture> asyncResult = this.pubSubSubscriberTemplate.pullAndAckAsync( + "sub2", 1, true); + + List result = asyncResult.get(10L, TimeUnit.SECONDS); + assertThat(asyncResult.isDone()).isTrue(); + + assertThat(result.size()).isEqualTo(1); + + PubsubMessage pubsubMessage = result.get(0); + assertThat(pubsubMessage).isSameAs(this.pubsubMessage); + + verify(this.pubSubSubscriberTemplate, times(1)).ack(any()); + } + + @Test + public void testPullAndAckAsync_NoMessages() throws InterruptedException, ExecutionException, TimeoutException { + when(this.pullApiFuture.get()).thenReturn(PullResponse.newBuilder().build()); + + ListenableFuture> asyncResult = this.pubSubSubscriberTemplate.pullAndAckAsync( + "sub2", 1, true); + + List result = asyncResult.get(10L, TimeUnit.SECONDS); + assertThat(asyncResult.isDone()).isTrue(); + + assertThat(result.size()).isEqualTo(0); + + verify(this.pubSubSubscriberTemplate, never()).ack(any()); + } + @Test public void testPullAndConvert() { List> result = this.pubSubSubscriberTemplate.pullAndConvert( @@ -429,12 +460,12 @@ public void testPullAndConvert() { @Test public void testPullFuture_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { - ListenableFuture> pullListenableFuture = this.pubSubSubscriberTemplate + ListenableFuture> asyncResult = this.pubSubSubscriberTemplate .pullAsync("sub", 1, true); - List result = pullListenableFuture.get(10L, TimeUnit.SECONDS); + List result = asyncResult.get(10L, TimeUnit.SECONDS); - assertThat(pullListenableFuture.isDone()).isTrue(); + assertThat(asyncResult.isDone()).isTrue(); assertThat(result.size()).isEqualTo(1); assertThat(result.get(0).getPubsubMessage()).isSameAs(this.pubsubMessage); From c07031ab63e0176a2514d41f42f941c99ce44cb8 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 09:29:23 +0100 Subject: [PATCH 10/26] Corrected test name after pullFuture changed into pullAsync --- .../pubsub/core/subscriber/PubSubSubscriberTemplateTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 989ee25d4d..d08cc24c1c 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -458,7 +458,7 @@ public void testPullAndConvert() { } @Test - public void testPullFuture_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { + public void testPullAsync_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { ListenableFuture> asyncResult = this.pubSubSubscriberTemplate .pullAsync("sub", 1, true); From 20386a835043cd43ad4526be785cb44ab447231d Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 09:31:36 +0100 Subject: [PATCH 11/26] Added and implemented PubSubSubscriberOperations.pullAndConvertAsync --- .../cloud/gcp/pubsub/core/PubSubTemplate.java | 6 ++++++ .../PubSubSubscriberOperations.java | 16 +++++++++++++++ .../subscriber/PubSubSubscriberTemplate.java | 20 +++++++++++++++++-- .../PubSubSubscriberTemplateTests.java | 17 ++++++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index a6c4c7404e..9cf5460dc0 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -170,6 +170,12 @@ public List> pullAndConvert(String return this.pubSubSubscriberTemplate.pullAndConvert(subscription, maxMessages, returnImmediately, payloadType); } + @Override + public ListenableFuture>> pullAndConvertAsync(String subscription, + Integer maxMessages, Boolean returnImmediately, Class payloadType) { + return this.pullAndConvertAsync(subscription, maxMessages, returnImmediately, payloadType); + } + @Override public List pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately) { diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index 5da6f92180..8c165d6339 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -146,6 +146,22 @@ Subscriber subscribeAndConvert(String subscription, List> pullAndConvert(String subscription, Integer maxMessages, Boolean returnImmediately, Class payloadType); + /** + * Asynchronously pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with + * the desired payload type. + * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified + * subscription name in the {@code projects//subscriptions/} format + * @param maxMessages the maximum number of pulled messages + * @param returnImmediately returns immediately even if subscription doesn't contain enough + * messages to satisfy {@code maxMessages} + * @param payloadType the type to which the payload of the Pub/Sub messages should be converted + * @param the type of the payload + * @return the ListenableFuture for the asynchronous execution, returning the list of + * received acknowledgeable messages + * @since 1.3 + */ + ListenableFuture>> pullAndConvertAsync(String subscription, + Integer maxMessages, Boolean returnImmediately, Class payloadType); /** * Pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription. diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 54891f7f3a..1559e2864a 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -35,7 +35,6 @@ import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.stub.SubscriberStub; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.ModifyAckDeadlineRequest; @@ -280,6 +279,23 @@ public List> pullAndConvert(String Boolean returnImmediately, Class payloadType) { List ackableMessages = this.pull(subscription, maxMessages, returnImmediately); + return this.toConvertedAcknowledgeablePubsubMessages(payloadType, ackableMessages); + } + + @Override + public ListenableFuture>> pullAndConvertAsync(String subscription, + Integer maxMessages, Boolean returnImmediately, Class payloadType) { + final SettableListenableFuture>> settableFuture = new SettableListenableFuture<>(); + + this.pullAsync(subscription, maxMessages, returnImmediately).addCallback( + ackableMessages -> settableFuture + .set(this.toConvertedAcknowledgeablePubsubMessages(payloadType, ackableMessages)), + settableFuture::setException); + + return settableFuture; + } + + private List> toConvertedAcknowledgeablePubsubMessages(Class payloadType, List ackableMessages) { return ackableMessages.stream().map( (m) -> new ConvertedPulledAcknowledgeablePubsubMessage<>(m, this.pubSubMessageConverter.fromPubSubMessage(m.getPubsubMessage(), payloadType)) @@ -322,7 +338,7 @@ public ListenableFuture> pullAndAckAsync(String subscription final SettableListenableFuture> settableFuture = new SettableListenableFuture<>(); - pullAsync(pullRequest).addCallback( + this.pullAsync(pullRequest).addCallback( ackableMessages -> { if (!ackableMessages.isEmpty()) { ack(ackableMessages); diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index d08cc24c1c..095c723134 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -457,6 +457,23 @@ public void testPullAndConvert() { assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub2"); } + @Test + public void testPullAndConvertAsync() throws InterruptedException, ExecutionException, TimeoutException { + ListenableFuture>> asyncResult = this.pubSubSubscriberTemplate.pullAndConvertAsync( + "sub2", 1, true, BigInteger.class); + + List> result = asyncResult.get(10L, TimeUnit.SECONDS); + assertThat(asyncResult.isDone()).isTrue(); + + verify(this.messageConverter).fromPubSubMessage(this.pubsubMessage, BigInteger.class); + + assertThat(result.size()).isEqualTo(1); + assertThat(result.get(0).getPubsubMessage()).isSameAs(this.pubsubMessage); + assertThat(result.get(0).getProjectSubscriptionName().getProject()).isEqualTo("testProject"); + assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub2"); + } + + @Test public void testPullAsync_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { From 92cf4698045903ca6d9e18f8aad24e75c27a5285 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 10:14:16 +0100 Subject: [PATCH 12/26] Added and implemented PubSubSubscriberOperations.pullNextAsync. Also added tests for PubSubSubscriberOperations.pullNext, because they where missing. --- .../cloud/gcp/pubsub/core/PubSubTemplate.java | 5 ++ .../PubSubSubscriberOperations.java | 10 +++ .../subscriber/PubSubSubscriberTemplate.java | 16 +++- .../PubSubSubscriberTemplateTests.java | 90 +++++++++++++++---- 4 files changed, 102 insertions(+), 19 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index 9cf5460dc0..b71a6165d3 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -192,6 +192,11 @@ public PubsubMessage pullNext(String subscription) { return this.pubSubSubscriberTemplate.pullNext(subscription); } + @Override + public ListenableFuture pullNextAsync(String subscription) { + return this.pubSubSubscriberTemplate.pullNextAsync(subscription); + } + @Override public void afterPropertiesSet() throws Exception { } diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index 8c165d6339..4e3c3abbbc 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -171,6 +171,16 @@ ListenableFuture>> pullAndConv */ PubsubMessage pullNext(String subscription); + /** + * Asynchronously pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription. + * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified + * subscription name in the {@code projects//subscriptions/} format + * @return the ListenableFuture for the asynchronous execution, returning a received message, + * or {@code null} if none exists in the subscription + * @since 1.3 + */ + ListenableFuture pullNextAsync(String subscription); + /** * Acknowledge a batch of messages. The messages must have the same project id. * @param acknowledgeablePubsubMessages messages to be acknowledged diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 1559e2864a..f33bf6f6d6 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -358,7 +358,21 @@ public ListenableFuture> pullAndAckAsync(String subscription public PubsubMessage pullNext(String subscription) { List receivedMessageList = pullAndAck(subscription, 1, true); - return (receivedMessageList.size() > 0) ? receivedMessageList.get(0) : null; + return receivedMessageList.isEmpty() ? null: receivedMessageList.get(0); + } + + @Override + public ListenableFuture pullNextAsync(String subscription) { + final SettableListenableFuture settableFuture = new SettableListenableFuture<>(); + + this.pullAndAckAsync(subscription, 1, true).addCallback( + messages -> { + PubsubMessage message = messages.isEmpty() ? null : messages.get(0); + settableFuture.set(message); + }, + settableFuture::setException); + + return settableFuture; } public SubscriberFactory getSubscriberFactory() { diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 095c723134..4db0f2e212 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -60,6 +60,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -68,6 +69,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; /** @@ -388,6 +391,38 @@ public void testPull_AndManualMultiSubscriptionAck() verify(this.ackApiFuture, times(2)).addListener(any(), same(mockExecutor)); } + @Test + public void testPullAsync_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { + + ListenableFuture> asyncResult = this.pubSubSubscriberTemplate + .pullAsync("sub", 1, true); + + List result = asyncResult.get(10L, TimeUnit.SECONDS); + + assertThat(asyncResult.isDone()).isTrue(); + + assertThat(result.size()).isEqualTo(1); + assertThat(result.get(0).getPubsubMessage()).isSameAs(this.pubsubMessage); + assertThat(result.get(0).getProjectSubscriptionName().getProject()).isEqualTo("testProject"); + assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub"); + + AcknowledgeablePubsubMessage acknowledgeablePubsubMessage = result.get(0); + assertThat(acknowledgeablePubsubMessage.getAckId()).isNotNull(); + + TestListenableFutureCallback ackTestListenableFutureCallback = new TestListenableFutureCallback(); + + ListenableFuture ackListenableFuture = this.pubSubSubscriberTemplate.ack(result); + + assertThat(ackListenableFuture).isNotNull(); + + ackListenableFuture.addCallback(ackTestListenableFutureCallback); + ackListenableFuture.get(10L, TimeUnit.SECONDS); + + assertThat(ackListenableFuture.isDone()).isTrue(); + + assertThat(ackTestListenableFutureCallback.getThrowable()).isNull(); + } + @Test public void testPullAndAck() { List result = this.pubSubSubscriberTemplate.pullAndAck( @@ -473,37 +508,56 @@ public void testPullAndConvertAsync() throws InterruptedException, ExecutionExce assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub2"); } + @Test + public void testPullNext() { + + PubsubMessage message = this.pubSubSubscriberTemplate.pullNext("sub2"); + + assertThat(message).isSameAs(this.pubsubMessage); + + verify(this.subscriberFactory).createPullRequest("sub2", 1, true); + verify(this.pubSubSubscriberTemplate, times(1)).ack(any()); + } @Test - public void testPullAsync_AndManualAck() throws InterruptedException, ExecutionException, TimeoutException { + public void testPullNext_NoMessages() { + when(this.pullCallable.call(any(PullRequest.class))).thenReturn(PullResponse.newBuilder().build()); - ListenableFuture> asyncResult = this.pubSubSubscriberTemplate - .pullAsync("sub", 1, true); + PubsubMessage message = this.pubSubSubscriberTemplate.pullNext("sub2"); - List result = asyncResult.get(10L, TimeUnit.SECONDS); + assertThat(message).isNull(); - assertThat(asyncResult.isDone()).isTrue(); + verify(this.subscriberFactory).createPullRequest("sub2", 1, true); + verify(this.pubSubSubscriberTemplate, never()).ack(any()); + } - assertThat(result.size()).isEqualTo(1); - assertThat(result.get(0).getPubsubMessage()).isSameAs(this.pubsubMessage); - assertThat(result.get(0).getProjectSubscriptionName().getProject()).isEqualTo("testProject"); - assertThat(result.get(0).getProjectSubscriptionName().getSubscription()).isEqualTo("sub"); - AcknowledgeablePubsubMessage acknowledgeablePubsubMessage = result.get(0); - assertThat(acknowledgeablePubsubMessage.getAckId()).isNotNull(); + @Test + public void testPullNextAsync() throws InterruptedException, ExecutionException, TimeoutException { + ListenableFuture asyncResult = this.pubSubSubscriberTemplate.pullNextAsync("sub2"); - TestListenableFutureCallback ackTestListenableFutureCallback = new TestListenableFutureCallback(); + PubsubMessage message = asyncResult.get(10L, TimeUnit.SECONDS); + assertThat(asyncResult.isDone()).isTrue(); - ListenableFuture ackListenableFuture = this.pubSubSubscriberTemplate.ack(result); + assertThat(message).isSameAs(this.pubsubMessage); - assertThat(ackListenableFuture).isNotNull(); + verify(this.subscriberFactory).createPullRequest("sub2", 1, true); + verify(this.pubSubSubscriberTemplate, times(1)).ack(any()); + } - ackListenableFuture.addCallback(ackTestListenableFutureCallback); - ackListenableFuture.get(10L, TimeUnit.SECONDS); + @Test + public void testPullNextAsync_NoMessages() throws InterruptedException, ExecutionException, TimeoutException { + when(this.pullApiFuture.get()).thenReturn(PullResponse.newBuilder().build()); - assertThat(ackListenableFuture.isDone()).isTrue(); + ListenableFuture asyncResult = this.pubSubSubscriberTemplate.pullNextAsync("sub2"); - assertThat(ackTestListenableFutureCallback.getThrowable()).isNull(); + PubsubMessage message = asyncResult.get(10L, TimeUnit.SECONDS); + assertThat(asyncResult.isDone()).isTrue(); + + assertThat(message).isNull(); + + verify(this.subscriberFactory).createPullRequest("sub2", 1, true); + verify(this.pubSubSubscriberTemplate, never()).ack(any()); } private class TestListenableFutureCallback implements ListenableFutureCallback { From 403fd31c64666a9a365017630d7d90d0a2b96e87 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 10:20:22 +0100 Subject: [PATCH 13/26] Fixed some checkstyle issues --- .../cloud/gcp/pubsub/core/PubSubTemplate.java | 2 +- .../pubsub/core/subscriber/PubSubSubscriberTemplate.java | 6 +++--- .../core/subscriber/PubSubSubscriberTemplateTests.java | 3 --- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index b71a6165d3..97a6a5288d 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -173,7 +173,7 @@ public List> pullAndConvert(String @Override public ListenableFuture>> pullAndConvertAsync(String subscription, Integer maxMessages, Boolean returnImmediately, Class payloadType) { - return this.pullAndConvertAsync(subscription, maxMessages, returnImmediately, payloadType); + return this.pubSubSubscriberTemplate.pullAndConvertAsync(subscription, maxMessages, returnImmediately, payloadType); } @Override diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index f33bf6f6d6..71595bd638 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -42,8 +42,8 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; - import com.google.pubsub.v1.ReceivedMessage; + import org.springframework.beans.factory.DisposableBean; import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage; import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage; @@ -250,7 +250,7 @@ public void onSuccess(PullResponse pullResponse) { return settableFuture; } - + private List toAcknowledgeablePubsubMessageList(List messages, String projectId, String subscriptionId) { return messages.stream() @@ -358,7 +358,7 @@ public ListenableFuture> pullAndAckAsync(String subscription public PubsubMessage pullNext(String subscription) { List receivedMessageList = pullAndAck(subscription, 1, true); - return receivedMessageList.isEmpty() ? null: receivedMessageList.get(0); + return receivedMessageList.isEmpty() ? null : receivedMessageList.get(0); } @Override diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 4db0f2e212..48d87f8d0c 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -60,7 +60,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -69,8 +68,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; /** From cd9cc2b9f7b09cd0c0ae3b6d8c897c5ebd6b7ea3 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 10:37:03 +0100 Subject: [PATCH 14/26] Made the `PubSubSubscriberTemplate.asyncPullExecutor` configurable via autoconfiguration --- .../pubsub/GcpPubSubAutoConfiguration.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index 7d27aba74c..41f226043c 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -153,6 +153,12 @@ public PubSubPublisherTemplate pubSubPublisherTemplate(PublisherFactory publishe return pubSubPublisherTemplate; } + @Bean + @ConditionalOnMissingBean(name = "pubSubAsynchronousPullExecutor") + public Executor pubSubAsynchronousPullExecutor() { + return Runnable::run; + } + @Bean @ConditionalOnMissingBean(name = "pubSubAcknowledgementExecutor") public Executor pubSubAcknowledgementExecutor() { @@ -167,10 +173,12 @@ public Executor pubSubAcknowledgementExecutor() { @ConditionalOnMissingBean public PubSubSubscriberTemplate pubSubSubscriberTemplate(SubscriberFactory subscriberFactory, ObjectProvider pubSubMessageConverter, - @Qualifier("pubSubAcknowledgementExecutor") Executor executor) { + @Qualifier("pubSubAsynchronousPullExecutor") Executor asyncPullExecutor, + @Qualifier("pubSubAcknowledgementExecutor") Executor ackExecutor) { PubSubSubscriberTemplate pubSubSubscriberTemplate = new PubSubSubscriberTemplate(subscriberFactory); pubSubMessageConverter.ifUnique(pubSubSubscriberTemplate::setMessageConverter); - pubSubSubscriberTemplate.setAckExecutor(executor); + pubSubSubscriberTemplate.setAckExecutor(ackExecutor); + pubSubSubscriberTemplate.setAsyncPullExecutor(asyncPullExecutor); return pubSubSubscriberTemplate; } From 00acef213acfc84eda947df1af1c95bd191dd16f Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 13:34:50 +0100 Subject: [PATCH 15/26] Added information on the scheduler to the PubSubReactiveFactory class javadoc. --- .../cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java index 2e4149c544..3d0fb98d1a 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java @@ -36,7 +36,13 @@ /** * A factory for procuring {@link Flux} instances backed by GCP Pub/Sub Subscriptions. * + * The {@link Scheduler}, that is given to the constructor, + * is used for regularly polling the subscription, + * when the demand is unlimited. + * The scheduler is not used when there is a specific demand (a.k.a backpressure). + * * @author Elena Felder + * @author Maurice Zeijen * * @since 1.2 */ From 17179edca1018680990ec1d396a55d4c68fa2ac7 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 13:35:35 +0100 Subject: [PATCH 16/26] Refactored the infinite/poll pull method --- .../cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java index 3d0fb98d1a..17dd66ac1d 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java @@ -88,8 +88,7 @@ public Flux poll(String subscriptionName, long pol return Flux.create(sink -> { sink.onRequest((numRequested) -> { if (numRequested == Long.MAX_VALUE) { - Disposable disposable = infinitePull(subscriptionName, pollingPeriodMs, sink); - sink.onCancel(disposable); + pollingPull(subscriptionName, pollingPeriodMs, sink); } else { backpressurePull(subscriptionName, numRequested, sink); @@ -98,12 +97,14 @@ public Flux poll(String subscriptionName, long pol }); } - private Disposable infinitePull(String subscriptionName, long pollingPeriodMs, + private void pollingPull(String subscriptionName, long pollingPeriodMs, FluxSink sink) { - return Flux + Disposable disposable = Flux .interval(Duration.ZERO, Duration.ofMillis(pollingPeriodMs), scheduler) .flatMap(ignore -> pullAll(subscriptionName)) .subscribe(sink::next, sink::error); + + sink.onDispose(disposable); } private Flux pullAll(String subscriptionName) { From af10c6ce571a0b65b4022d695b8a7fd49c42f3d5 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 4 Mar 2020 13:36:02 +0100 Subject: [PATCH 17/26] Added my name to the tests that I also significantly extended --- .../pubsub/core/subscriber/PubSubSubscriberTemplateTests.java | 1 + .../cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java | 1 + 2 files changed, 2 insertions(+) diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 48d87f8d0c..35ae445b70 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -76,6 +76,7 @@ * @author Mike Eltsufin * @author Doug Hoard * @author Elena Felder + * @author Maurice Zeijen */ @RunWith(MockitoJUnitRunner.class) public class PubSubSubscriberTemplateTests { diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java index fc779e17cd..1283c4fd5d 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java @@ -52,6 +52,7 @@ * Tests for streams generated by PubSubReactiveFactory. * * @author Elena Felder + * @author Maurice Zeijen * * @since 1.2 */ From dd6d81492b75e809182b916b7df72a1af148e9f3 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 10 Mar 2020 07:05:52 +0100 Subject: [PATCH 18/26] Update spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java Co-Authored-By: Elena Felder <41136058+elefeint@users.noreply.github.com> --- .../gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 71595bd638..b21a87e57a 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -66,7 +66,7 @@ * A custom {@link Executor} can be injected to control per-subscription batch * parallelization in acknowledgement and deadline operations. * By default, this is a single thread executor, - * created per instance of the {@link PubSubSubscriberOperations}. + * created per instance of the {@link PubSubSubscriberTemplate}. * * A custom {@link Executor} can be injected to control the threads that process * the responses of the asynchronous pull callback operations. From d250e0b7723c3e22d323270eda481232d04286c4 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 10 Mar 2020 07:06:05 +0100 Subject: [PATCH 19/26] Update spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java Co-Authored-By: Elena Felder <41136058+elefeint@users.noreply.github.com> --- .../gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index b21a87e57a..ab0b44fee2 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -113,7 +113,7 @@ public PubSubSubscriberTemplate(SubscriberFactory subscriberFactory) { /** * Get the converter used to convert a message payload to the desired type. * - * @return the converter to set + * @return the currently used converter */ public PubSubMessageConverter getMessageConverter() { return this.pubSubMessageConverter; From 0466c780d3d8a16d07acdcd29102ab97382b4378 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 10 Mar 2020 07:06:23 +0100 Subject: [PATCH 20/26] Update spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java Co-Authored-By: Elena Felder <41136058+elefeint@users.noreply.github.com> --- .../gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index ab0b44fee2..21648cf36d 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -142,7 +142,7 @@ public void setAckExecutor(Executor ackExecutor) { } /** - * Sets a custom {@link Executor} can be injected to control the threads that process + * Set a custom {@link Executor} to control the threads that process * the responses of the asynchronous pull callback operations. * * @param asyncPullExecutor the executor to set From 6996cfc74ec341b3727faabcf858a15ad187da1a Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 10 Mar 2020 07:21:33 +0100 Subject: [PATCH 21/26] Removed passing around projectId to `toAcknowledgeablePubsubMessageList`, as itself can get the project id --- .../core/subscriber/PubSubSubscriberTemplate.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index 21648cf36d..c0e44ddf2b 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -211,7 +211,7 @@ private List pull(PullRequest pullRequest) { PullResponse pullResponse = this.subscriberStub.pullCallable().call(pullRequest); return toAcknowledgeablePubsubMessageList( - pullResponse.getReceivedMessagesList(), this.subscriberFactory.getProjectId(), + pullResponse.getReceivedMessagesList(), pullRequest.getSubscription()); } @@ -228,8 +228,6 @@ private ListenableFuture> pullAsync(PullReque ApiFuture pullFuture = this.subscriberStub.pullCallable().futureCall(pullRequest); - final String projectId = this.subscriberFactory.getProjectId(); - final SettableListenableFuture> settableFuture = new SettableListenableFuture<>(); ApiFutures.addCallback(pullFuture, new ApiFutureCallback() { @@ -241,7 +239,7 @@ public void onFailure(Throwable throwable) { @Override public void onSuccess(PullResponse pullResponse) { List result = toAcknowledgeablePubsubMessageList( - pullResponse.getReceivedMessagesList(), projectId, pullRequest.getSubscription()); + pullResponse.getReceivedMessagesList(), pullRequest.getSubscription()); settableFuture.set(result); } @@ -252,11 +250,11 @@ public void onSuccess(PullResponse pullResponse) { } private List toAcknowledgeablePubsubMessageList(List messages, - String projectId, String subscriptionId) { + String subscriptionId) { return messages.stream() .map((message) -> new PulledAcknowledgeablePubsubMessage( PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionId, - projectId), + this.subscriberFactory.getProjectId()), message.getMessage(), message.getAckId())) .collect(Collectors.toList()); From 809086c8d3ac4e90e838eaf242533a05051c8368 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 10 Mar 2020 07:27:59 +0100 Subject: [PATCH 22/26] Removed the `pubSubAsynchronousPullExecutor` bean in favour of using a ObjectProvider for the pubSubSubscriberTemplate bean method --- .../pubsub/GcpPubSubAutoConfiguration.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index 41f226043c..c501ac7b3e 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -153,12 +153,6 @@ public PubSubPublisherTemplate pubSubPublisherTemplate(PublisherFactory publishe return pubSubPublisherTemplate; } - @Bean - @ConditionalOnMissingBean(name = "pubSubAsynchronousPullExecutor") - public Executor pubSubAsynchronousPullExecutor() { - return Runnable::run; - } - @Bean @ConditionalOnMissingBean(name = "pubSubAcknowledgementExecutor") public Executor pubSubAcknowledgementExecutor() { @@ -173,12 +167,12 @@ public Executor pubSubAcknowledgementExecutor() { @ConditionalOnMissingBean public PubSubSubscriberTemplate pubSubSubscriberTemplate(SubscriberFactory subscriberFactory, ObjectProvider pubSubMessageConverter, - @Qualifier("pubSubAsynchronousPullExecutor") Executor asyncPullExecutor, + @Qualifier("pubSubAsynchronousPullExecutor") ObjectProvider asyncPullExecutor, @Qualifier("pubSubAcknowledgementExecutor") Executor ackExecutor) { PubSubSubscriberTemplate pubSubSubscriberTemplate = new PubSubSubscriberTemplate(subscriberFactory); pubSubMessageConverter.ifUnique(pubSubSubscriberTemplate::setMessageConverter); pubSubSubscriberTemplate.setAckExecutor(ackExecutor); - pubSubSubscriberTemplate.setAsyncPullExecutor(asyncPullExecutor); + asyncPullExecutor.ifAvailable(pubSubSubscriberTemplate::setAsyncPullExecutor); return pubSubSubscriberTemplate; } From eea5e531e538bd711eeac930773ac827fa731d00 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Tue, 10 Mar 2020 08:24:21 +0100 Subject: [PATCH 23/26] Removed the intermediate build calls for protobuf objects --- .../pubsub/core/subscriber/PubSubSubscriberTemplateTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java index 35ae445b70..178520f5f8 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java @@ -188,13 +188,13 @@ public void setUp() throws ExecutionException, InterruptedException { }).when(this.pullApiFuture).addListener(any(Runnable.class), any(Executor.class)); when(this.pullApiFuture.isDone()).thenReturn(true); when(this.pullApiFuture.get()).thenReturn(PullResponse.newBuilder() - .addReceivedMessages(ReceivedMessage.newBuilder().setMessage(this.pubsubMessage).build()).build()); + .addReceivedMessages(ReceivedMessage.newBuilder().setMessage(this.pubsubMessage)).build()); // create objects under test when(this.subscriberFactory.createSubscriberStub()).thenReturn(this.subscriberStub); when(this.subscriberStub.pullCallable()).thenReturn(this.pullCallable); when(this.pullCallable.call(any(PullRequest.class))).thenReturn(PullResponse.newBuilder() - .addReceivedMessages(ReceivedMessage.newBuilder().setMessage(this.pubsubMessage).build()).build()); + .addReceivedMessages(ReceivedMessage.newBuilder().setMessage(this.pubsubMessage)).build()); // create object under test this.pubSubSubscriberTemplate = spy(new PubSubSubscriberTemplate(this.subscriberFactory)); From 3ee9f531fab2dedda3869e0648eb71af81e43187 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Wed, 11 Mar 2020 09:25:59 +0100 Subject: [PATCH 24/26] Reduced amount of duplicate code in `pullAndAck` and `pullAndAckAsync` by moving the validation into `SubscriberFactory.createPullRequest` (where it partially already was) --- .../core/subscriber/PubSubSubscriberTemplate.java | 12 ------------ .../gcp/pubsub/support/DefaultSubscriberFactory.java | 2 ++ .../cloud/gcp/pubsub/support/SubscriberFactory.java | 3 ++- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java index c0e44ddf2b..c4ab562fae 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java @@ -303,12 +303,6 @@ private List> toConvertedAcknowledg @Override public List pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately) { - Assert.hasText(subscription, "The subscription can't be null or empty."); - - if (maxMessages != null) { - Assert.isTrue(maxMessages > 0, "The maxMessages must be greater than 0."); - } - PullRequest pullRequest = this.subscriberFactory.createPullRequest( subscription, maxMessages, returnImmediately); @@ -325,12 +319,6 @@ public List pullAndAck(String subscription, Integer maxMessages, @Override public ListenableFuture> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately) { - Assert.hasText(subscription, "The subscription can't be null or empty."); - - if (maxMessages != null) { - Assert.isTrue(maxMessages > 0, "The maxMessages must be greater than 0."); - } - PullRequest pullRequest = this.subscriberFactory.createPullRequest( subscription, maxMessages, returnImmediately); diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java index 8ebb11fd98..2798861b0b 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java @@ -227,6 +227,8 @@ public PullRequest createPullRequest(String subscriptionName, Integer maxMessage PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId).toString()); if (maxMessages != null) { + Assert.isTrue(maxMessages > 0, "The maxMessages must be greater than 0."); + pullRequestBuilder.setMaxMessages(maxMessages); } diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java index 5aed893534..5db292033d 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java @@ -53,7 +53,8 @@ public interface SubscriberFactory { * Create a {@link PullRequest} for synchronously pulling a number of messages from * a Google Cloud Pub/Sub subscription. * @param subscriptionName the name of the subscription - * @param maxMessages the maximum number of pulled messages + * @param maxMessages the maximum number of pulled messages, + * which can be null or must be a positive number * @param returnImmediately causes the pull request to return immediately even * if subscription doesn't contain enough messages to satisfy {@code maxMessages} * @return the pull request that can be executed using a {@link SubscriberStub} From ec3d15fb2c62848b07aa65789e4b12eacef43e12 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Thu, 12 Mar 2020 16:13:56 +0100 Subject: [PATCH 25/26] Removed the javadoc statement that maxMessages can be null --- .../cloud/gcp/pubsub/support/SubscriberFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java index 5db292033d..cd55e509c3 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java @@ -54,7 +54,7 @@ public interface SubscriberFactory { * a Google Cloud Pub/Sub subscription. * @param subscriptionName the name of the subscription * @param maxMessages the maximum number of pulled messages, - * which can be null or must be a positive number + * which must be a positive number * @param returnImmediately causes the pull request to return immediately even * if subscription doesn't contain enough messages to satisfy {@code maxMessages} * @return the pull request that can be executed using a {@link SubscriberStub} From 46a528431661e161e8a1f8457f2f9e9c357c32c4 Mon Sep 17 00:00:00 2001 From: Maurice Zeijen Date: Thu, 12 Mar 2020 16:31:19 +0100 Subject: [PATCH 26/26] Correct since javadoc annotation, updated copyright year and added myself as author to all files where I hadn't added myself yet. --- .../autoconfigure/pubsub/GcpPubSubAutoConfiguration.java | 3 ++- .../pubsub/GcpPubSubReactiveAutoConfiguration.java | 1 + .../pubsub/GcpPubSubReactiveAutoConfigurationTest.java | 3 ++- .../cloud/gcp/pubsub/core/PubSubTemplate.java | 1 + .../core/subscriber/PubSubSubscriberOperations.java | 8 ++++---- .../gcp/pubsub/support/DefaultSubscriberFactory.java | 3 ++- .../cloud/gcp/pubsub/support/SubscriberFactory.java | 3 ++- 7 files changed, 14 insertions(+), 8 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index c501ac7b3e..e639e01d3f 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,7 @@ * @author Mike Eltsufin * @author Chengyuan Zhao * @author Daniel Zou + * @author Maurice Zeijen */ @Configuration @AutoConfigureAfter(GcpContextAutoConfiguration.class) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java index cfec35a623..510074f695 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java @@ -36,6 +36,7 @@ * Reactive Pub/Sub support autoconfiguration. * * @author Elena Felder + * @author Maurice Zeijen * * @since 1.2 */ diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java index f505024ad1..94ad34e16c 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfigurationTest.java @@ -49,7 +49,8 @@ import static org.mockito.Mockito.when; /** - * + * @author Elena Felder + * @author Maurice Zeijen */ @RunWith(MockitoJUnitRunner.class) public class GcpPubSubReactiveAutoConfigurationTest { diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java index 97a6a5288d..755012b6bf 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/PubSubTemplate.java @@ -51,6 +51,7 @@ * @author Mike Eltsufin * @author Chengyuan Zhao * @author Doug Hoard + * @author Maurice Zeijen */ public class PubSubTemplate implements PubSubOperations, InitializingBean { diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java index 4e3c3abbbc..b190573568 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberOperations.java @@ -102,7 +102,7 @@ Subscriber subscribeAndConvert(String subscription, * messages to satisfy {@code maxMessages} * @return the ListenableFuture for the asynchronous execution, returning the list of * received acknowledgeable messages - * @since 1.3 + * @since 1.2.3 */ ListenableFuture> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately); @@ -126,7 +126,7 @@ Subscriber subscribeAndConvert(String subscription, * messages to satisfy {@code maxMessages} * @return the ListenableFuture for the asynchronous execution, returning the list of * received acknowledgeable messages - * @since 1.3 + * @since 1.2.3 */ ListenableFuture> pullAsync(String subscription, Integer maxMessages, Boolean returnImmediately); @@ -158,7 +158,7 @@ List> pullAndConvert(String subscri * @param the type of the payload * @return the ListenableFuture for the asynchronous execution, returning the list of * received acknowledgeable messages - * @since 1.3 + * @since 1.2.3 */ ListenableFuture>> pullAndConvertAsync(String subscription, Integer maxMessages, Boolean returnImmediately, Class payloadType); @@ -177,7 +177,7 @@ ListenableFuture>> pullAndConv * subscription name in the {@code projects//subscriptions/} format * @return the ListenableFuture for the asynchronous execution, returning a received message, * or {@code null} if none exists in the subscription - * @since 1.3 + * @since 1.2.3 */ ListenableFuture pullNextAsync(String subscription); diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java index 2798861b0b..d72367dcf2 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2018 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ * @author Mike Eltsufin * @author Doug Hoard * @author Chengyuan Zhao + * @author Maurice Zeijen */ public class DefaultSubscriberFactory implements SubscriberFactory { diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java index cd55e509c3..81a1f5e670 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2018 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ * @author Artem Bilan * @author Doug Hoard * @author Chengyuan Zhao + * @author Maurice Zeijen */ public interface SubscriberFactory {