Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: fix mapNotification's last item backpressure handling #3702

Merged
merged 1 commit into from
Feb 19, 2016

Conversation

akarnokd
Copy link
Member

The previous algorithm used a 2 element queue but it was possible the drain loop got captured and the queue wasn't enough, causing MissingBackpressureException.

The new algorithm uses the requested field's most significant bit to indicate a completed state. If the terminal events reach it with non-zero request or a post-terminal request finds a zero request amount, the last item is emitted.

In addition, the upstream's producer may arrive after the first request thus the class includes the usual arbitration logic inlined. If the upstream doesn't set a Producer but just emits values, the production/backpressure checks are bypassed.

rx.internal.operators.OperatorFlatMapTest > testFlatMapTransformsMaxConcurrentNormalLoop FAILED
    java.lang.AssertionError: Unexpected onError events: 1
        at rx.observers.TestSubscriber.assertNoErrors(TestSubscriber.java:309)
        at rx.internal.operators.OperatorFlatMapTest.testFlatMapTransformsMaxConcurrentNormal(OperatorFlatMapTest.java:419)
        at rx.internal.operators.OperatorFlatMapTest.testFlatMapTransformsMaxConcurrentNormalLoop(OperatorFlatMapTest.java:395)
        Caused by:
        rx.exceptions.MissingBackpressureException
            at rx.internal.operators.OperatorMapNotification$SingleEmitter.offerAndComplete(OperatorMapNotification.java:173)
            at rx.internal.operators.OperatorMapNotification$MapNotificationSubscriber.onCompleted(OperatorMapNotification.java:80)
            at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:101)
            at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
            at rx.internal.producers.ProducerArbiter.emitLoop(ProducerArbiter.java:186)
            at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:129)
            at rx.internal.operators.OperatorMapNotification$MapNotificationSubscriber.setProducer(OperatorMapNotification.java:74)
            at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
            at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
            at rx.Observable$2.call(Observable.java:162)
            at rx.Observable$2.call(Observable.java:154)
            at rx.Observable$2.call(Observable.java:162)
            at rx.Observable$2.call(Observable.java:154)
            at rx.Observable.subscribe(Observable.java:8426)
            at rx.Observable.subscribe(Observable.java:8393)
            at rx.internal.operators.OperatorFlatMapTest.testFlatMapTransformsMaxConcurrentNormal(OperatorFlatMapTest.java:416)
            ... 1 more

@akarnokd akarnokd added the Bug label Feb 12, 2016
@zsxwing
Copy link
Member

zsxwing commented Feb 15, 2016

👍

akarnokd added a commit that referenced this pull request Feb 19, 2016
1.x: fix mapNotification's last item backpressure handling
@akarnokd akarnokd merged commit c68dd0a into ReactiveX:1.x Feb 19, 2016
@akarnokd akarnokd deleted the MapNotificationFix1x branch February 19, 2016 09:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants