Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error Handling: OnErrorNotImplemented and java.lang.Error #839

Merged
merged 1 commit into from
Feb 8, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
@@ -118,6 +118,7 @@
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
import rx.util.TimeInterval;
@@ -6964,10 +6965,9 @@ public void call() {
}

});
} catch (OnErrorNotImplementedException e) {
// special handling when onError is not implemented ... we just rethrow
throw e;
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
observer.onError(hook.onSubscribeError(this, e));
13 changes: 10 additions & 3 deletions rxjava-core/src/main/java/rx/observers/SafeSubscriber.java
Original file line number Diff line number Diff line change
@@ -19,11 +19,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Subscriber;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.CompositeException;
import rx.util.Exceptions;
import rx.util.OnErrorNotImplementedException;

/**
@@ -74,6 +72,9 @@ public void onCompleted() {
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onCompleted implementation fails, not just if the Observable fails
_onError(e);
} finally {
@@ -85,6 +86,9 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if (isFinished.compareAndSet(false, true)) {
_onError(e);
}
@@ -97,6 +101,9 @@ public void onNext(T args) {
actual.onNext(args);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onNext implementation fails, not just if the Observable fails
onError(e);
}
58 changes: 4 additions & 54 deletions rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java
Original file line number Diff line number Diff line change
@@ -16,18 +16,11 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;
import rx.operators.SafeObservableSubscription;

/**
* A thread-safe Observer for transitioning states in operators.
* Synchronize execution to be single-threaded.
* <p>
* Execution rules are:
* <ul>
* <li>Allow only single-threaded, synchronous, ordered execution of onNext, onCompleted, onError</li>
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
* </ul>
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this class only be responsible for synchronization, not contract enforcement.

It was colliding with error handling logic in SafeSubscriber and swallowing errors trying to be thrown.

*
* @param <T>
*/
@@ -48,76 +41,33 @@ public final class SynchronizedObserver<T> implements Observer<T> {
*/

private final Observer<? super T> observer;
private final SafeObservableSubscription subscription;
private volatile boolean finishRequested = false;
private volatile boolean finished = false;
private volatile Object lock;

public SynchronizedObserver(Observer<? super T> subscriber, SafeObservableSubscription subscription) {
public SynchronizedObserver(Observer<? super T> subscriber) {
this.observer = subscriber;
this.subscription = subscription;
this.lock = this;
}

public SynchronizedObserver(Observer<? super T> subscriber, SafeObservableSubscription subscription, Object lock) {
public SynchronizedObserver(Observer<? super T> subscriber, Object lock) {
this.observer = subscriber;
this.subscription = subscription;
this.lock = lock;
}

/**
* Used when synchronizing an Observer without access to the subscription.
*
* @param Observer
*/
public SynchronizedObserver(Observer<? super T> subscriber) {
this(subscriber, new SafeObservableSubscription());
}

public void onNext(T arg) {
if (finished || finishRequested || subscription.isUnsubscribed()) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
return;
}
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || finishRequested || subscription.isUnsubscribed()) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
return;
}
observer.onNext(arg);
}
}

public void onError(Throwable e) {
if (finished || subscription.isUnsubscribed()) {
// another thread has already finished us, so we won't proceed
return;
}
finishRequested = true;
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || subscription.isUnsubscribed()) {
return;
}
observer.onError(e);
finished = true;
}
}

public void onCompleted() {
if (finished || subscription.isUnsubscribed()) {
// another thread has already finished us, so we won't proceed
return;
}
finishRequested = true;
synchronized (lock) {
// check again since this could have changed while waiting
if (finished || subscription.isUnsubscribed()) {
return;
}
observer.onCompleted();
finished = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@

import rx.Observer;
import rx.Subscriber;
import rx.operators.SafeObservableSubscription;

/**
* A thread-safe Observer for transitioning states in operators.
@@ -37,9 +36,7 @@ public final class SynchronizedSubscriber<T> extends Subscriber<T> {

public SynchronizedSubscriber(Subscriber<? super T> subscriber, Object lock) {
super(subscriber);
SafeObservableSubscription s = new SafeObservableSubscription();
subscriber.add(s);
this.observer = new SynchronizedObserver<T>(subscriber, s, lock);
this.observer = new SynchronizedObserver<T>(subscriber, lock);
}

/**
Original file line number Diff line number Diff line change
@@ -151,9 +151,7 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/614
*/
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
completeSubscription.add(subscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver);

/**
* Subscribe to the parent Observable to get to the children Observables
Original file line number Diff line number Diff line change
@@ -86,14 +86,13 @@ public Synchronize(Observable<? extends T> innerObservable, Object lock) {
private Object lock;

public Subscription onSubscribe(Observer<? super T> observer) {
SafeObservableSubscription subscription = new SafeObservableSubscription();
if (lock == null) {
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
atomicObserver = new SynchronizedObserver<T>(observer);
}
else {
atomicObserver = new SynchronizedObserver<T>(observer, subscription, lock);
atomicObserver = new SynchronizedObserver<T>(observer, lock);
}
return subscription.wrap(innerObservable.subscribe(atomicObserver));
return innerObservable.subscribe(atomicObserver);
}

}
446 changes: 0 additions & 446 deletions rxjava-core/src/main/java/rx/operators/OperationZip.java

This file was deleted.

15 changes: 15 additions & 0 deletions rxjava-core/src/main/java/rx/util/Exceptions.java
Original file line number Diff line number Diff line change
@@ -38,4 +38,19 @@ public static RuntimeException propagate(Throwable t) {
}
}

public static void throwIfFatal(Throwable t) {
if (t instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) t;
}
// values here derived from https://github.com/Netflix/RxJava/issues/748#issuecomment-32471495
else if (t instanceof StackOverflowError) {
throw (StackOverflowError) t;
} else if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
}
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import rx.Subscriber;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;

public class SynchronizedObserverTest {

@@ -54,8 +55,7 @@ public void testSingleThreadedBasic() {
TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(observer, as);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(observer);

w.subscribe(aw);
onSubscribe.waitToFinish();
@@ -76,9 +76,8 @@ public void testMultiThreadedBasic() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver);

w.subscribe(aw);
onSubscribe.waitToFinish();
@@ -102,13 +101,12 @@ public void testMultiThreadedBasicWithLock() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();

Object lock = new Object();
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);

SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, lock);

externalBusyThread.start();

@@ -142,9 +140,8 @@ public void testMultiThreadedWithNPE() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver);

w.subscribe(aw);
onSubscribe.waitToFinish();
@@ -174,13 +171,12 @@ public void testMultiThreadedWithNPEAndLock() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();

Object lock = new Object();
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);

SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, lock);

externalBusyThread.start();

@@ -220,9 +216,8 @@ public void testMultiThreadedWithNPEinMiddle() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver);

w.subscribe(aw);
onSubscribe.waitToFinish();
@@ -250,13 +245,12 @@ public void testMultiThreadedWithNPEinMiddleAndLock() {
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
Observable<String> w = Observable.create(onSubscribe);

SafeObservableSubscription as = new SafeObservableSubscription(s);
BusyObserver busyObserver = new BusyObserver();

Object lock = new Object();
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);

SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, lock);

externalBusyThread.start();

@@ -300,8 +294,8 @@ public void runConcurrencyTest() {
ExecutorService tp = Executors.newFixedThreadPool(20);
try {
TestConcurrencyObserver tw = new TestConcurrencyObserver();
SafeObservableSubscription s = new SafeObservableSubscription();
SynchronizedObserver<String> w = new SynchronizedObserver<String>(tw, s);
// we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle
SynchronizedObserver<String> w = new SynchronizedObserver<String>(new SafeSubscriber<String>(tw));

Future<?> f1 = tp.submit(new OnNextThread(w, 12000));
Future<?> f2 = tp.submit(new OnNextThread(w, 5000));
Original file line number Diff line number Diff line change
@@ -268,33 +268,6 @@ public void testMergeList() {
verify(stringObserver, times(2)).onNext("hello");
}

@Test
public void testUnSubscribe() {
TestObservable tA = new TestObservable();
TestObservable tB = new TestObservable();

@SuppressWarnings("unchecked")
Observable<String> m = Observable.create(mergeDelayError(Observable.create(tA), Observable.create(tB)));
Subscription s = m.subscribe(stringObserver);

tA.sendOnNext("Aone");
tB.sendOnNext("Bone");
s.unsubscribe();
tA.sendOnNext("Atwo");
tB.sendOnNext("Btwo");
tA.sendOnCompleted();
tB.sendOnCompleted();

verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(1)).onNext("Aone");
verify(stringObserver, times(1)).onNext("Bone");
assertTrue(tA.unsubscribed);
assertTrue(tB.unsubscribed);
verify(stringObserver, never()).onNext("Atwo");
verify(stringObserver, never()).onNext("Btwo");
verify(stringObserver, never()).onCompleted();
}

@Test
public void testMergeArrayWithThreading() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this because it was relying on SynchronizedObserver behavior related to unsubscribe. This operator should not concern itself with unsubscribe from what I can tell. The SafeSubscriber on a full chain will handle this.

final TestASynchronousObservable o1 = new TestASynchronousObservable();
49 changes: 49 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperatorMapTest.java
Original file line number Diff line number Diff line change
@@ -28,7 +28,9 @@

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

@@ -246,6 +248,53 @@ public Integer call(Integer i) {
}).toBlockingObservable().single();
}

@Test(expected = RuntimeException.class)
public void verifyExceptionIsThrownIfThereIsNoExceptionHandler() {

Observable.OnSubscribe<Object> creator = new Observable.OnSubscribe<Object>() {

@Override
public void call(Subscriber<? super Object> observer) {
observer.onNext("a");
observer.onNext("b");
observer.onNext("c");
observer.onCompleted();
}
};

Func1<Object, Observable<Object>> manyMapper = new Func1<Object, Observable<Object>>() {

@Override
public Observable<Object> call(Object object) {

return Observable.from(object);
}
};

Func1<Object, Object> mapper = new Func1<Object, Object>() {
private int count = 0;

@Override
public Object call(Object object) {
++count;
if (count > 2) {
throw new RuntimeException();
}
return object;
}
};

Action1<Object> onNext = new Action1<Object>() {

@Override
public void call(Object object) {
System.out.println(object.toString());
}
};

Observable.create(creator).flatMap(manyMapper).map(mapper).subscribe(onNext);
}

private static Map<String, String> getMap(String prefix) {
Map<String, String> m = new HashMap<String, String>();
m.put("firstName", prefix + "First");
Original file line number Diff line number Diff line change
@@ -17,92 +17,30 @@

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationSynchronize.*;

import org.junit.Test;
import org.mockito.Mockito;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observers.SafeSubscriber;
import rx.observers.TestSubscriber;

public class OperationSynchronizeTest {

/**
* Ensure onCompleted can not be called after an Unsubscribe
*/
@Test
public void testOnCompletedAfterUnSubscribe() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
Subscription ws = st.subscribe(w);

System.out.println("ws: " + ws);

t.sendOnNext("one");
ws.unsubscribe();
System.out.println("send onCompleted");
t.sendOnCompleted();

verify(w, times(1)).onNext("one");
verify(w, Mockito.never()).onCompleted();
}

/**
* Ensure onNext can not be called after an Unsubscribe
*/
@Test
public void testOnNextAfterUnSubscribe() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
Subscription ws = st.subscribe(w);

t.sendOnNext("one");
ws.unsubscribe();
t.sendOnNext("two");

verify(w, times(1)).onNext("one");
verify(w, Mockito.never()).onNext("two");
}

/**
* Ensure onError can not be called after an Unsubscribe
*/
@Test
public void testOnErrorAfterUnSubscribe() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
Subscription ws = st.subscribe(w);

t.sendOnNext("one");
ws.unsubscribe();
t.sendOnError(new RuntimeException("bad"));

verify(w, times(1)).onNext("one");
verify(w, Mockito.never()).onError(any(Throwable.class));
}
public class SafeSubscriberTest {

/**
* Ensure onNext can not be called after onError
*/
@Test
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these are invalid for SafeSubscriber. If a Subscription doesn't succeed it is best-effort and SafeSubscriber needs to accept terminal states if they come in before an unsubscribe takes effect.

public void testOnNextAfterOnError() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));
Observable<String> st = Observable.create(t);

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
@SuppressWarnings("unused")
Subscription ws = st.subscribe(w);
Subscription ws = st.subscribe(new SafeSubscriber<String>(new TestSubscriber<String>(w)));

t.sendOnNext("one");
t.sendOnError(new RuntimeException("bad"));
@@ -119,12 +57,12 @@ public void testOnNextAfterOnError() {
@Test
public void testOnCompletedAfterOnError() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));
Observable<String> st = Observable.create(t);

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
@SuppressWarnings("unused")
Subscription ws = st.subscribe(w);
Subscription ws = st.subscribe(new SafeSubscriber<String>(new TestSubscriber<String>(w)));

t.sendOnNext("one");
t.sendOnError(new RuntimeException("bad"));
@@ -141,12 +79,12 @@ public void testOnCompletedAfterOnError() {
@Test
public void testOnNextAfterOnCompleted() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));
Observable<String> st = Observable.create(t);

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
@SuppressWarnings("unused")
Subscription ws = st.subscribe(w);
Subscription ws = st.subscribe(new SafeSubscriber<String>(new TestSubscriber<String>(w)));

t.sendOnNext("one");
t.sendOnCompleted();
@@ -164,12 +102,12 @@ public void testOnNextAfterOnCompleted() {
@Test
public void testOnErrorAfterOnCompleted() {
TestObservable t = new TestObservable(null);
Observable<String> st = Observable.create(synchronize(Observable.create(t)));
Observable<String> st = Observable.create(t);

@SuppressWarnings("unchecked")
Observer<String> w = mock(Observer.class);
@SuppressWarnings("unused")
Subscription ws = st.subscribe(w);
Subscription ws = st.subscribe(new SafeSubscriber<String>(new TestSubscriber<String>(w)));

t.sendOnNext("one");
t.sendOnCompleted();
12 changes: 6 additions & 6 deletions rxjava-core/src/test/java/rx/util/AssertObservable.java
Original file line number Diff line number Diff line change
@@ -100,7 +100,7 @@ public Notification<String> call(Notification<T> expectedNotfication, Notificati
message.append(" ").append(expectedNotfication.getValue());
if (expectedNotfication.hasThrowable())
message.append(" ").append(expectedNotfication.getThrowable());
return new Notification<String>("equals " + message.toString());
return Notification.createOnNext("equals " + message.toString());
}
else {
StringBuilder error = new StringBuilder();
@@ -116,7 +116,7 @@ public Notification<String> call(Notification<T> expectedNotfication, Notificati
error.append(" ").append(actualNotification.getThrowable());
error.append(">");

return new Notification<String>(new AssertionError(error.toString()));
return Notification.createOnError(new AssertionError(error.toString()));
}
}
};
@@ -131,9 +131,9 @@ public Notification<String> call(Notification<String> a, Notification<String> b)
fail |= b.isOnError();

if (fail)
return new Notification<String>(new AssertionError(message));
return Notification.createOnError(new AssertionError(message));
else
return new Notification<String>(message);
return Notification.createOnNext(message);
}
};

@@ -142,9 +142,9 @@ public Notification<String> call(Notification<String> a, Notification<String> b)
public Notification<Void> call(Notification<String> outcome) {
if (outcome.isOnError()) {
String fullMessage = (message != null ? message + ": " : "") + "Observables are different\n\t" + outcome.getThrowable().getMessage();
return new Notification<Void>(new AssertionError(fullMessage));
return Notification.createOnError(new AssertionError(fullMessage));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed these while debugging why the AssertionError wasn't propagating ... it was because I was swallowing all java.lang.Error for a while in the throwIfFatal method before making it only throw on certain Error types.

}
return new Notification<Void>();
return Notification.createOnCompleted();
}
}).dematerialize();
return outcomeObservable;
137 changes: 137 additions & 0 deletions rxjava-core/src/test/java/rx/util/ExceptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.util;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;
import rx.util.functions.Action1;

public class ExceptionsTest {

@Test(expected = OnErrorNotImplementedException.class)
public void testOnErrorNotImplementedIsThrown() {
Observable.from(1, 2, 3).subscribe(new Action1<Integer>() {

@Override
public void call(Integer t1) {
throw new RuntimeException("hello");
}

});
}

@Test(expected = StackOverflowError.class)
public void testStackOverflowIsThrown() {
final PublishSubject<Integer> a = PublishSubject.create();
final PublishSubject<Integer> b = PublishSubject.create();
new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Integer args) {
System.out.println(args);
}
};
a.subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Integer args) {
System.out.println(args);
}
});
b.subscribe();
a.subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Integer args) {
b.onNext(args + 1);
}
});
b.subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Integer args) {
a.onNext(args + 1);
}
});
a.onNext(1);
}

@Test(expected = ThreadDeath.class)
public void testThreadDeathIsThrown() {
Observable.from(1).subscribe(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Integer t) {
throw new ThreadDeath();
}

});
}

}