Support Synchronous Source in OnSubscribeRefCount #1753

Merged

Conversation

benjchristensen
Member

Merge of #1695 into 1.x branch.

@benjchristensen
Member Author

Keep getting a failure here:

rx.internal.operators.OperatorDelayTest > testBackpressureWithSelectorDelay STARTED
No output has been received in the last 10 minutes, this potentially indicates a stalled build or something wrong with the build itself.

Yet I can run that test in an infinite loop on my machine and never see it hang.

@benjchristensen
Member Author

I also don't see any use of multicast or refCount in any of the code or dependent code used by that test.

@benjchristensen
Member Author

Merging ... as this code passes in other environments, and will dig into this issue on main branch.

benjchristensen added a commit that referenced this pull request Oct 14, 2014
Support Synchronous Source in OnSubscribeRefCount
@benjchristensen benjchristensen merged commit 563cd40 into ReactiveX:1.x Oct 14, 2014
@benjchristensen benjchristensen deleted the onSubscribeRefCount-1688 branch October 14, 2014 20:04
@benjchristensen
Member Author

Hmm, even in the main branch it's now breaking on Travis. I don't understand.

Hung here:

rx.BackpressureTests > testSubscribeOnScheduling STARTED

@benjchristensen
Member Author

From Eclipse the tests don't fail, but from the command line I get these:

rx.internal.operators.BufferUntilSubscriberTest > testIssue1677 FAILED
    java.lang.AssertionError: Failed inner latch wait, iteration 4
        at org.junit.Assert.fail(Assert.java:93)
        at rx.internal.operators.BufferUntilSubscriberTest.testIssue1677(BufferUntilSubscriberTest.java:78)


rx.internal.operators.OnSubscribeCacheTest > testWithPublishSubjectAndRepeat FAILED
    java.lang.Exception: test timed out after 10000 milliseconds


rx.internal.operators.OnSubscribeCombineLatestTest > testBackpressure STARTED
> Building 80% > :test > 182 tests completed, 2 failed, 1 skipped

@benjchristensen
Member Author

I reverted this, something is very wrong.

@akarnokd
Member

The refcount tests probably blocked all computation scheduler threads and this is why some tests fail.

@benjchristensen
Member Author

Those tests are pretty simple, so if they are causing threads to pile up then the operator implementation is a problem.

@benjchristensen
Member Author

I've committed some code to discuss: #1754

UPDATE: Ignore that pull request. Look at #1755 instead.

@akarnokd
Member

A thread dump would be great, if you can get it to hang on your command line long enough.

@benjchristensen
Member Author

I changed the unit tests and things seem to be working now: #1755

@benjchristensen
Member Author

If I put these two unit tests back in it hangs:

    @Test
    public void testRefCountUnsubscribeForSynchronousSource() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        Observable<Long> o = synchronousInterval().lift(detectUnsubscription(latch));
        Subscriber<Long> sub = Subscribers.empty();
        o.publish().refCount().subscribeOn(Schedulers.computation()).subscribe(sub);
        Thread.sleep(100);
        sub.unsubscribe();
        assertTrue(latch.await(3, TimeUnit.SECONDS));
    }

    @Test
    public void testSubscribeToPublishWithAlreadyUnsubscribedSubscriber() {
        Subscriber<Object> sub = Subscribers.empty();
        sub.unsubscribe();
        ConnectableObservable<Object> o = Observable.empty().publish();
        o.subscribe(sub);
        o.connect();
    }

    private Operator<Long, Long> detectUnsubscription(final CountDownLatch latch) {
        return new Operator<Long,Long>(){
            @Override
            public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) {
                latch.countDown();
                return Subscribers.from(subscriber);
            }};
    }

Here is where it hangs:

rx.internal.operators.OperatorDelayTest > testBackpressureWithSubscriptionTimedDelay STARTED
> Building 80% > :test > 374 tests completed, 1 skipped

It seems detectUnsubscription is wrong: it actually detects a subscription, not an unsubscription.

The thread dumps of interest are:



"RxComputationThreadPool-4" daemon prio=5 tid=0x00007f92a5a2c800 nid=0x6303 waiting on condition [0x000000011e1ac000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at rx.internal.operators.OnSubscribeRefCountTest$15.call(OnSubscribeRefCountTest.java:318)
    at rx.internal.operators.OnSubscribeRefCountTest$15.call(OnSubscribeRefCountTest.java:312)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:137)
    at rx.Observable.unsafeSubscribe(Observable.java:7600)
    at rx.internal.operators.OperatorMulticast.connect(OperatorMulticast.java:146)
    at rx.internal.operators.OnSubscribeRefCount.call(OnSubscribeRefCount.java:71)
    at rx.internal.operators.OnSubscribeRefCount.call(OnSubscribeRefCount.java:38)
    at rx.Observable.unsafeSubscribe(Observable.java:7600)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:45)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)


"Test worker" prio=5 tid=0x00007f92a4134000 nid=0x5703 waiting on condition [0x000000011db99000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007b3de47e0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
    at rx.observers.TestSubscriber.awaitTerminalEvent(TestSubscriber.java:185)
    at rx.internal.operators.OperatorDelayTest.testBackpressureWithSubscriptionTimedDelay(OperatorDelayTest.java:721)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
    at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

@benjchristensen
Member Author

It's still very odd to me why the OperatorDelayTest > testBackpressureWithSubscriptionTimedDelay test is hanging ... I haven't figured that out yet. There is only 1 Rx thread lost to RefCount and 7 others available.

@akarnokd
Member

Since we do round-robin when assigning worker threads in computation scheduler, it is possible a blocked thread gets assigned to the test even if the others are free.
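
The effect described above can be sketched outside RxJava with plain `java.util.concurrent`. This is only an illustrative model, not the computation scheduler's actual code: `RoundRobinPool` and `taskStallsBehindBlockedWorker` are hypothetical names, each "worker" is assumed to be a single-threaded executor, and the pool size of 4 is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinStallDemo {
    // Hypothetical stand-in for a pool of event-loop workers: each worker is a
    // single-threaded executor, and callers receive workers in round-robin order.
    static final class RoundRobinPool {
        private final ExecutorService[] workers;
        private final AtomicInteger next = new AtomicInteger();

        RoundRobinPool(int size) {
            workers = new ExecutorService[size];
            for (int i = 0; i < size; i++) {
                workers[i] = Executors.newSingleThreadExecutor();
            }
        }

        ExecutorService nextWorker() {
            return workers[next.getAndIncrement() % workers.length];
        }

        void shutdownNow() {
            for (ExecutorService w : workers) {
                w.shutdownNow();
            }
        }
    }

    // Returns true if a task handed to the blocked worker fails to complete in
    // time even though every other worker in the pool is idle.
    static boolean taskStallsBehindBlockedWorker() {
        RoundRobinPool pool = new RoundRobinPool(4);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Worker 0 receives a task that does not return on its own,
            // standing in for the leaked infinite loop.
            pool.nextWorker().submit(() -> { release.await(); return null; });
            // Workers 1..3 receive quick tasks and are idle afterwards.
            for (int i = 1; i < 4; i++) {
                pool.nextWorker().submit(() -> "ok").get(1, TimeUnit.SECONDS);
            }
            // Round-robin wraps back to worker 0, which is still blocked.
            Future<String> victim = pool.nextWorker().submit(() -> "done");
            try {
                victim.get(200, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException expected) {
                return true;
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("stalled: " + taskStallsBehindBlockedWorker());
    }
}
```

In this model the fifth task waits behind the blocked worker purely because of assignment order, which matches the symptom of an unrelated test hanging while other threads are free.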

@benjchristensen
Member Author

Changing this:

    @Test
    public void testRefCountUnsubscribeForSynchronousSource() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        Observable<Long> o = synchronousInterval().lift(detectUnsubscription(latch));
        Subscriber<Long> sub = Subscribers.empty();
        o.publish().refCount().subscribeOn(Schedulers.computation()).subscribe(sub);
        Thread.sleep(100);
        sub.unsubscribe();
        assertTrue(latch.await(3, TimeUnit.SECONDS));
    }

    private Operator<Long, Long> detectUnsubscription(final CountDownLatch latch) {
        return new Operator<Long,Long>(){
            @Override
            public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) {
                latch.countDown();
                return Subscribers.from(subscriber);
            }};
    }

to this

    @Test
    public void testRefCountUnsubscribeForSynchronousSource() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        Observable<Long> o = synchronousInterval().doOnUnsubscribe(new Action0() {

            @Override
            public void call() {
                latch.countDown();
            }

        });
        Subscriber<Long> sub = Subscribers.empty();
        o.publish().refCount().subscribeOn(Schedulers.computation()).subscribe(sub);
        Thread.sleep(100);
        sub.unsubscribe();
        assertTrue(latch.await(3, TimeUnit.SECONDS));
    }

fixes the issue.

So it seems that our Scheduler implementation, in taking a random thread, can hand new work to a thread that is already hung, and that work then blocks behind it.

Obviously blocking an event loop thread is a "bad thing" ... and this is a nasty side effect.

@benjchristensen
Member Author

> Since we do round-robin when assigning worker threads in computation scheduler, it is possible a blocked thread gets assigned to the test even if the others are free.

Yeah, what you said.

@benjchristensen
Member Author

K, got it I think ... Subscribers.from wraps the Subscriber but does not compose through the subscription, so the unsubscribe never propagates and thus never shuts down the infinite loop.

The test passes though because the latch is immediately released as soon as a subscribe happens.

So ... just a bad test.
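
A rough model of that diagnosis, under stated assumptions: the classes below are hypothetical stand-ins, not RxJava's `Subscriber` or `Subscribers.from`. A "subscriber" is reduced to an unsubscribed flag, `wrapWithoutChaining` gives the wrapper its own flag, `wrapWithChaining` shares the child's flag, and the synchronous source is capped at `maxIterations` so the sketch terminates.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SubscriptionChainingDemo {
    // Hypothetical, stripped-down subscriber: only an "unsubscribed" flag.
    static final class SimpleSubscriber {
        private final AtomicBoolean unsubscribed;

        SimpleSubscriber() {
            this(new AtomicBoolean());
        }

        private SimpleSubscriber(AtomicBoolean flag) {
            this.unsubscribed = flag;
        }

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }

        // Wrapper with its own flag: the child's unsubscribe is invisible upstream.
        static SimpleSubscriber wrapWithoutChaining(SimpleSubscriber child) {
            return new SimpleSubscriber();
        }

        // Wrapper sharing the child's flag: the child's unsubscribe propagates.
        static SimpleSubscriber wrapWithChaining(SimpleSubscriber child) {
            return new SimpleSubscriber(child.unsubscribed);
        }
    }

    // Synchronous source: emits until the subscriber it sees is unsubscribed.
    // The child unsubscribes after `unsubscribeAt` emissions; `maxIterations`
    // bounds the loop so an unpropagated unsubscribe cannot spin forever here.
    static int emitSynchronously(SimpleSubscriber upstreamView, SimpleSubscriber child,
                                 int unsubscribeAt, int maxIterations) {
        int emitted = 0;
        while (!upstreamView.isUnsubscribed() && emitted < maxIterations) {
            emitted++;
            if (emitted == unsubscribeAt) {
                child.unsubscribe();
            }
        }
        return emitted;
    }

    static int runUnchained() {
        SimpleSubscriber child = new SimpleSubscriber();
        return emitSynchronously(SimpleSubscriber.wrapWithoutChaining(child), child, 3, 1000);
    }

    static int runChained() {
        SimpleSubscriber child = new SimpleSubscriber();
        return emitSynchronously(SimpleSubscriber.wrapWithChaining(child), child, 3, 1000);
    }

    public static void main(String[] args) {
        System.out.println("unchained emitted: " + runUnchained());
        System.out.println("chained emitted: " + runChained());
    }
}
```

With the unchained wrapper the loop only stops at the artificial cap, while the chained wrapper stops as soon as the child unsubscribes. Without the cap, the unchained case would be the kind of loop that keeps occupying a worker thread.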

@akarnokd
Member

Good catch!


3 participants