Skip to content

Commit

Permalink
Merge pull request #839 from benjchristensen/error-handling
Browse files Browse the repository at this point in the history
Error Handling: OnErrorNotImplemented and java.lang.Error
  • Loading branch information
benjchristensen committed Feb 8, 2014
2 parents 5c6db66 + 2a4e9c6 commit d56b1b9
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 639 deletions.
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
import rx.util.TimeInterval;
Expand Down Expand Up @@ -6964,10 +6965,9 @@ public void call() {
}

});
} catch (OnErrorNotImplementedException e) {
// special handling when onError is not implemented ... we just rethrow
throw e;
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
observer.onError(hook.onSubscribeError(this, e));
Expand Down
13 changes: 10 additions & 3 deletions rxjava-core/src/main/java/rx/observers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Subscriber;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.CompositeException;
import rx.util.Exceptions;
import rx.util.OnErrorNotImplementedException;

/**
Expand Down Expand Up @@ -74,6 +72,9 @@ public void onCompleted() {
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onCompleted implementation fails, not just if the Observable fails
_onError(e);
} finally {
Expand All @@ -85,6 +86,9 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if (isFinished.compareAndSet(false, true)) {
_onError(e);
}
Expand All @@ -97,6 +101,9 @@ public void onNext(T args) {
actual.onNext(args);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onNext implementation fails, not just if the Observable fails
onError(e);
}
Expand Down
58 changes: 4 additions & 54 deletions rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,11 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;
import rx.operators.SafeObservableSubscription;

/**
* A thread-safe Observer for transitioning states in operators.
* Synchronize execution to be single-threaded.
* <p>
* Execution rules are:
* <ul>
* <li>Allow only single-threaded, synchronous, ordered execution of onNext, onCompleted, onError</li>
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
* </ul>
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
*
* @param <T>
*/
Expand All @@ -48,76 +41,33 @@ public final class SynchronizedObserver<T> implements Observer<T> {
*/

private final Observer<? super T> observer;
private final SafeObservableSubscription subscription;
private volatile boolean finishRequested = false;
private volatile boolean finished = false;
private volatile Object lock;

public SynchronizedObserver(Observer<? super T> subscriber, SafeObservableSubscription subscription) {
public SynchronizedObserver(Observer<? super T> subscriber) {
this.observer = subscriber;
this.subscription = subscription;
this.lock = this;
}

public SynchronizedObserver(Observer<? super T> subscriber, SafeObservableSubscription subscription, Object lock) {
public SynchronizedObserver(Observer<? super T> subscriber, Object lock) {
this.observer = subscriber;
this.subscription = subscription;
this.lock = lock;
}

/**
* Used when synchronizing an Observer without access to the subscription.
*
* @param Observer
*/
public SynchronizedObserver(Observer<? super T> subscriber) {
this(subscriber, new SafeObservableSubscription());
}

public void onNext(T arg) {
if (finished || finishRequested || subscription.isUnsubscribed()) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
return;
}
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || finishRequested || subscription.isUnsubscribed()) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
return;
}
observer.onNext(arg);
}
}

public void onError(Throwable e) {
if (finished || subscription.isUnsubscribed()) {
// another thread has already finished us, so we won't proceed
return;
}
finishRequested = true;
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || subscription.isUnsubscribed()) {
return;
}
observer.onError(e);
finished = true;
}
}

public void onCompleted() {
if (finished || subscription.isUnsubscribed()) {
// another thread has already finished us, so we won't proceed
return;
}
finishRequested = true;
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || subscription.isUnsubscribed()) {
return;
}
observer.onCompleted();
finished = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import rx.Observer;
import rx.Subscriber;
import rx.operators.SafeObservableSubscription;

/**
* A thread-safe Observer for transitioning states in operators.
Expand All @@ -37,9 +36,7 @@ public final class SynchronizedSubscriber<T> extends Subscriber<T> {

public SynchronizedSubscriber(Subscriber<? super T> subscriber, Object lock) {
super(subscriber);
SafeObservableSubscription s = new SafeObservableSubscription();
subscriber.add(s);
this.observer = new SynchronizedObserver<T>(subscriber, s, lock);
this.observer = new SynchronizedObserver<T>(subscriber, lock);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/614
*/
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
completeSubscription.add(subscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver);

/**
* Subscribe to the parent Observable to get to the children Observables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@ public Synchronize(Observable<? extends T> innerObservable, Object lock) {
private Object lock;

public Subscription onSubscribe(Observer<? super T> observer) {
SafeObservableSubscription subscription = new SafeObservableSubscription();
if (lock == null) {
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
atomicObserver = new SynchronizedObserver<T>(observer);
}
else {
atomicObserver = new SynchronizedObserver<T>(observer, subscription, lock);
atomicObserver = new SynchronizedObserver<T>(observer, lock);
}
return subscription.wrap(innerObservable.subscribe(atomicObserver));
return innerObservable.subscribe(atomicObserver);
}

}
Expand Down
Loading

0 comments on commit d56b1b9

Please sign in to comment.