Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Making Pub/Sub reactive pull non-blocking #2227

Merged
merged 28 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
37d8084
Added a future returning pull method to PubSubSubscriberOperations, a…
mzeijen Mar 2, 2020
39d6bc6
Reimplemented the PubSubReactiveFactory to utilize the asynchronous p…
mzeijen Mar 2, 2020
bd42475
Added unit test for the pullFuture method of PubSubSubscriberTemplate
mzeijen Mar 2, 2020
8e7e9d4
Bumped copyright year to 2020, for those files this change touches
mzeijen Mar 2, 2020
7c2a162
Fixed imports according to code style
mzeijen Mar 2, 2020
f5138fd
Merge remote-tracking branch 'upstream/master' into pubsub-nonblockin…
mzeijen Mar 3, 2020
12554df
Renamed `PubSubSubscriberOperations.pullFuture` to `PubSubSubscriberO…
mzeijen Mar 3, 2020
e43a90a
Made the executor for the async pull configurable, and added javadoc …
mzeijen Mar 4, 2020
418b5fc
Deduplicated code that creates a list of PulledAcknowledgeablePubsubM…
mzeijen Mar 4, 2020
a4b9b07
Added and implemented PubSubSubscriberOperations.pullAndAckAsync
mzeijen Mar 4, 2020
c07031a
Corrected test name after pullFuture changed into pullAsync
mzeijen Mar 4, 2020
20386a8
Added and implemented PubSubSubscriberOperations.pullAndConvertAsync
mzeijen Mar 4, 2020
92cf469
Added and implemented PubSubSubscriberOperations.pullNextAsync. Also …
mzeijen Mar 4, 2020
403fd31
Fixed some checkstyle issues
mzeijen Mar 4, 2020
cd9cc2b
Made the `PubSubSubscriberTemplate.asyncPullExecutor` configurable vi…
mzeijen Mar 4, 2020
00acef2
Added information on the scheduler to the PubSubReactiveFactory class…
mzeijen Mar 4, 2020
17179ed
Refactored the infinite/poll pull method
mzeijen Mar 4, 2020
af10c6c
Added my name to the tests that I also significantly extended
mzeijen Mar 4, 2020
dd6d814
Update spring-cloud-gcp-pubsub/src/main/java/org/springframework/clou…
mzeijen Mar 10, 2020
d250e0b
Update spring-cloud-gcp-pubsub/src/main/java/org/springframework/clou…
mzeijen Mar 10, 2020
0466c78
Update spring-cloud-gcp-pubsub/src/main/java/org/springframework/clou…
mzeijen Mar 10, 2020
6996cfc
Removed passing around projectId to `toAcknowledgeablePubsubMessageLi…
mzeijen Mar 10, 2020
809086c
Removed the `pubSubAsynchronousPullExecutor` bean in favour of using …
mzeijen Mar 10, 2020
4f99d11
Merge remote-tracking branch 'upstream/master' into pubsub-nonblockin…
mzeijen Mar 10, 2020
eea5e53
Removed the intermediate build calls for protobuf objects
mzeijen Mar 10, 2020
3ee9f53
Reduced amount of duplicate code in `pullAndAck` and `pullAndAckAsync…
mzeijen Mar 11, 2020
ec3d15f
Removed the javadoc statement that maxMessages can be null
mzeijen Mar 12, 2020
46a5284
Correct since javadoc annotation, updated copyright year and added my…
mzeijen Mar 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,6 +72,7 @@
* @author Mike Eltsufin
* @author Chengyuan Zhao
* @author Daniel Zou
* @author Maurice Zeijen
*/
@Configuration
@AutoConfigureAfter(GcpContextAutoConfiguration.class)
Expand Down Expand Up @@ -167,10 +168,12 @@ public Executor pubSubAcknowledgementExecutor() {
@ConditionalOnMissingBean
public PubSubSubscriberTemplate pubSubSubscriberTemplate(SubscriberFactory subscriberFactory,
ObjectProvider<PubSubMessageConverter> pubSubMessageConverter,
@Qualifier("pubSubAcknowledgementExecutor") Executor executor) {
@Qualifier("pubSubAsynchronousPullExecutor") ObjectProvider<Executor> asyncPullExecutor,
@Qualifier("pubSubAcknowledgementExecutor") Executor ackExecutor) {
elefeint marked this conversation as resolved.
Show resolved Hide resolved
PubSubSubscriberTemplate pubSubSubscriberTemplate = new PubSubSubscriberTemplate(subscriberFactory);
pubSubMessageConverter.ifUnique(pubSubSubscriberTemplate::setMessageConverter);
pubSubSubscriberTemplate.setAckExecutor(executor);
pubSubSubscriberTemplate.setAckExecutor(ackExecutor);
asyncPullExecutor.ifAvailable(pubSubSubscriberTemplate::setAsyncPullExecutor);
return pubSubSubscriberTemplate;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,6 @@

import java.util.Optional;

import javax.annotation.PreDestroy;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand All @@ -38,6 +36,7 @@
* Reactive Pub/Sub support autoconfiguration.
*
* @author Elena Felder
* @author Maurice Zeijen
*
* @since 1.2
*/
Expand All @@ -49,30 +48,14 @@
matchIfMissing = true)
public class GcpPubSubReactiveAutoConfiguration {

private Scheduler defaultPubSubReactiveScheduler;

@Bean
@ConditionalOnMissingBean
public PubSubReactiveFactory pubSubReactiveFactory(
PubSubSubscriberTemplate subscriberTemplate,
@Qualifier("pubSubReactiveScheduler") Optional<Scheduler> userProvidedScheduler) {

Scheduler scheduler = null;
if (userProvidedScheduler.isPresent()) {
scheduler = userProvidedScheduler.get();
}
else {
this.defaultPubSubReactiveScheduler = Schedulers.newElastic("pubSubReactiveScheduler");
scheduler = this.defaultPubSubReactiveScheduler;
}
Scheduler scheduler = userProvidedScheduler.orElseGet(() -> Schedulers.parallel());
elefeint marked this conversation as resolved.
Show resolved Hide resolved
return new PubSubReactiveFactory(subscriberTemplate, scheduler);
}

@PreDestroy
public void closeScheduler() {
if (this.defaultPubSubReactiveScheduler != null) {
this.defaultPubSubReactiveScheduler.dispose();
mzeijen marked this conversation as resolved.
Show resolved Hide resolved
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,13 +42,15 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncResult;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
*
* @author Elena Felder
* @author Maurice Zeijen
*/
@RunWith(MockitoJUnitRunner.class)
public class GcpPubSubReactiveAutoConfigurationTest {
Expand Down Expand Up @@ -105,17 +107,13 @@ public void reactiveConfigDisabledWhenReactivePubSubDisabled() {

@Test
public void defaultSchedulerUsedWhenNoneProvided() {

setUpThreadPrefixVerification("pubSubReactiveScheduler");
setUpThreadPrefixVerification("parallel");

ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withBean(PubSubSubscriberOperations.class, () -> mockSubscriberTemplate)
.withConfiguration(AutoConfigurations.of(GcpPubSubReactiveAutoConfiguration.class));

contextRunner.run(ctx -> {
pollAndVerify(ctx);
});

contextRunner.run(this::pollAndVerify);
}

@Test
Expand All @@ -128,28 +126,26 @@ public void customSchedulerUsedWhenAvailable() {
.withConfiguration(AutoConfigurations.of(GcpPubSubReactiveAutoConfiguration.class))
.withUserConfiguration(TestConfigWithOverriddenScheduler.class);

contextRunner.run(ctx -> {
pollAndVerify(ctx);
});
contextRunner.run(this::pollAndVerify);
}

private void setUpThreadPrefixVerification(String threadPrefix) {
when(mockSubscriberTemplate.pull("testSubscription", 3, false))
when(mockSubscriberTemplate.pullAsync("testSubscription", Integer.MAX_VALUE, true))
.then(arg -> {
assertThat(Thread.currentThread().getName()).startsWith(threadPrefix);

return Arrays.asList(mockMessage, mockMessage, mockMessage);
return AsyncResult.forValue(Arrays.asList(mockMessage, mockMessage, mockMessage));
});
}

private void pollAndVerify(ApplicationContext ctx) {
PubSubReactiveFactory reactiveFactory = ctx.getBean(PubSubReactiveFactory.class);

StepVerifier.create(
reactiveFactory.poll("testSubscription", 2)
.limitRequest(3))
reactiveFactory.poll("testSubscription", 2))
.expectNext(mockMessage, mockMessage, mockMessage)
.verifyComplete();
.thenCancel()
.verify();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
* @author Mike Eltsufin
* @author Chengyuan Zhao
* @author Doug Hoard
* @author Maurice Zeijen
*/
public class PubSubTemplate implements PubSubOperations, InitializingBean {

Expand Down Expand Up @@ -158,23 +159,45 @@ public List<AcknowledgeablePubsubMessage> pull(String subscription, Integer maxM
return this.pubSubSubscriberTemplate.pull(subscription, maxMessages, returnImmediately);
}

@Override
public ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(String subscription, Integer maxMessages,
Boolean returnImmediately) {
return this.pubSubSubscriberTemplate.pullAsync(subscription, maxMessages, returnImmediately);
}

@Override
public <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType) {
return this.pubSubSubscriberTemplate.pullAndConvert(subscription, maxMessages, returnImmediately, payloadType);
}

@Override
public <T> ListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription,
Integer maxMessages, Boolean returnImmediately, Class<T> payloadType) {
return this.pubSubSubscriberTemplate.pullAndConvertAsync(subscription, maxMessages, returnImmediately, payloadType);
}

@Override
public List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages,
Boolean returnImmediately) {
return this.pubSubSubscriberTemplate.pullAndAck(subscription, maxMessages, returnImmediately);
}

@Override
public ListenableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately) {
return this.pubSubSubscriberTemplate.pullAndAckAsync(subscription, maxMessages, returnImmediately);
}

@Override
public PubsubMessage pullNext(String subscription) {
return this.pubSubSubscriberTemplate.pullNext(subscription);
}

@Override
public ListenableFuture<PubsubMessage> pullNextAsync(String subscription) {
return this.pubSubSubscriberTemplate.pullNextAsync(subscription);
}

@Override
public void afterPropertiesSet() throws Exception {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
* @author Mike Eltsufin
* @author Chengyuan Zhao
* @author Doug Hoard
* @author Maurice Zeijen
*
* @since 1.1
*/
Expand Down Expand Up @@ -92,6 +93,19 @@ <T> Subscriber subscribeAndConvert(String subscription,
*/
List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Asynchronously pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @return the ListenableFuture for the asynchronous execution, returning the list of
* received acknowledgeable messages
* @since 1.2.3
*/
ListenableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
Expand All @@ -103,6 +117,19 @@ <T> Subscriber subscribeAndConvert(String subscription,
*/
List<AcknowledgeablePubsubMessage> pull(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Asynchronously pull a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @return the ListenableFuture for the asynchronous execution, returning the list of
* received acknowledgeable messages
* @since 1.2.3
*/
ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
Expand All @@ -119,6 +146,22 @@ <T> Subscriber subscribeAndConvert(String subscription,
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType);

/**
* Asynchronously pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @param payloadType the type to which the payload of the Pub/Sub messages should be converted
* @param <T> the type of the payload
* @return the ListenableFuture for the asynchronous execution, returning the list of
* received acknowledgeable messages
* @since 1.2.3
*/
<T> ListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription,
Integer maxMessages, Boolean returnImmediately, Class<T> payloadType);

/**
* Pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription.
Expand All @@ -128,6 +171,16 @@ <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscri
*/
PubsubMessage pullNext(String subscription);

/**
* Asynchronously pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return the ListenableFuture for the asynchronous execution, returning a received message,
* or {@code null} if none exists in the subscription
* @since 1.2.3
*/
ListenableFuture<PubsubMessage> pullNextAsync(String subscription);

/**
* Acknowledge a batch of messages. The messages must have the same project id.
* @param acknowledgeablePubsubMessages messages to be acknowledged
Expand Down
Loading