Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Change Flowable.groupBy to signal MBE instead of possibly hanging #6740

Merged
merged 5 commits into from
Dec 6, 2019

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Dec 3, 2019

This PR changes the backpressure behavior of Flowable.groupBy to signal MissingBackpressureException instead of silently hanging if the produced groups are not ready to be accepted by the downstream.

This can happen if one flatMaps a groupBy but there are more groups produced than the concurrency level of flatMap. Since replenishment is tied to item consumption from the groups, not consuming them can result in none of the groups receiving any further items and the whole operator hangs.

The following changes have been applied:

  • Removed the queue from the main operator since it will now try to emit directly and not buffer groups.
  • The main Flowable, lacking a queue, no longer supports operator fusion. Tests checking this property have been removed as well.
  • When a group is drained, consumed items are replenished in batch if possible. Detecting a cancellation will also trigger a replenishment.
  • When a group is pulled (fusion mode), now all pull, isEmpty and clear will trigger replenishment so that other groups can make progress too.
  • Unit tests have been modified to have large enough bufferSize/prefetch amounts to allow them to pass.

Fixes #6641

@akarnokd akarnokd added this to the 3.0 milestone Dec 3, 2019
@codecov
Copy link

codecov bot commented Dec 3, 2019

Codecov Report

Merging #6740 into 3.x will decrease coverage by 0.05%.
The diff coverage is 94.33%.

Impacted file tree graph

@@             Coverage Diff              @@
##                3.x    #6740      +/-   ##
============================================
- Coverage     98.17%   98.11%   -0.06%     
+ Complexity     6191     6190       -1     
============================================
  Files           677      677              
  Lines         44663    44599      -64     
  Branches       6171     6152      -19     
============================================
- Hits          43847    43760      -87     
- Misses          289      301      +12     
- Partials        527      538      +11
Impacted Files Coverage Δ Complexity Δ
.../main/java/io/reactivex/rxjava3/core/Flowable.java 100% <ø> (ø) 559 <0> (ø) ⬇️
...3/internal/operators/flowable/FlowableGroupBy.java 96.22% <94.33%> (-0.9%) 3 <0> (ø)
.../operators/flowable/FlowableBlockingSubscribe.java 93.02% <0%> (-4.66%) 10% <0%> (-1%)
...rnal/operators/flowable/FlowableFlatMapSingle.java 92.44% <0%> (-2.91%) 2% <0%> (ø)
...l/operators/observable/ObservableFlatMapMaybe.java 90.14% <0%> (-2.12%) 2% <0%> (ø)
...a3/internal/operators/flowable/FlowableCreate.java 95.46% <0%> (-1.95%) 6% <0%> (ø)
...ernal/operators/flowable/FlowableFromIterable.java 95.18% <0%> (-1.61%) 5% <0%> (ø)
...operators/observable/ObservableMergeWithMaybe.java 99.09% <0%> (-0.91%) 2% <0%> (ø)
...a3/internal/operators/flowable/FlowableReplay.java 91.97% <0%> (-0.83%) 19% <0%> (ø)
... and 8 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d98dff6...9cfb5a9. Read the comment docs.

Copy link
Collaborator

@vanniktech vanniktech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never used that operator and I remember @davidmoten had lots of use cases so let's wait for his approval?

@akarnokd
Copy link
Member Author

akarnokd commented Dec 3, 2019

Sure.

@akarnokd
Copy link
Member Author

akarnokd commented Dec 6, 2019

Let's merge this into the release and get feedback from the field.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3.x: Change the behavior of Flowable.groupBy to signal MBE if no main requests
2 participants