Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix parallel() on grouped flowable not replenishing properly #6720

Merged
merged 1 commit into from
Nov 21, 2019

Conversation

akarnokd
Copy link
Member

Backport of #6719

Fix a case when the GroupedFlowable is consumed by a parallel() in fusion mode causing the source to stop replenishing items from the upstream, hanging the whole sequence.

parallel() was slightly different from the usual queue consumers because it checks for isEmpty before trying to pull for an item. This was necessary because the rails may not be ready for more and an eager pull to check for emptyness would lose that item. The replenishing was done in GroupedFlowable.pull but a call to GroupedFlowable.isEmpty would not replenish.

The fix is to have isEmpty replenish similar to when poll detects emptyness and replenishes.

Reported in reactor/reactor-core#1959

@codecov
Copy link

codecov bot commented Nov 20, 2019

Codecov Report

Merging #6720 into 2.x will decrease coverage by 0.01%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #6720      +/-   ##
============================================
- Coverage     98.26%   98.24%   -0.02%     
+ Complexity     6346     6345       -1     
============================================
  Files           677      677              
  Lines         45527    45532       +5     
  Branches       6333     6334       +1     
============================================
- Hits          44737    44735       -2     
- Misses          246      249       +3     
- Partials        544      548       +4
Impacted Files Coverage Δ Complexity Δ
...x/internal/operators/flowable/FlowableGroupBy.java 96.12% <100%> (+0.05%) 3 <0> (ø) ⬇️
.../operators/flowable/FlowableBlockingSubscribe.java 93.02% <0%> (-4.66%) 10% <0%> (-1%)
.../operators/observable/ObservableFlatMapSingle.java 91.04% <0%> (-3.74%) 2% <0%> (ø)
...l/operators/observable/ObservableFlatMapMaybe.java 87.58% <0%> (-3.27%) 2% <0%> (ø)
...rnal/operators/flowable/FlowableFlatMapSingle.java 93.47% <0%> (-3.27%) 2% <0%> (ø)
...activex/internal/observers/QueueDrainObserver.java 97.43% <0%> (-2.57%) 21% <0%> (-1%)
.../internal/disposables/ListCompositeDisposable.java 98% <0%> (-2%) 34% <0%> (-1%)
.../io/reactivex/disposables/CompositeDisposable.java 98.14% <0%> (-1.86%) 39% <0%> (-1%)
...java/io/reactivex/processors/PublishProcessor.java 98.19% <0%> (-1.81%) 42% <0%> (-1%)
...perators/mixed/ObservableConcatMapCompletable.java 98.49% <0%> (-1.51%) 3% <0%> (ø)
... and 22 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update aeb5f2c...6b72d1d. Read the comment docs.

@akarnokd akarnokd merged commit 170f952 into ReactiveX:2.x Nov 21, 2019
@akarnokd akarnokd deleted the FlowableGroupByParallelFix2x branch November 21, 2019 08:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants