
2.x: Possible deadlock when using observeOn(Scheduler, boolean) #6146

Closed
jkarshin opened this issue Aug 7, 2018 · 12 comments
Labels: 2.x, Missing-Details (Could be a question or a bug report, but not enough details are provided.)


jkarshin commented Aug 7, 2018

RxJava version: 2.1.11
Java: 1.8.0_181

I'm encountering an intermittent deadlock in a rather long Flowable, and I believe I've pinpointed it to an observeOn(...) call. (I've reached this conclusion through a series of log statements.) I haven't been able to trace through the test when the deadlock occurs, as it only occurs about once every 30 - 40 executions, and each execution takes about a minute. I've managed to reproduce the deadlock about a dozen times (each time, I've been adding more logging to figure out where things are getting stuck).

Flowable<SomeType> flow = // Lots of stuff upstream

flow.doOnNext(x -> /* log 1 */)
    .doOnComplete(() -> /* log 2 */)
    .observeOn(Schedulers.io(), true)
    .doOnNext(x -> /* log 3 */)
    .doOnComplete(() -> /* log 4 */)
    // Lots more downstream

In the test case where I experience the occasional deadlock, I expect only 1 item to be emitted through this part of the Flowable. I can see log statements 1 and 2, indicating that the item reaches the observeOn(...) and that the upstream is finished, but logs 3 and 4 are never reached. (I forgot to add a doOnError(...) to make sure an exception isn't sneaking through and holding things up elsewhere, but I'm fairly confident there aren't any uncaught exceptions. I've added a doOnError(...) and am re-running my test now to make sure; I'll update my post once I have results.)

Because the logs are hit in this way, I believe the observeOn(...) is locking up somehow. What's really strange is that everything works fine most of the time.

None of the downstream operators should be attempting to dispose the Flowable early either. I believe my terminal operator is a blockingGet() on a Single, no timeout or anything.

I also logged the total number of threads in my JVM to see if I'm leaking threads somewhere, but I'm only at 49 when the deadlock occurs. (I'm using the IO scheduler, which I believe is backed by an unbounded pool, so I can't imagine I would be running out of worker threads.) I do have other Flowables doing unrelated tasks in the background. All of those Flowables are using the IO scheduler. Additionally, the up and down stream of the Flowable in my test also make use of the IO scheduler, but the deadlock always seems to happen here.

I realize that's not a lot of info to go off of, but I figured I'd ask the experts in case there's something glaring that I'm missing, or if there's something else I can do to figure out what's going on.

Thanks in advance!

akarnokd (Member) commented Aug 8, 2018

What you are describing should not happen unless you either call onNext concurrently in the upstream or your downstream does not request. Please provide a minimal runnable code example demonstrating the problem.
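
As an illustration of the "downstream does not request" case (a minimal, hypothetical sketch rather than the reporter's code), a TestSubscriber subscribed with an initial request of 0 makes observeOn look exactly like a hang until a request arrives:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

public class NoRequestDemo {
    public static void main(String[] args) {
        // Subscribe with an initial request of 0: observeOn prefetches and buffers
        // the item but cannot deliver it downstream, which looks like a hang.
        TestSubscriber<Integer> ts = Flowable.just(1)
                .observeOn(Schedulers.io(), true)
                .test(0L);

        ts.assertNoValues();   // nothing is delivered while the request count is 0

        ts.request(1);         // once the downstream requests...
        ts.awaitCount(1);      // ...the buffered item arrives asynchronously
        ts.assertValue(1);
    }
}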

akarnokd added the 2.x and Missing-Details labels on Aug 8, 2018
jkarshin (Author) commented Aug 8, 2018

The sub-Flowable where I'm seeing the deadlock looks like this:

Collection<String> identifiers = ... // In my test, happens to always be size 1
int bufferSize = ...

// Identifiers are logged here

return Flowable.fromIterable(identifiers)
    .compose(FlowableTransformers.errorJump(flow -> flow.buffer(bufferSize)))
    .flatMap(idsInBatch -> Single.fromCallable(/* load list of entities from DB */)
        .doOnSuccess(loaded -> /* validate loaded entities */)
        .flatMapPublisher(Flowable::fromIterable)
        .sorted(/* some comparator */), 1) // End flatMap
    .doOnNext(x -> log("doOnNext before compose"))
    .doOnComplete(() -> log("doOnComplete before compose"))
    .compose(/* FlowableTransformer that in this test only calls upstream.observeOn(...) as described in previous post */)
    .doOnNext(x -> log("doOnNext after compose"))
    .doOnComplete(() -> log("doOnComplete after compose"))
    .doOnError(x -> log("doOnError after compose: " + x))
    .flatMapMaybe(entity -> /* returns Maybe.empty() or Maybe.just(entity) or throws an exception */)
    .map(entity -> /* mutate entity */)
    .map(entity -> /* another mutation */)
    .collect(Maps::<String, T> newHashMap, (map, pojo) -> map.put(pojo.getType(), pojo))
    .blockingGet();

(I call it a "sub-Flowable" to distinguish it from a larger surrounding Flowable whose code I have omitted.)

That sub-Flowable is executed in a loop, and that loop runs inside the lambda of a map(...) operation on another, larger Flowable. (I realize that calling blockingGet() within a lambda passed to a Flowable is bad, but could that cause the deadlock that I'm seeing?)

In my test, the sub-Flowable that I pasted is successfully executed a few times, but very rarely, one of the later invocations will hang. For the iterations that do not hang, I can see all of the log statements being hit. For the iteration that does hang, I can see the logs before the observeOn(...) being hit, but none of the 3 logs after it are hit.

I was also able to use VisualVM to do a thread dump. Most of the threads have names like RxCachedThreadScheduler-# and RxComputationThreadPool-#, but judging by their stack traces, most seem to be waiting in their respective pools:

"RxCachedThreadScheduler-15" #37 daemon prio=5 os_prio=31 tid=0x00007ff6e7d86000 nid=0x6503 waiting on condition [0x0000700007e87000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007977e5570> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

There are 5 threads that are blocked/waiting that aren't simply sitting in a pool. One of them seems to be waiting for the blockingGet() call at the end of the sub-Flowable I described:

"RxCachedThreadScheduler-21" #43 daemon prio=5 os_prio=31 tid=0x00007ff6eb2a8800 nid=0x8503 waiting on condition [0x0000700008497000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000077937a568> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:83)
        at io.reactivex.Single.blockingGet(Single.java:2492)
        // More stack trace pointing to where I call blockingGet()

The other 4 blocked threads seem unrelated, but I can briefly describe them.

The result of the sub-Flowable I pasted is being used to write to a cache; two of the other blocked threads are attempting to clear that cache, and it is known that the clear operation will block until any pending writes complete. The cache sits above this sub-Flowable, so the sub-Flowable can't reference it (which would lead to deadlock).

One of the other blocked threads is waiting for the cache clear operation from the other threads to succeed. The final blocked thread contains the terminal blockingGet() at the end of the Flowable that surrounds the Flowable I pasted.

The relevant logs look like this:

RxCachedThreadScheduler-21-2018-08-08T14:49:40.268Z: Doing load for identifiers: [id1]
RxCachedThreadScheduler-21-2018-08-08T14:49:40.270Z: doOnNext before compose
RxCachedThreadScheduler-21-2018-08-08T14:49:40.270Z: doOnComplete before compose
RxCachedThreadScheduler-19-2018-08-08T14:49:40.270Z: doOnNext after compose
RxCachedThreadScheduler-19-2018-08-08T14:49:40.271Z: doOnComplete after compose
// Separated for readability
RxCachedThreadScheduler-21-2018-08-08T14:49:40.271Z: Doing load for identifiers: [id2]
RxCachedThreadScheduler-21-2018-08-08T14:49:40.273Z: doOnNext before compose
RxCachedThreadScheduler-21-2018-08-08T14:49:40.273Z: doOnComplete before compose
RxCachedThreadScheduler-17-2018-08-08T14:49:40.273Z: doOnNext after compose
RxCachedThreadScheduler-17-2018-08-08T14:49:40.274Z: doOnComplete after compose
// [A few more repeated logs here]
RxCachedThreadScheduler-21-2018-08-08T14:49:40.295Z: Doing load for identifiers: [id7]
RxCachedThreadScheduler-21-2018-08-08T14:49:40.296Z: doOnNext before compose
RxCachedThreadScheduler-21-2018-08-08T14:49:40.296Z: doOnComplete before compose
// All logs stop here

The only thing notable about these logs is that the logs before the observeOn(...) always come from RxCachedThreadScheduler-21, which is the thread that seems blocked inside Single.blockingGet(...) (whose thread dump I included above).

The remaining memory in the JVM seems fine as well.

Unfortunately, I can't really give you the entire test to reproduce the problem, as it is quite large and part of a proprietary code base. (And, you'd have to run the test over and over again for about an hour on average to see the deadlock.)

I'll work on writing a smaller test that I can share with you that hopefully reproduces the deadlock more consistently, but so far I haven't had any luck.

akarnokd (Member) commented Aug 8, 2018

It is very likely blockingGet() is at least part of the problem, especially if you call it from a plain map. You could be blocking the thread servicing the parent flow, which is the very thread that should trigger whatever makes the inner flow complete and thus unblock the blockingGet.

davidmoten (Collaborator) commented:

@jkarshin I'm interested in this, a unit test would be great 👍

jkarshin (Author) commented Aug 9, 2018

Minor update:

I've added a doOnCancel(...), doOnRequest(...), and doOnSubscribe(...) right below the compose(...) call that triggers the observeOn(...). When it hangs, I see:

// More logs above
RxCachedThreadScheduler-21-2018-08-09T19:33:56.583Z: Doing load for identifiers: [id6]
RxCachedThreadScheduler-21-2018-08-09T19:33:56.583Z: Downstream subscribed
RxCachedThreadScheduler-21-2018-08-09T19:33:56.583Z: Downstream requested: 1
RxCachedThreadScheduler-21-2018-08-09T19:33:56.583Z: doOnNext before compose for identifier: id6
RxCachedThreadScheduler-21-2018-08-09T19:33:56.584Z: doOnComplete before compose for identifiers: [id6]
// End of logs

This seems to suggest that the downstream is successfully subscribing and requesting an item, and that it is not cancelling.

I'm still working on making the apparent deadlock easier to reproduce. I haven't been able to get a small unit test to show the same problem, but I have written a slightly smaller integration test that reproduces the deadlock more frequently. (Still too huge for me to share any useful code, but I'm working on it.)

jkarshin (Author) commented:

I converted the Flowable in my previous code snippet into a unit-testable Flowable in an attempt to reproduce the hang in a unit test; however, I cannot get it to hang.

I then cloned RxJava to insert log statements into the internals of the observeOn() logic and look for discrepancies between the logging of the unit test and the hanging production code.

Long story short, I discovered that NewThreadWorker.scheduleActual(...) is submitting a Runnable that never gets executed. (I decorated the decoratedRun Runnable with some log statements and discovered that it does not get executed during the iteration of my production code that hangs.) The logs indicate that the Runnable does get submitted to the executor, but it almost seems as if the executor is blocked by some other task. (The executor only has a single thread in its pool.) I repeatedly logged the output of the toString() method on the hanging executor in intervals of one second, and I get the same output every second:

RxCachedThreadScheduler-23-2018-08-13T23:37:26.303Z: In NewThreadWorker: java.util.concurrent.ScheduledThreadPoolExecutor@3c878dd4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 3718]

I don't know a lot about ScheduledThreadPoolExecutor's toString() method, but the fact that there is always an active thread and a queued task leads me to believe that something is hanging, causing the observeOn's decoratedRun Runnable never to be executed. (I believe that if there were only a single submitted task, there would be either an active thread or a queued task, but not both.)

This brings up a question:

For the IO Scheduler, the executors are cached so that they can be reused, correct? Is it possible that an executor can be recycled even if it hasn't completed its task? If so, the following situation might be causing my deadlock:

  1. Thread A begins re-populating my cache
  2. Thread B attempts to clear the cache. This is blocked since the cache is in the middle of being re-populated. An arbitrary executor is assigned this task.
  3. Thread A hits an observeOn call. By random chance, the executor from step 2 is recycled and used for Thread A's task.
  4. The cache-populating task can't complete until the executor completes its current task, but that current task (the cache clearing task) can't complete until the cache is done being populated.

I'll try to capture the above situation in a unit test later this week.

akarnokd (Member) commented:

Try it with Schedulers.newThread() so worker reuse is not in the picture. Also, don't block and don't use any traditional wait-notify. Confine cache management to one thread.
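
A hypothetical debugging variant of the compose(...) step from the earlier snippets, assuming that step only wraps the observeOn: newThread() workers are dedicated and never returned to a shared pool, so if the hang disappears with this in place, worker reuse on the io() scheduler is the likely culprit.

import io.reactivex.FlowableTransformer;
import io.reactivex.schedulers.Schedulers;

// Illustration only: a helper that mirrors the original observeOn(io(), true)
// step but uses newThread(), taking premature worker reuse out of the picture.
static <T> FlowableTransformer<T, T> debugObserveOn() {
    return upstream -> upstream.observeOn(Schedulers.newThread(), true);
}

Dropping this in via .compose(debugObserveOn()) leaves the rest of the flow untouched.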

jkarshin (Author) commented:

After much effort, I am unable to reproduce the deadlock in a unit test.

With more logging, I've discovered that when the deadlock occurs in my production code, a ThreadWorker is being reused. This ThreadWorker's ScheduledExecutorService's toString() output indicates that it is still executing a task, even though it had been returned to the CachedWorkerPool.

My initial thought was that the thread responsible for clearing one of my caches was somehow getting put back into the pool, but further logging indicates that this is not the case: I added logging to determine the name of the thread that was in the ScheduledExecutorService mentioned above, and surprisingly, it matched the name of the thread from which I was doing the logging. (Which was not the thread that would have been blocked clearing my cache.)

This leads me to believe that because I am nesting Flowables, Singles, Completables, etc, and doing observeOn()s and blockingGet()s in those nested Publishers (for lack of a better collective term), it is possible that a ThreadWorker can be recycled even though it is still in use. And by random chance, if one of those workers is picked up within the flow that recycled it, a deadlock will occur. A thread dump verified that the thread that the recycled worker was supposed to use was waiting in a blockingGet() call.

My next course of action is going to be to refactor my code base to try to eliminate these nested blocking calls and see if that eliminates the deadlock problem. That will probably take a few weeks, so feel free to close this ticket in the meantime; I can re-open it and renew my investigation if the problem persists.

Thanks again for the help.

akarnokd (Member) commented:

If a task executing on the standard schedulers does not respond to interruption, it may lead to premature worker reuse. blockingGet uses CountDownLatch.await, which can be interrupted; thus, when the parent worker is disposed, it should unblock the blockingGet and throw an InterruptedException.

Unfortunately, waiting for a worker to run out of tasks may lead to memory leaks or excess pool growth (i.e., the self-release can't keep up with the request for more workers).

Without seeing the actual code, I don't think we can help you much. I'd suggest using a dedicated, single-threaded scheduler to manage the cache and to retrieve items without blocking:

Observable.interval(1, TimeUnit.MINUTES, Schedulers.single())
    .doOnNext(v -> cache.clear())
    .subscribe();

Observable.fromIterable(items)
    .subscribeOn(Schedulers.single())
    .map(v -> cache.get(v))
    .observeOn(Schedulers.io())
    .doOnNext(v -> { /* work with the item */ })
    .subscribe(/* ... */);

jkarshin (Author) commented:

I was too stubborn to give up on this, so I pressed on and made some progress:

Here are some logs that correspond to what I was describing in my previous post:

// Unrelated logs above
// [Some logs taking place on RxCachedThreadScheduler-37...]
2018-08-21 16:08:04,802 DEBUG [RxCachedThreadScheduler-37] IoScheduler - Returning threadWorker (RxCachedThreadScheduler-37) to pool.
// [Other logs taking place on thread RxCachedThreadScheduler-37...]
2018-08-21 16:08:04,832 DEBUG [RxCachedThreadScheduler-37] IoScheduler - Taking threadWorker out of pool: RxCachedThreadScheduler-37
2018-08-21 16:08:04,833 INFO  [RxCachedThreadScheduler-37] MongoDaoHelper - doOnNext before compose for identifier: id6
2018-08-21 16:08:04,833 INFO  [RxCachedThreadScheduler-37] MongoDaoHelper - doOnComplete before compose for identifiers: [id6]
// All logs stop here

Basically, the logs indicate that some work is done on thread 37, then it is returned to the pool, then more work is done on thread 37 (without taking 37 out of the pool), then finally thread 37 tries to take itself out of the pool, then the deadlock occurs. Additionally, with a thread dump, I can see that thread 37 is waiting in a blockingGet.

Normally, when a thread is put into the pool, it isn't used until a different thread takes it out of the pool. Ex:

[Thread-1]: // Does some work
[Thread-1]: IoScheduler - Returning threadWorker (Thread-1) to pool.
// ... No work on Thread-1 here ...
[Thread-2]: IoScheduler - Taking threadWorker out of pool: Thread-1
[Thread-1]: // Does some more work

Nowhere else in the logs does a thread attempt to check itself out of the pool, nor does a thread continue to do work after it has been returned to the pool (without being taken out of the pool).

This makes me think I've got some combination of RxJava operators that is causing a thread to be used even after it has been returned to the pool. The thread eventually gets stuck in a blockingGet, waiting on the upstream to complete, while the upstream can randomly try to take that same thread out of the pool, leading to deadlock.

I was also able to get a stack trace where thread 37 is returned to the pool prematurely:

2018-08-21 16:23:27,674 DEBUG [RxCachedThreadScheduler-37] IoScheduler - Returning threadWorker (RxCachedThreadScheduler-37) to pool. Including stacktrace: 
java.lang.RuntimeException: null
	at io.reactivex.internal.schedulers.IoScheduler$CachedWorkerPool.release(IoScheduler.java:127) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.dispose(IoScheduler.java:229) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel(FlowableObserveOn.java:155) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableSingle$SingleElementSubscriber.cancel(FlowableSingle.java:118) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.cancel(FlowableFlatMap.java:353) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel(BasicFuseableSubscriber.java:158) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.cancel(BasicFuseableSubscriber.java:158) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.subscriptions.SubscriptionArbiter.setSubscription(SubscriptionArbiter.java:84) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableConcatArray$ConcatArraySubscriber.onSubscribe(FlowableConcatArray.java:70) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.Flowable.subscribe(Flowable.java:14291) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.Flowable.subscribe(Flowable.java:14237) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableConcatArray$ConcatArraySubscriber.onComplete(FlowableConcatArray.java:139) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onComplete(BasicFuseableSubscriber.java:120) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableDistinct$DistinctSubscriber.onComplete(FlowableDistinct.java:113) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:338) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:135) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableSingle$SingleElementSubscriber.onComplete(FlowableSingle.java:111) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:198) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) [rxjava-2.1.11-forked.jar:?]
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) [rxjava-2.1.11-forked.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_181]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

(The RuntimeException: null bit is from me calling new RuntimeException().printStackTrace(...))

Unfortunately, the stack trace doesn't reference my code at all, so I can't pinpoint what operators I am using. I did find it strange that there are a few calls to cancel() in that trace (I don't think I'm explicitly cancelling anything). If I had to guess, one of the RxJava operators that I'm using is doing an implicit cancel() (causing the thread to be returned to the pool), and then continuing to use the thread.

I'll try to reverse engineer the stack trace to find what combination of operators is causing that thread to get returned to the pool. If I can find the combination of operators, hopefully I can turn that into a unit test that consistently shows a thread being returned to the pool prematurely.

akarnokd (Member) commented:

I think I know what's going on.

  • observeOn signals a completion
  • concat switches to the next source, which starts with a just operator
  • SubscriptionArbiter cancels the previous subscription, which is completing
  • observeOn receives cancel, releases the worker
  • the next source of concat starts a blocking operation still on the current thread (released)
  • some other thread gets the released worker and none of its tasks will execute.

On your side, you could try making sure that the just that ends up blocking also receives a subscribeOn(Schedulers.io()).

I'll think about the observeOn and SubscriptionArbiter's behaviors.
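
A minimal sketch of that suggestion (illustrative only; the just value is a placeholder): giving the continuation its own subscribeOn keeps any blocking it later triggers off the worker that observeOn has just released back to the pool.

Flowable.<Integer>empty()
        .observeOn(Schedulers.io())
        // the continuation subscribes on a fresh worker, so a later blockingGet()
        // on this path no longer runs on the worker that was just released
        .concatWith(Flowable.just(42).subscribeOn(Schedulers.io()))
        // ... rest of the flow as before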

jkarshin (Author) commented:

I believe you are correct. I found a piece of my code similar to what you described, and I was able to turn it into a unit test that consistently returns a worker thread to the pool prematurely. Using a blockingGet(), I can make this deadlock consistently as well:

RxJava: 2.1.11

Flowable.<Integer> empty()
                .observeOn(Schedulers.io())
                .concatWith(Flowable.just(42))
                .doOnNext(x -> log("After concatWith()..."))
                .map(x -> Single.just(x)
                        .doOnSuccess(y -> log("In nested Single before observeOn..."))
                        .observeOn(Schedulers.io())
                        .doOnSuccess(y -> log("This log is never hit"))
                        .blockingGet())
                .blockingSubscribe();

The above hangs consistently on my machine and yields logs similar to the following:

main: IoScheduler - Creating new threadWorker for thread: RxCachedThreadScheduler-1
RxCachedThreadScheduler-1: IoScheduler - Returning threadWorker (RxCachedThreadScheduler-1) to pool.
RxCachedThreadScheduler-1: After concatWith()...
RxCachedThreadScheduler-1: In nested Single before observeOn...
RxCachedThreadScheduler-1: IoScheduler - Re-using threadWorker for thread: RxCachedThreadScheduler-1
RxCachedThreadScheduler-1: BlockingMultiObserver - About to wait in blockingGet...

(Replacing the .map(... blockingGet ...) with a flatMapSingle(...) fixes the deadlock caused by re-using the prematurely recycled thread, but this isn't necessarily a viable solution for me, since that nested Single is hidden behind a sub-service whose interface I'd rather not change.)
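
For reference, a hedged sketch of what that flatMapSingle(...) variant of the repro would look like (using the same assumed log(...) helper as above):

Flowable.<Integer> empty()
        .observeOn(Schedulers.io())
        .concatWith(Flowable.just(42))
        .doOnNext(x -> log("After concatWith()..."))
        // the inner Single stays non-blocking, so no worker is parked in blockingGet()
        .flatMapSingle(x -> Single.just(x)
                .doOnSuccess(y -> log("In nested Single before observeOn..."))
                .observeOn(Schedulers.io())
                .doOnSuccess(y -> log("This log is reached now")))
        .blockingSubscribe();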

Running against the 2.x branch of RxJava after the changes you made in response to my issue (#6167) fixes the hanging unit test and seems to fix my hanging production code. So, I will wait for the next RxJava release =)

Thanks again!
