RxRingBuffer Concurrent Unsubscribe Non-ThreadSafe #1845

Closed
laktech opened this issue Nov 10, 2014 · 13 comments

@laktech

laktech commented Nov 10, 2014

rx-java version 0.20.6

java.lang.NullPointerException
        at rx.internal.util.RxRingBuffer.poll(RxRingBuffer.java:282)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.drainAll(OperatorMerge.java:723)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.drainQueue(OperatorMerge.java:744)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.access$300(OperatorMerge.java:505)
        at rx.internal.operators.OperatorMerge$MergeSubscriber$1.call(OperatorMerge.java:376)
        at rx.internal.operators.OperatorMerge$MergeSubscriber$1.call(OperatorMerge.java:369)
        at rx.internal.util.IndexedRingBuffer.forEach(IndexedRingBuffer.java:281)
        at rx.internal.util.IndexedRingBuffer.forEach(IndexedRingBuffer.java:247)
        at rx.internal.util.SubscriptionIndexedRingBuffer.forEach(SubscriptionIndexedRingBuffer.java:131)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.drainChildrenQueues(OperatorMerge.java:331)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.drainQueuesIfNeeded(OperatorMerge.java:308)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.access$400(OperatorMerge.java:96)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:674)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:528)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
        at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:105)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:96)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1$1.request(OperatorSubscribeOn.java:88)
        at rx.Subscriber.setProducer(Subscriber.java:150)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1.setProducer(OperatorSubscribeOn.java:81)
        at rx.Subscriber.setProducer(Subscriber.java:144)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
        at rx.Observable.subscribe(Observable.java:8680)
        at ********************
        at ********************
        at rx.Observable.unsafeSubscribe(Observable.java:8591)
        at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
        at rx.schedulers.ExecutorScheduler$ExecutorAction.run(ExecutorScheduler.java:173)
        at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:99)
        at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
        at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
        at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
@benjchristensen
Member

Are you able to reproduce this and can you provide a unit test?

Does this also occur with RxJava 1.0.0-rc.10?

@benjchristensen benjchristensen added this to the 1.0 milestone Nov 11, 2014
@laktech
Author

laktech commented Nov 11, 2014

I was able to consistently reproduce this bug with the following:

https://gist.github.com/laktech/5bba374f394d12956210

and here is the stack trace for the unit test:

https://gist.github.com/laktech/dcac7edb94fde2d037a6

It fails on both 0.20.6 and 1.0.0-rc.10. Succeeds on 0.19.6.

@benjchristensen
Member

Thanks. I'll look at this.

@akarnokd
Member

The queue is null on that line, so either it was already released, the pool didn't provide one at the start, or there is a visibility issue because the field is not volatile.

@benjchristensen
Member

> I was able to consistently reproduce this bug with the following:

Thank you, very helpful. It fails every time for me ... but if I remove subscribeOn it works so there is a race condition somewhere.

Working on this now.

@benjchristensen
Member

The code can be fixed by removing the nested subscribe and composing things:

    private Observable<Integer> keysObservable() {
        return Observable.range(0, 10000).flatMap(new Func1<Integer, Observable<Integer>>() {

            @Override
            public Observable<Integer> call(final Integer t1) {
                return Observable.from(listItems(1)).subscribeOn(executorScheduler);

//                return Observable.create(new OnSubscribe<Integer>() {
//
//                    @Override
//                    public void call(Subscriber<? super Integer> t2) {
//                        Observable.from(listItems(1)).subscribe(t2);
//                    }
//                }).subscribeOn(executorScheduler);
            }
        });
    }

I am still trying to figure out if there is a way to handle the RxRingBuffer issue when it gets unsubscribed prematurely.

@benjchristensen benjchristensen changed the title from "rx.internal.util.RxRingBuffer.poll(RxRingBuffer.java:282) NPE" to "RxRingBuffer Concurrent Unsubscribe Non-ThreadSafe" Nov 15, 2014
@benjchristensen benjchristensen modified the milestones: 1.0.x (previously 1.0) Nov 15, 2014
@benjchristensen
Member

This is going to have to be done in 1.0.x as it's going to take some thought and work.

This is an edge case so I'm comfortable continuing with 1.0 as is since this code has spent a couple months being used.

@laktech I suggest you adopt the composition approach shown in my previous comment and avoid using subscribe to compose things together. It chains subscriptions together which can have unexpected results. The composition operators like merge (used by flatMap) take care of this and use unsafeSubscribe to chain subscriptions through.

I'll come back to this and try to figure out a way for RxRingBuffer to behave safely with concurrent unsubscribe while not killing the performance.

@laktech
Author

laktech commented Nov 15, 2014

@benjchristensen Thanks for looking into this.

Are there any semantic differences between using subscribe and the composition approach? My motivation for invoking subscribe was to ensure that listItems(1) was 1) invoked for each subscriber and 2) not called until there was a subscriber, which is very explicit with subscribe. It seems easy to test, but I thought I'd just ask :-P

@benjchristensen
Member

Semantically, no: it will get invoked the same way as with the manual Observable.create(...).subscribe. If you change the subscribe to unsafeSubscribe, you'll get the correct behavior without the over-eager unsubscribe that is causing the issues.

@laktech
Author

laktech commented Nov 15, 2014

Great, thanks for the work-around.

@benjchristensen
Member

Reading the code again, listItems(1) is slightly more eager in the composed version and will run on the emitting thread rather than the subscribeOn thread, since it is called inside the flatMap rather than inside the subscribeOn.

So if you want to solve that you could use defer:

    return Observable.defer(() -> Observable.from(listItems(1))).subscribeOn(executorScheduler);

In my test this moves the listItems from the main thread to the scheduler thread.
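
The threading difference described here can be sketched without RxJava at all. The following is only a plain-Java analogy, not the code from the test: `DeferThreadSketch`, `newScheduler`, `runOnScheduler`, and the thread name `scheduler-thread` are all made up for illustration, and `listItems` is a stand-in that just records which thread called it. The eager variant calls `listItems(1)` while building the task (like calling it inside the flatMap function), and the deferred variant calls it only when the task runs on the worker thread (like wrapping it in defer).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy only: no RxJava types are used here.
final class DeferThreadSketch {
    // Hypothetical stand-in for listItems(1); it records the thread that called it.
    static volatile String calledOn;

    static List<Integer> listItems(int n) {
        calledOn = Thread.currentThread().getName();
        return Arrays.asList(n);
    }

    // Single named worker thread, standing in for executorScheduler (assumption).
    static ExecutorService newScheduler() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "scheduler-thread");
            t.setDaemon(true);
            return t;
        });
    }

    // Runs the task on the worker thread and waits for it to finish.
    static void runOnScheduler(Callable<Integer> task) {
        ExecutorService scheduler = newScheduler();
        try {
            scheduler.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    // Eager: listItems runs while the task is being built, on the caller's thread.
    static String eager() {
        List<Integer> items = listItems(1);
        runOnScheduler(() -> items.size());
        return calledOn;
    }

    // Deferred: listItems runs only when the task executes, on the worker thread.
    static String deferred() {
        runOnScheduler(() -> listItems(1).size());
        return calledOn;
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eager());
        System.out.println("deferred: " + deferred());
    }
}
```

In this sketch `eager()` reports the name of whichever thread called it, while `deferred()` reports `scheduler-thread`, which mirrors the move from the main thread to the scheduler thread.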

@akarnokd
Member

I've tried to fix this but apart from making the NPE go away by reading queue once everywhere and not adding the RxRingBuffer to the InnerSubscription to be unsubscribed, I'm lost. What I would like to do is to call queue.unsubscribe if the inner subscription receives an onError or onCompleted, but due to backpressure, I'm not certain if putting it into drainRequested and drainAll is enough.

https://gist.github.com/akarnokd/fc1f2e1946bb39e8794a
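
For readers unfamiliar with the "read the queue once" idea, here is a minimal sketch of the pattern in isolation. It is not the RxRingBuffer source or the gist above: `ReleasableBuffer` and its methods are hypothetical, and declaring the field volatile is an assumption made for the sketch. It shows only why copying a nullable field into a local before using it avoids the NPE, and it does not address the harder question of when the buffer should be released under backpressure.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a releasable buffer; this is not the RxRingBuffer source.
final class ReleasableBuffer {
    // release() sets this to null, so a reader may observe null at any time.
    private volatile Queue<Object> queue = new ConcurrentLinkedQueue<>();

    void offer(Object item) {
        Queue<Object> q = queue;          // single read of the field
        if (q != null) {
            q.offer(item);
        }
    }

    // Simulates an unsubscribe that hands the queue back to a pool.
    void release() {
        queue = null;
    }

    // Dereferences the field directly: throws NullPointerException once released.
    Object pollUnchecked() {
        return queue.poll();
    }

    // Reads the field once into a local, then works only with the local copy.
    Object pollReadOnce() {
        Queue<Object> q = queue;
        return (q == null) ? null : q.poll();
    }

    public static void main(String[] args) {
        ReleasableBuffer buffer = new ReleasableBuffer();
        buffer.offer("item");
        System.out.println(buffer.pollReadOnce());   // the item offered above
        buffer.release();
        System.out.println(buffer.pollReadOnce());   // null, no exception
        try {
            buffer.pollUnchecked();
        } catch (NullPointerException e) {
            System.out.println("NPE after release");
        }
    }
}
```

A check such as `if (queue != null) queue.poll()` would not be enough under concurrency, because the field is read twice and a release can land between the two reads; the local copy is what removes that window.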

@akarnokd
Member

akarnokd commented Feb 5, 2015

Fixed in 1.0.5.

@akarnokd akarnokd closed this as completed Feb 5, 2015