diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java index c4860532b1..e0dc7be452 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java @@ -409,7 +409,7 @@ public void onError(Throwable e) { boolean sendOnComplete = false; synchronized (this) { wip--; - if (wip == 0 && completed) { + if ((wip == 0 && completed) || (wip < 0)) { sendOnComplete = true; } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java index 2c60a89632..f5163bdb8e 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java @@ -15,26 +15,11 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.List; - import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; @@ -42,6 +27,15 @@ import rx.exceptions.CompositeException; import rx.exceptions.TestException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.*; + public class OperatorMergeDelayErrorTest { @Mock @@ -289,6 +283,35 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } + @Test(timeout=1000L) + public void testSynchronousError() { + final Observable> o1 = Observable.error(new RuntimeException("unit test")); + + final CountDownLatch latch = new CountDownLatch(1); + Observable.mergeDelayError(o1).subscribe(new Subscriber() { + @Override + public void onCompleted() { + fail("Expected onError path"); + } + + @Override + public void onError(Throwable e) { + latch.countDown(); + } + + @Override + public void onNext(String s) { + fail("Expected onError path"); + } + }); + + try { + latch.await(); + } catch (InterruptedException ex) { + fail("interrupted"); + } + } + private static class TestSynchronousObservable implements Observable.OnSubscribe { @Override