Skip to content

Commit

Permalink
Subject Error Handling
Browse files Browse the repository at this point in the history
Fixes ReactiveX#1685 by delaying errors that are caught until after all subscribers have a chance to receive the event.

Note that this has a lot of code duplication to handle this across the Subject implementations. It may be worth abstracting this logic ... but right now I'm just doing what makes sense to fix this as the Subject abstractions are non-trivial.
  • Loading branch information
benjchristensen committed Oct 10, 2014
1 parent b2fe579 commit d66df7f
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 6 deletions.
23 changes: 22 additions & 1 deletion src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package rx.subjects;

import java.util.ArrayList;
import java.util.List;

import rx.Observer;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.internal.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
Expand Down Expand Up @@ -104,8 +109,24 @@ public void onCompleted() {
public void onError(final Throwable e) {
if (state.active) {
Object n = nl.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.onError(e);
try {
bo.onError(e);
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}

if (errors != null) {
if (errors.size() == 1) {
Exceptions.propagate(errors.get(0));
} else {
throw new CompositeException("Errors while emitting AsyncSubject.onError", errors);
}
}
}
}
Expand Down
23 changes: 22 additions & 1 deletion src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
package rx.subjects;


import java.util.ArrayList;
import java.util.List;

import rx.Observer;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.internal.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
Expand Down Expand Up @@ -131,8 +136,24 @@ public void onError(Throwable e) {
Object last = state.get();
if (last == null || state.active) {
Object n = nl.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n, state.nl);
try {
bo.emitNext(n, state.nl);
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}

if (errors != null) {
if (errors.size() == 1) {
Exceptions.propagate(errors.get(0));
} else {
throw new CompositeException("Errors while emitting AsyncSubject.onError", errors);
}
}
}
}
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package rx.subjects;

import java.util.ArrayList;
import java.util.List;

import rx.Observer;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.internal.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
Expand Down Expand Up @@ -89,8 +94,23 @@ public void onCompleted() {
public void onError(final Throwable e) {
if (state.active) {
Object n = nl.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n, state.nl);
try {
bo.emitNext(n, state.nl);
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}
if (errors != null) {
if (errors.size() == 1) {
Exceptions.propagate(errors.get(0));
} else {
throw new CompositeException("Errors while emitting PublishSubject.onError", errors);
}
}
}
}
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package rx.subjects;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Observer;
import rx.Scheduler;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Functions;
Expand Down Expand Up @@ -303,9 +306,25 @@ public void onNext(T t) {
public void onError(final Throwable e) {
if (ssm.active) {
state.error(e);
List<Throwable> errors = null;
for (SubjectObserver<? super T> o : ssm.terminate(NotificationLite.instance().error(e))) {
if (caughtUp(o)) {
o.onError(e);
try {
if (caughtUp(o)) {
o.onError(e);
}
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}

if (errors != null) {
if (errors.size() == 1) {
Exceptions.propagate(errors.get(0));
} else {
throw new CompositeException("Errors while emitting ReplaySubject.onError", errors);
}
}
}
Expand Down
19 changes: 18 additions & 1 deletion src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@
import rx.Observable.OnSubscribe;
import rx.Observable.Transformer;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;

public class ObservableTests {
Expand Down Expand Up @@ -1125,5 +1126,21 @@ public String call(Integer t1) {
ts.assertNoErrors();
ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
}

@Test
public void testErrorThrownIssue1685() {
Subject<Object, Object> subject = ReplaySubject.create();

Observable.error(new RuntimeException("oops"))
.materialize()
.delay(1, TimeUnit.SECONDS)
.dematerialize()
.subscribe(subject);

subject.subscribe();
subject.materialize().toBlocking().first();

System.out.println("Done");
}

}
48 changes: 48 additions & 0 deletions src/test/java/rx/subjects/AsyncSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.subjects;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
Expand All @@ -33,7 +34,10 @@

import rx.Observer;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.observers.TestSubscriber;

public class AsyncSubjectTest {

Expand Down Expand Up @@ -281,5 +285,49 @@ public void run() {
}
}
}

@Test
public void testOnErrorThrowsDoesntPreventDelivery() {
AsyncSubject<String> ps = AsyncSubject.create();

ps.subscribe();
TestSubscriber<String> ts = new TestSubscriber<String>();
ps.subscribe(ts);

try {
ps.onError(new RuntimeException("an exception"));
fail("expect OnErrorNotImplementedException");
} catch (OnErrorNotImplementedException e) {
// ignore
}
// even though the onError above throws we should still receive it on the other subscriber
assertEquals(1, ts.getOnErrorEvents().size());
}

/**
* This one has multiple failures so should get a CompositeException
*/
@Test
public void testOnErrorThrowsDoesntPreventDelivery2() {
AsyncSubject<String> ps = AsyncSubject.create();

ps.subscribe();
ps.subscribe();
TestSubscriber<String> ts = new TestSubscriber<String>();
ps.subscribe(ts);
ps.subscribe();
ps.subscribe();
ps.subscribe();

try {
ps.onError(new RuntimeException("an exception"));
fail("expect OnErrorNotImplementedException");
} catch (CompositeException e) {
// we should have 5 of them
assertEquals(5, e.getExceptions().size());
}
// even though the onError above throws we should still receive it on the other subscriber
assertEquals(1, ts.getOnErrorEvents().size());
}

}
48 changes: 48 additions & 0 deletions src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.subjects;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand All @@ -30,7 +31,10 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Func1;
import rx.observers.TestSubscriber;

public class BehaviorSubjectTest {

Expand Down Expand Up @@ -367,4 +371,48 @@ public void testTakeOneSubscriber() {

assertEquals(0, source.subscriberCount());
}

@Test
public void testOnErrorThrowsDoesntPreventDelivery() {
BehaviorSubject<String> ps = BehaviorSubject.create();

ps.subscribe();
TestSubscriber<String> ts = new TestSubscriber<String>();
ps.subscribe(ts);

try {
ps.onError(new RuntimeException("an exception"));
fail("expect OnErrorNotImplementedException");
} catch (OnErrorNotImplementedException e) {
// ignore
}
// even though the onError above throws we should still receive it on the other subscriber
assertEquals(1, ts.getOnErrorEvents().size());
}

/**
* This one has multiple failures so should get a CompositeException
*/
@Test
public void testOnErrorThrowsDoesntPreventDelivery2() {
BehaviorSubject<String> ps = BehaviorSubject.create();

ps.subscribe();
ps.subscribe();
TestSubscriber<String> ts = new TestSubscriber<String>();
ps.subscribe(ts);
ps.subscribe();
ps.subscribe();
ps.subscribe();

try {
ps.onError(new RuntimeException("an exception"));
fail("expect OnErrorNotImplementedException");
} catch (CompositeException e) {
// we should have 5 of them
assertEquals(5, e.getExceptions().size());
}
// even though the onError above throws we should still receive it on the other subscriber
assertEquals(1, ts.getOnErrorEvents().size());
}
}
Loading

0 comments on commit d66df7f

Please sign in to comment.