Skip to content

Commit

Permalink
Merge pull request #1830 from benjchristensen/313-mergeDelayError
Browse files Browse the repository at this point in the history
Fix mergeDelayError Handling of Error in Parent Observable
  • Loading branch information
benjchristensen committed Nov 6, 2014
2 parents 18d6f3e + 6349171 commit 6f4c27e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ public Boolean call(InnerSubscriber<T> s) {

@Override
public void onError(Throwable e) {
completed = true;
innerError(e);
}

private void innerError(Throwable e) {
if (delayErrors) {
synchronized (this) {
if (exceptions == null) {
Expand Down Expand Up @@ -540,7 +545,7 @@ public void onNext(T t) {
public void onError(Throwable e) {
// it doesn't go through queues, it immediately onErrors and tears everything down
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
parentSubscriber.onError(e);
parentSubscriber.innerError(e);
}
}

Expand Down Expand Up @@ -753,4 +758,4 @@ private int drainQueue() {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.TestException;
import rx.observers.TestSubscriber;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -475,4 +479,65 @@ public void onCompleted() {
inOrder.verify(o).onError(any(TestException.class));
verify(o, never()).onCompleted();
}
}

@Test
public void testErrorInParentObservable() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.mergeDelayError(
Observable.just(Observable.just(1), Observable.just(2))
.startWith(Observable.<Integer> error(new RuntimeException()))
).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertTerminalEvent();
ts.assertReceivedOnNext(Arrays.asList(1, 2));
assertEquals(1, ts.getOnErrorEvents().size());

}

@Test
public void testErrorInParentObservableDelayed() throws Exception {
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();
final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable();
Observable<Observable<String>> parentObservable = Observable.create(new Observable.OnSubscribe<Observable<String>>() {
@Override
public void call(Subscriber<? super Observable<String>> op) {
op.onNext(Observable.create(o1));
op.onNext(Observable.create(o2));
op.onError(new NullPointerException("throwing exception in parent"));
}
});

TestSubscriber<String> ts = new TestSubscriber<String>(stringObserver);
Observable<String> m = Observable.mergeDelayError(parentObservable);
m.subscribe(ts);
ts.awaitTerminalEvent(2000, TimeUnit.MILLISECONDS);
ts.assertTerminalEvent();

verify(stringObserver, times(2)).onNext("hello");
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
verify(stringObserver, never()).onCompleted();
}

private static class TestASynchronous1sDelayedObservable implements Observable.OnSubscribe<String> {
Thread t;

@Override
public void call(final Subscriber<? super String> observer) {
t = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
observer.onError(e);
}
observer.onNext("hello");
observer.onCompleted();
}

});
t.start();
}
}
}

0 comments on commit 6f4c27e

Please sign in to comment.