This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Making Pub/Sub reactive pull non-blocking #2227

Merged 28 commits on Mar 12, 2020

mzeijen
Contributor

@mzeijen mzeijen commented Mar 2, 2020

Please note that this pull request is a proposal and is not yet ready to be merged.

The current PubSubReactiveFactory relies on blocking calls to PubSubTemplate to reactively pull messages from Pub/Sub. This isn't as efficient as it could be, because the Google Pub/Sub client does offer non-blocking, asynchronous pulling of messages. However, the current PubSubTemplate doesn't offer any pull methods that use this ability, and thus the PubSubReactiveFactory can't make use of it.

In this draft pull request I propose changes that make PubSubReactiveFactory.poll(...) non-blocking without breaking its backwards compatibility (except for the exception handling, see the note below). I make this possible by adding a PubSubTemplate.pullFuture method, which returns a ListenableFuture instead of blocking while it requests the messages.
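To illustrate the difference between the two call styles, here is a minimal sketch that uses only the JDK. It is not the code of this pull request: `PullSketch`, its `fetch` helper and the simulated messages are hypothetical, and `CompletableFuture` stands in for the `ListenableFuture` that the proposed `pullFuture` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PullSketch {
    // Hypothetical stand-in for the thread on which the client receives responses.
    private final ExecutorService clientThread = Executors.newSingleThreadExecutor();

    /** Non-blocking variant: returns at once; the future completes when messages arrive. */
    public CompletableFuture<List<String>> pullFuture(int maxMessages) {
        return CompletableFuture.supplyAsync(() -> fetch(maxMessages), clientThread);
    }

    /** Blocking variant: the same request, but the calling thread waits for the result. */
    public List<String> pull(int maxMessages) {
        return pullFuture(maxMessages).join();
    }

    // Simulated response (assumption); a real client would perform a network call here.
    private List<String> fetch(int maxMessages) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < maxMessages; i++) {
            messages.add("message-" + i);
        }
        return messages;
    }

    public void shutdown() {
        clientThread.shutdown();
    }
}
```

A reactive caller would register a callback on the returned future (for example with `thenAccept`) rather than calling `join()`, so no thread is parked while the request is in flight.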

I did some testing with this code against the Pub/Sub service, processing millions of messages, and compared it to the blocking variant. It doesn't seem to have any performance downside. It does reduce the number of threads that need to be used (as there is one less thread pool that comes into play), and it doesn't block any threads.

If you think this change makes sense, then I will be happy to finish the work so that it is ready to be merged. If you have any concerns, then I will be happy to discuss them.

The following things are currently missing or may need to change, before this pull request is ready:

  • I am not a big fan of the method name pullFuture. It is not in line with the other methods, but I need a name other than pull because the return type is different. The "future" in the name does clearly state what you will get...
  • There are other pull-related methods in PubSubTemplate that should also get versions that return a ListenableFuture and don't block.
  • In PubSubTemplate.pullFuture the ApiFutures.addCallback call now uses the MoreExecutors.directExecutor() executor. Because of that, the callback will be handled by the same thread that handles the response from the Pub/Sub client. I am wondering whether this is OK or whether we should make this configurable, as it already is for the ack, nack and modifyAckDeadline methods. I don't see much benefit in having another thread pool that must be managed by default, though. If it needs to go into a different thread pool, then it would be great if we could use something generic for CPU-bound processing, like the Reactor parallel thread pool. Something to discuss.

Note: From what I can see and have tested, the current implementation of the PubSubReactiveFactory doesn't correctly deal with exceptions thrown by the PubSubTemplate or the Google Pub/Sub client. The exceptions aren't passed to the sink. Also, when doing a backpressure-based poll, an exception will result in a "dead" flux, as any exception in the BlockingLimitedDemandPullTask will kill the task and no messages will ever be put in the sink again. The Flux doesn't know that it is dead and will simply keep waiting for messages that will never arrive. This is because the BlockingLimitedDemandPullTask doesn't catch the exception and forward it to the sink.
In the implementation of this pull request, we forward all exceptions to the sink. A retry can then be used on the Flux returned by poll to make sure exceptions don't result in a dead stream.
The issue in the current implementation of PubSubReactiveFactory should probably be solved independently of this pull request if there is a chance that this pull request will take a long time or will never be merged. I can create a ticket for it and a separate pull request, if desired.
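The failure mode described in the note can be sketched with plain JDK classes. This is only an illustration under stated assumptions: `RecordingSink` is a hypothetical, minimal stand-in for a Reactor sink, and the two tasks are not the actual `BlockingLimitedDemandPullTask`. A task that lets the exception escape leaves the sink unaware of the failure, while a task that catches and forwards it gives the subscriber something to react to.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ErrorForwardingSketch {
    /** Hypothetical, minimal stand-in for a reactive sink. */
    static class RecordingSink {
        final List<String> items = new CopyOnWriteArrayList<>();
        volatile Throwable error;

        void next(String item) { items.add(item); }
        void error(Throwable t) { error = t; }
    }

    /** The exception escapes the task, so the sink never learns about the failure. */
    static Runnable silentTask(Supplier<List<String>> pull, RecordingSink sink) {
        return () -> pull.get().forEach(sink::next);
    }

    /** The exception is caught and forwarded, so a subscriber could retry. */
    static Runnable forwardingTask(Supplier<List<String>> pull, RecordingSink sink) {
        return () -> {
            try {
                pull.get().forEach(sink::next);
            } catch (RuntimeException e) {
                sink.error(e);
            }
        };
    }

    /** Runs one failing pull on a worker thread and returns what the sink observed. */
    static RecordingSink run(boolean forward) {
        RecordingSink sink = new RecordingSink();
        Supplier<List<String>> failingPull = () -> {
            throw new IllegalStateException("pull failed");
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // submit() stores the exception in the returned Future, which nobody inspects here.
        executor.submit(forward ? forwardingTask(failingPull, sink) : silentTask(failingPull, sink));
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }
}
```

With `run(false)` the sink records neither an item nor an error, which corresponds to the "dead" stream; with `run(true)` the sink holds the `IllegalStateException`.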

@pivotal-issuemaster

@mzeijen Please sign the Contributor License Agreement!

@pivotal-issuemaster

@mzeijen Thank you for signing the Contributor License Agreement!

@codecov

codecov bot commented Mar 2, 2020

Codecov Report

Merging #2227 into master will decrease coverage by 8.19%.
The diff coverage is 84.14%.

@@             Coverage Diff              @@
##             master    #2227      +/-   ##
============================================
- Coverage     80.77%   72.57%   -8.20%     
+ Complexity     2256     2030     -226     
============================================
  Files           258      258              
  Lines          7370     7398      +28     
  Branches        757      753       -4     
============================================
- Hits           5953     5369     -584     
- Misses         1103     1684     +581     
- Partials        314      345      +31     
| Flag | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| #integration | ? | ? | |
| #unittests | 72.57% <84.14%> (-0.04%) | 2030.00 <21.00> (+19.00) | ⬇️ |

| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...ramework/cloud/gcp/pubsub/core/PubSubTemplate.java | 27.50% <0.00%> (-3.06%) | 6.00 <0.00> (ø) | |
| ...d/gcp/pubsub/support/DefaultSubscriberFactory.java | 14.86% <0.00%> (-0.21%) | 3.00 <0.00> (ø) | |
| ...oud/gcp/pubsub/reactive/PubSubReactiveFactory.java | 87.17% <86.95%> (-2.62%) | 14.00 <8.00> (+8.00) | ⬇️ |
| ...bsub/core/subscriber/PubSubSubscriberTemplate.java | 82.14% <90.19%> (+3.79%) | 38.00 <12.00> (+14.00) | |
| ...toconfigure/pubsub/GcpPubSubAutoConfiguration.java | 90.90% <100.00%> (+0.06%) | 38.00 <0.00> (ø) | |
| ...ure/pubsub/GcpPubSubReactiveAutoConfiguration.java | 100.00% <100.00%> (ø) | 2.00 <1.00> (-3.00) | |
| ...a/spanner/repository/query/SpannerQueryMethod.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-6.00%) | |
| ...figure/config/GcpConfigBootstrapConfiguration.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-2.00%) | |
| ...restore/repository/query/FirestoreQueryMethod.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-2.00%) | |
| ...ure/secretmanager/SecretManagerPropertySource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-7.00%) | |

... and 61 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update d0de2b9...46a5284.

@elefeint elefeint added pubsub GCP PubSub reactive labels Mar 2, 2020
@elefeint
Contributor

elefeint commented Mar 2, 2020

Yes, please file a separate issue for the error propagation bug, sorry about that!

The naming convention could be pullAsync; it's a common convention.

As far as Guava, Spring projects try to stay away from using it due to having had bad experiences with versioning/api evolution. Now that Google libraries come with a BOM, this should be less of an issue, though, so perhaps we can consider it.

Contributor

@meltsufin meltsufin left a comment

@mzeijen Thanks for the contribution and a detailed description!

Like Elena said, I think it would be a useful addition to the PubSubTemplate. However, the error-handling issues in the current code should probably be dealt with separately.

Regarding executors, we should probably continue the pattern of making them configurable.

As for the impact on PubSubReactiveFactory, I hope @elefeint could take a first stab at it since she's most familiar with that code.

@mzeijen
Contributor Author

mzeijen commented Mar 3, 2020

@elefeint @meltsufin Great that you like it. I will then finish this contribution with the suggested changes.

I'll create a separate issue for the error propagation bug then. I'll probably do a merge request for it.

Regarding Guava and the MoreExecutors usage: I understand the reluctance in using it. We do the same for our internal libraries and frameworks. However, the ApiFutures.addCallback already heavily relies on it, so I thought it shouldn't be a big deal.

Regarding the configurable executor: what should the default executor be in that case? Could it still be the MoreExecutors.directExecutor()? The thread with which the flux is then executed is the grpc-default-executor. I could also set the publisherExecutorProvider as the default.

@mzeijen
Contributor Author

mzeijen commented Mar 3, 2020

The bug report for the exception handling is #2229.

…g-pull

# Conflicts:
#	spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java
#	spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java
@meltsufin
Contributor

meltsufin commented Mar 3, 2020

@mzeijen Thanks for the error-handling fix.

Regarding executors, for the default, we can definitely use MoreExecutors.directExecutor(), or even better Executor executor = Runnable::run to avoid Guava imports. We just want to make sure it's also configurable.
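A small sketch of why `Runnable::run` works as a direct executor: `Executor` is a functional interface whose single method takes a `Runnable`, so the method reference simply runs the task on whichever thread calls `execute`. The class and method names below are hypothetical and exist only for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class DirectExecutorSketch {
    // Equivalent in effect to a direct executor, without a Guava import.
    static final Executor DIRECT = Runnable::run;

    /** Returns true if a task handed to DIRECT ran on the thread that called execute(). */
    static boolean runsOnCallingThread() {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        DIRECT.execute(() -> ranOn.set(Thread.currentThread()));
        // The task has already finished here, because execute() ran it inline.
        return ranOn.get() == Thread.currentThread();
    }
}
```

Because the task runs inline, a callback registered with such an executor is handled by whichever thread completes the future, which matches the behaviour discussed earlier in the thread.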

@mzeijen
Contributor Author

mzeijen commented Mar 4, 2020

That is a great tip to use Executor executor = Runnable::run instead of MoreExecutors.directExecutor(). I hadn't really looked at the implementation of directExecutor and didn't know how simple it actually is! I'll go for Runnable::run then, as it saves us a checkstyle suppression.

@mzeijen mzeijen marked this pull request as ready for review March 4, 2020 12:46
@mzeijen
Contributor Author

mzeijen commented Mar 4, 2020

I believe the pull request is now ready for a thorough review, and for merging if it is OK as it is.

What I did is:

  • all pull related methods in the PubSubSubscriberOperations now have async versions, with the names ending with Async. All methods have corresponding unit tests.
  • the executor used for processing the async callbacks of those pull methods is now configurable via setAsyncPullExecutor. The autoconfiguration has been updated so you can configure this by providing the pubSubAsynchronousPullExecutor bean. By default, Runnable::run is used as the implementation.
  • I refactored the PubSubReactiveFactory a bit and added more javadoc.
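The configurable-executor pattern from the list above could look roughly like the following sketch. Only the setter name `setAsyncPullExecutor` and the `Runnable::run` default come from this thread; the class `AsyncPullExecutorSketch`, the `pullAsync` signature shown here and the use of `CompletableFuture` are assumptions made to keep the example JDK-only.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class AsyncPullExecutorSketch {
    // Default: handle callbacks on whichever thread triggers them.
    private Executor asyncPullExecutor = Runnable::run;

    /** Lets callers move callback processing onto an executor of their choice. */
    public void setAsyncPullExecutor(Executor asyncPullExecutor) {
        this.asyncPullExecutor = Objects.requireNonNull(asyncPullExecutor, "executor must not be null");
    }

    /**
     * Hypothetical signature: takes the future of a raw response and returns a
     * future of the processed result; the processing step runs on the configured executor.
     */
    public CompletableFuture<String> pullAsync(CompletableFuture<String> rawResponse) {
        return rawResponse.thenApplyAsync(
                raw -> raw + " handled on " + Thread.currentThread().getName(),
                asyncPullExecutor);
    }
}
```

With the default, processing happens on the thread that triggers the callback; after `setAsyncPullExecutor` is called with, say, a single-thread executor, the same processing step runs on that executor's thread instead.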

Contributor

@elefeint elefeint left a comment

Thank you for the clean code and taking the time to write extra tests that we were missing.

I have two questions I'd like to hash out: which scheduler to use to kick off pollingPull, and whether callback "recursion" will cause problems in the future.

The rest of my comments are minor.

mzeijen and others added 7 commits March 10, 2020 07:05
…d/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java

Co-Authored-By: Elena Felder <[email protected]>
…d/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java

Co-Authored-By: Elena Felder <[email protected]>
…d/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java

Co-Authored-By: Elena Felder <[email protected]>
…a ObjectProvider<Executor> for the pubSubSubscriberTemplate bean method
Contributor

@meltsufin meltsufin left a comment

Thanks again for this work and being patient with us!
I added some additional comments.

…` by moving the validation into `SubscriberFactory.createPullRequest` (where it partially already was)
Contributor

@elefeint elefeint left a comment

The PR looks good. Two minor things:

  • Replace @since 1.3 with @since 1.2.3 (this is unintuitive -- we are staying on the 1.2.x version until we move this project from spring-cloud organization to GoogleCloudPlatform later in the year. The next major spring cloud release will be after we move orgs, so our master is 1.2.3.BUILD-SNAPSHOT right now).
  • Since you'll be changing the @since, add your name in @author in all the modified classes.

@mzeijen
Contributor Author

mzeijen commented Mar 12, 2020

I added myself as author to all classes I changed, corrected the @since and made sure all copyright years are correctly set now. Should be good now.

Contributor

@elefeint elefeint left a comment

Thank you very much!

@elefeint elefeint merged commit 2adf432 into spring-attic:master Mar 12, 2020
@mzeijen
Contributor Author

mzeijen commented Mar 13, 2020

Awesome that we got this feature merged! My colleagues and I really appreciate this.

Also kudos for the way you handle these kinds of contributions. Although it took a while, most probably due to the time difference, it was a very pleasant process. I really appreciate the positive attitude and the constructive feedback. This makes me want to do more contributions :).

Thanks!

@elefeint
Contributor

Thank you -- this was really open source at its best; you've invested a lot of time but now all reactive Pub/Sub users will benefit from the lower overhead.

This change will make it into the next Spring Cloud patch release, currently scheduled for mid-May.

If you or your team would like to suggest features or discuss your use-cases for any GCP/Spring integrations (or Spanner/Hibernate or Spanner/R2DBC), we can set up a video conference. I'll ping the team e-mail over gitter.

@mzeijen
Contributor Author

mzeijen commented Mar 13, 2020

@elefeint Great! I will contact you if we want to take you up on your offer.
