Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Making Pub/Sub reactive pull non-blocking (#2227)
Browse files Browse the repository at this point in the history
* Added a future returning pull method to PubSubSubscriberOperations, and its implementation, allowing for asynchronous and non-blocking pull of messages

* Reimplemented the PubSubReactiveFactory to utilize the asynchronous pullAsync method of the PubSubReactiveFactory, making it non-blocking

* Relied on Project Reactor parallel thread pool instead of the custom elastic one.

* Added and implemented PubSubSubscriberOperations asynchronous methods: pullAndAckAsync, pullAndConvertAsync, pullNextAsync. 

* Also added tests for PubSubSubscriberOperations.pullNext, because they where missing.
  • Loading branch information
mzeijen authored Mar 12, 2020
1 parent d0de2b9 commit 2adf432
Show file tree
Hide file tree
Showing 11 changed files with 477 additions and 177 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,6 +72,7 @@
* @author Mike Eltsufin
* @author Chengyuan Zhao
* @author Daniel Zou
* @author Maurice Zeijen
*/
@Configuration
@AutoConfigureAfter(GcpContextAutoConfiguration.class)
Expand Down Expand Up @@ -167,10 +168,12 @@ public Executor pubSubAcknowledgementExecutor() {
@ConditionalOnMissingBean
public PubSubSubscriberTemplate pubSubSubscriberTemplate(SubscriberFactory subscriberFactory,
ObjectProvider<PubSubMessageConverter> pubSubMessageConverter,
@Qualifier("pubSubAcknowledgementExecutor") Executor executor) {
@Qualifier("pubSubAsynchronousPullExecutor") ObjectProvider<Executor> asyncPullExecutor,
@Qualifier("pubSubAcknowledgementExecutor") Executor ackExecutor) {
PubSubSubscriberTemplate pubSubSubscriberTemplate = new PubSubSubscriberTemplate(subscriberFactory);
pubSubMessageConverter.ifUnique(pubSubSubscriberTemplate::setMessageConverter);
pubSubSubscriberTemplate.setAckExecutor(executor);
pubSubSubscriberTemplate.setAckExecutor(ackExecutor);
asyncPullExecutor.ifAvailable(pubSubSubscriberTemplate::setAsyncPullExecutor);
return pubSubSubscriberTemplate;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,6 @@

import java.util.Optional;

import javax.annotation.PreDestroy;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand All @@ -38,6 +36,7 @@
* Reactive Pub/Sub support autoconfiguration.
*
* @author Elena Felder
* @author Maurice Zeijen
*
* @since 1.2
*/
Expand All @@ -49,30 +48,14 @@
matchIfMissing = true)
public class GcpPubSubReactiveAutoConfiguration {

private Scheduler defaultPubSubReactiveScheduler;

@Bean
@ConditionalOnMissingBean
public PubSubReactiveFactory pubSubReactiveFactory(
PubSubSubscriberTemplate subscriberTemplate,
@Qualifier("pubSubReactiveScheduler") Optional<Scheduler> userProvidedScheduler) {

Scheduler scheduler = null;
if (userProvidedScheduler.isPresent()) {
scheduler = userProvidedScheduler.get();
}
else {
this.defaultPubSubReactiveScheduler = Schedulers.newElastic("pubSubReactiveScheduler");
scheduler = this.defaultPubSubReactiveScheduler;
}
Scheduler scheduler = userProvidedScheduler.orElseGet(() -> Schedulers.parallel());
return new PubSubReactiveFactory(subscriberTemplate, scheduler);
}

@PreDestroy
public void closeScheduler() {
if (this.defaultPubSubReactiveScheduler != null) {
this.defaultPubSubReactiveScheduler.dispose();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,13 +42,15 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncResult;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
*
* @author Elena Felder
* @author Maurice Zeijen
*/
@RunWith(MockitoJUnitRunner.class)
public class GcpPubSubReactiveAutoConfigurationTest {
Expand Down Expand Up @@ -105,17 +107,13 @@ public void reactiveConfigDisabledWhenReactivePubSubDisabled() {

@Test
public void defaultSchedulerUsedWhenNoneProvided() {

setUpThreadPrefixVerification("pubSubReactiveScheduler");
setUpThreadPrefixVerification("parallel");

ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withBean(PubSubSubscriberOperations.class, () -> mockSubscriberTemplate)
.withConfiguration(AutoConfigurations.of(GcpPubSubReactiveAutoConfiguration.class));

contextRunner.run(ctx -> {
pollAndVerify(ctx);
});

contextRunner.run(this::pollAndVerify);
}

@Test
Expand All @@ -128,28 +126,26 @@ public void customSchedulerUsedWhenAvailable() {
.withConfiguration(AutoConfigurations.of(GcpPubSubReactiveAutoConfiguration.class))
.withUserConfiguration(TestConfigWithOverriddenScheduler.class);

contextRunner.run(ctx -> {
pollAndVerify(ctx);
});
contextRunner.run(this::pollAndVerify);
}

private void setUpThreadPrefixVerification(String threadPrefix) {
when(mockSubscriberTemplate.pull("testSubscription", 3, false))
when(mockSubscriberTemplate.pullAsync("testSubscription", Integer.MAX_VALUE, true))
.then(arg -> {
assertThat(Thread.currentThread().getName()).startsWith(threadPrefix);

return Arrays.asList(mockMessage, mockMessage, mockMessage);
return AsyncResult.forValue(Arrays.asList(mockMessage, mockMessage, mockMessage));
});
}

private void pollAndVerify(ApplicationContext ctx) {
PubSubReactiveFactory reactiveFactory = ctx.getBean(PubSubReactiveFactory.class);

StepVerifier.create(
reactiveFactory.poll("testSubscription", 2)
.limitRequest(3))
reactiveFactory.poll("testSubscription", 2))
.expectNext(mockMessage, mockMessage, mockMessage)
.verifyComplete();
.thenCancel()
.verify();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
* @author Mike Eltsufin
* @author Chengyuan Zhao
* @author Doug Hoard
* @author Maurice Zeijen
*/
public class PubSubTemplate implements PubSubOperations, InitializingBean {

Expand Down Expand Up @@ -158,23 +159,45 @@ public List<AcknowledgeablePubsubMessage> pull(String subscription, Integer maxM
return this.pubSubSubscriberTemplate.pull(subscription, maxMessages, returnImmediately);
}

@Override
public ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(String subscription, Integer maxMessages,
Boolean returnImmediately) {
return this.pubSubSubscriberTemplate.pullAsync(subscription, maxMessages, returnImmediately);
}

@Override
public <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType) {
return this.pubSubSubscriberTemplate.pullAndConvert(subscription, maxMessages, returnImmediately, payloadType);
}

@Override
public <T> ListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription,
Integer maxMessages, Boolean returnImmediately, Class<T> payloadType) {
return this.pubSubSubscriberTemplate.pullAndConvertAsync(subscription, maxMessages, returnImmediately, payloadType);
}

@Override
public List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages,
Boolean returnImmediately) {
return this.pubSubSubscriberTemplate.pullAndAck(subscription, maxMessages, returnImmediately);
}

@Override
public ListenableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately) {
return this.pubSubSubscriberTemplate.pullAndAckAsync(subscription, maxMessages, returnImmediately);
}

@Override
public PubsubMessage pullNext(String subscription) {
return this.pubSubSubscriberTemplate.pullNext(subscription);
}

@Override
public ListenableFuture<PubsubMessage> pullNextAsync(String subscription) {
return this.pubSubSubscriberTemplate.pullNextAsync(subscription);
}

@Override
public void afterPropertiesSet() throws Exception {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
* @author Mike Eltsufin
* @author Chengyuan Zhao
* @author Doug Hoard
* @author Maurice Zeijen
*
* @since 1.1
*/
Expand Down Expand Up @@ -92,6 +93,19 @@ <T> Subscriber subscribeAndConvert(String subscription,
*/
List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Asynchronously pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @return the ListenableFuture for the asynchronous execution, returning the list of
* received acknowledgeable messages
* @since 1.2.3
*/
ListenableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
Expand All @@ -103,6 +117,19 @@ <T> Subscriber subscribeAndConvert(String subscription,
*/
List<AcknowledgeablePubsubMessage> pull(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Asynchronously pull a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @return the ListenableFuture for the asynchronous execution, returning the list of
* received acknowledgeable messages
* @since 1.2.3
*/
ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(String subscription, Integer maxMessages, Boolean returnImmediately);

/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
Expand All @@ -119,6 +146,22 @@ <T> Subscriber subscribeAndConvert(String subscription,
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType);

/**
* Asynchronously pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @param payloadType the type to which the payload of the Pub/Sub messages should be converted
* @param <T> the type of the payload
* @return the ListenableFuture for the asynchronous execution, returning the list of
* received acknowledgeable messages
* @since 1.2.3
*/
<T> ListenableFuture<List<ConvertedAcknowledgeablePubsubMessage<T>>> pullAndConvertAsync(String subscription,
Integer maxMessages, Boolean returnImmediately, Class<T> payloadType);

/**
* Pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription.
Expand All @@ -128,6 +171,16 @@ <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscri
*/
PubsubMessage pullNext(String subscription);

/**
* Asynchronously pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return the ListenableFuture for the asynchronous execution, returning a received message,
* or {@code null} if none exists in the subscription
* @since 1.2.3
*/
ListenableFuture<PubsubMessage> pullNextAsync(String subscription);

/**
* Acknowledge a batch of messages. The messages must have the same project id.
* @param acknowledgeablePubsubMessages messages to be acknowledged
Expand Down
Loading

0 comments on commit 2adf432

Please sign in to comment.