Skip to content

Commit

Permalink
Protect subscribe/forEach implementations against user provided funct…
Browse files Browse the repository at this point in the history
…ion failures

Related to ReactiveX#216

The new forEach unit test went into a deadlock prior to this fix.
  • Loading branch information
benjchristensen committed Apr 1, 2013
1 parent c9fc4df commit ac3cee8
Showing 1 changed file with 110 additions and 9 deletions.
119 changes: 110 additions & 9 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.FunctionLanguageAdaptor;
import rx.util.functions.Functions;

Expand Down Expand Up @@ -152,7 +153,7 @@ public Subscription subscribe(Observer<T> observer) {
/**
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
if (observer.getClass().getPackage().getName().startsWith("rx")) {
if (isInternalImplementation(observer)) {
Subscription s = onSubscribe.call(observer);
if (s == null) {
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
Expand All @@ -178,6 +179,16 @@ public Subscription subscribe(Observer<T> observer) {
}
}

/**
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
* <p>
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
return subscription.wrap(subscribe(new AtomicObserver<T>(subscription, o)));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map<String, Object> callbacks) {
// lookup and memoize onNext
Expand All @@ -187,7 +198,12 @@ public Subscription subscribe(final Map<String, Object> callbacks) {
}
final FuncN onNext = Functions.from(_onNext);

return subscribe(new Observer() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {

public void onCompleted() {
Object onComplete = callbacks.get("onCompleted");
Expand Down Expand Up @@ -224,7 +240,12 @@ public Subscription subscribe(final Object o) {
}
final FuncN onNext = Functions.from(o);

return subscribe(new Observer() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {

public void onCompleted() {
// do nothing
Expand All @@ -244,7 +265,12 @@ public void onNext(Object args) {

public Subscription subscribe(final Action1<T> onNext) {

return subscribe(new Observer<T>() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer<T>() {

public void onCompleted() {
// do nothing
Expand Down Expand Up @@ -273,7 +299,12 @@ public Subscription subscribe(final Object onNext, final Object onError) {
}
final FuncN onNextFunction = Functions.from(onNext);

return subscribe(new Observer() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {

public void onCompleted() {
// do nothing
Expand All @@ -295,7 +326,12 @@ public void onNext(Object args) {

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {

return subscribe(new Observer<T>() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer<T>() {

public void onCompleted() {
// do nothing
Expand Down Expand Up @@ -326,7 +362,12 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
}
final FuncN onNextFunction = Functions.from(onNext);

return subscribe(new Observer() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {

public void onCompleted() {
if (onComplete != null) {
Expand All @@ -350,7 +391,12 @@ public void onNext(Object args) {

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {

return subscribe(new Observer<T>() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer<T>() {

public void onCompleted() {
onComplete.call();
Expand Down Expand Up @@ -389,7 +435,12 @@ public void forEach(final Action1<T> onNext) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();

subscribe(new Observer<T>() {
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
protectivelyWrapAndSubscribe(new Observer<T>() {
public void onCompleted() {
latch.countDown();
}
Expand Down Expand Up @@ -3260,6 +3311,23 @@ public Iterable<T> mostRecent(T initialValue) {
return mostRecent(this, initialValue);
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*
* NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface.
*
* @param f
* @return
*/
private boolean isInternalImplementation(Object o) {
if (o == null) {
return true;
}
return (o.getClass().getPackage().getName().startsWith("rx."));
}

public static class UnitTest {

@Mock
Expand Down Expand Up @@ -3723,6 +3791,39 @@ public void onNext(String v) {
}
}

@Test
public void testForEachWithError() {
try {
Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {

@Override
public void run() {
observer.onNext("one");
observer.onNext("two");
observer.onNext("three");
observer.onCompleted();
}
}).start();
return subscription;
}
}).forEach(new Action1<String>() {

@Override
public void call(String t1) {
throw new RuntimeException("fail");
}
});
fail("we expect an exception to be thrown");
} catch (Exception e) {
// do nothing as we expect this
}
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down

0 comments on commit ac3cee8

Please sign in to comment.