Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix concurrent clear() calls when fused chains are canceled #6677

Merged
merged 1 commit into from
Oct 17, 2019

Conversation

akarnokd
Copy link
Member

Backport of #6676

When a fuseable source backed by an SpscLinkedArrayQueue is cancelled and cleared concurrently (i.e., one thread clears while the other cancels the chain), the clear() method could run concurrently and either crash with NPE or end up in an infinite loop due to corrupted queue state.

This PR fixes two kinds of mistakes leading to this scenario:

  • Calling clear() from cancel/dispose when the output is fused.
  • Calling clear() from a fused drain loop when cancellation is detected.

When fused, similar to poll(), calling clear() is the responsibility of the consumer and the producer side is not allowed to call them.

The bug affected the following operators:

  • FlowableOnBackpressureBuffer
  • FlowableGroupBy
  • UnicastProcessor
  • UnicastSubject

Fixes #6673

@codecov
Copy link

codecov bot commented Oct 17, 2019

Codecov Report

Merging #6677 into 2.x will decrease coverage by 0.01%.
The diff coverage is 87.5%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #6677      +/-   ##
============================================
- Coverage     98.19%   98.18%   -0.02%     
- Complexity     6347     6349       +2     
============================================
  Files           677      677              
  Lines         45551    45550       -1     
  Branches       6332     6333       +1     
============================================
- Hits          44731    44725       -6     
- Misses          256      260       +4     
- Partials        564      565       +1
Impacted Files Coverage Δ Complexity Δ
...x/internal/operators/flowable/FlowableGroupBy.java 96.06% <0%> (+0.54%) 3 <0> (ø) ⬇️
...ain/java/io/reactivex/subjects/UnicastSubject.java 100% <100%> (ø) 65 <0> (ø) ⬇️
...java/io/reactivex/processors/UnicastProcessor.java 100% <100%> (+1.19%) 68 <0> (+1) ⬆️
...erators/flowable/FlowableOnBackpressureBuffer.java 96.63% <100%> (ø) 2 <0> (ø) ⬇️
...ernal/operators/flowable/FlowableFlatMapMaybe.java 90.33% <0%> (-4.35%) 2% <0%> (ø)
...l/operators/observable/ObservableFlatMapMaybe.java 84.96% <0%> (-3.27%) 2% <0%> (ø)
...ava/io/reactivex/processors/BehaviorProcessor.java 96.86% <0%> (-2.25%) 60% <0%> (ø)
...internal/operators/flowable/FlowableSwitchMap.java 93.51% <0%> (-1.39%) 3% <0%> (ø)
...ex/internal/operators/flowable/FlowableCreate.java 96.77% <0%> (-0.97%) 6% <0%> (ø)
...ternal/operators/observable/ObservablePublish.java 99.12% <0%> (-0.88%) 11% <0%> (-1%)
... and 18 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d4eae73...9011bd5. Read the comment docs.

@akarnokd akarnokd merged commit 70fe91c into ReactiveX:2.x Oct 17, 2019
@akarnokd akarnokd deleted the SpscClearQueueFix2x branch October 17, 2019 14:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants