diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 542bedbff2..3d672326e5 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -118,6 +118,7 @@ import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; +import rx.util.Exceptions; import rx.util.OnErrorNotImplementedException; import rx.util.Range; import rx.util.TimeInterval; @@ -6964,10 +6965,9 @@ public void call() { } }); - } catch (OnErrorNotImplementedException e) { - // special handling when onError is not implemented ... we just rethrow - throw e; } catch (Throwable e) { + // special handling for certain Throwable/Error/Exception types + Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { observer.onError(hook.onSubscribeError(this, e)); diff --git a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java index a7ce075ef5..d757d85251 100644 --- a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java @@ -19,11 +19,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import rx.Subscriber; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; import rx.plugins.RxJavaPlugins; -import rx.subscriptions.Subscriptions; import rx.util.CompositeException; +import rx.util.Exceptions; import rx.util.OnErrorNotImplementedException; /** @@ -74,6 +72,9 @@ public void onCompleted() { try { actual.onCompleted(); } catch (Throwable e) { + // we handle here instead of another method so we don't add stacks to the frame + // which can prevent it from being able to handle StackOverflow + Exceptions.throwIfFatal(e); // handle errors if the onCompleted implementation fails, not just if the Observable fails _onError(e); } finally { @@ -85,6 +86,9 @@ public void onCompleted() { @Override public void onError(Throwable e) { + // we handle here instead of another method so we don't add stacks to the frame + // which can prevent it from being able to handle StackOverflow + Exceptions.throwIfFatal(e); if (isFinished.compareAndSet(false, true)) { _onError(e); } @@ -97,6 +101,9 @@ public void onNext(T args) { actual.onNext(args); } } catch (Throwable e) { + // we handle here instead of another method so we don't add stacks to the frame + // which can prevent it from being able to handle StackOverflow + Exceptions.throwIfFatal(e); // handle errors if the onNext implementation fails, not just if the Observable fails onError(e); } diff --git a/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java index ad18e6a2eb..eedecd1368 100644 --- a/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java @@ -16,18 +16,11 @@ package rx.observers; import rx.Observer; -import rx.Subscriber; -import rx.operators.SafeObservableSubscription; /** - * A thread-safe Observer for transitioning states in operators. + * Synchronize execution to be single-threaded. *

- * Execution rules are: - *

+ * This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that. * * @param */ @@ -48,76 +41,33 @@ public final class SynchronizedObserver implements Observer { */ private final Observer observer; - private final SafeObservableSubscription subscription; - private volatile boolean finishRequested = false; - private volatile boolean finished = false; private volatile Object lock; - public SynchronizedObserver(Observer subscriber, SafeObservableSubscription subscription) { + public SynchronizedObserver(Observer subscriber) { this.observer = subscriber; - this.subscription = subscription; this.lock = this; } - public SynchronizedObserver(Observer subscriber, SafeObservableSubscription subscription, Object lock) { + public SynchronizedObserver(Observer subscriber, Object lock) { this.observer = subscriber; - this.subscription = subscription; this.lock = lock; } - /** - * Used when synchronizing an Observer without access to the subscription. - * - * @param Observer - */ - public SynchronizedObserver(Observer subscriber) { - this(subscriber, new SafeObservableSubscription()); - } - public void onNext(T arg) { - if (finished || finishRequested || subscription.isUnsubscribed()) { - // if we're already stopped, or a finish request has been received, we won't allow further onNext requests - return; - } synchronized (lock) { - // check again since this could have changed while waiting - if (finished || finishRequested || subscription.isUnsubscribed()) { - // if we're already stopped, or a finish request has been received, we won't allow further onNext requests - return; - } observer.onNext(arg); } } public void onError(Throwable e) { - if (finished || subscription.isUnsubscribed()) { - // another thread has already finished us, so we won't proceed - return; - } - finishRequested = true; synchronized (lock) { - // check again since this could have changed while waiting - if (finished || subscription.isUnsubscribed()) { - return; - } observer.onError(e); - finished = true; } } public void onCompleted() { - if (finished || subscription.isUnsubscribed()) { - // another thread has already finished us, so we won't proceed - return; - } - finishRequested = true; synchronized (lock) { - // check again since this could have changed while waiting - if (finished || subscription.isUnsubscribed()) { - return; - } observer.onCompleted(); - finished = true; } } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java b/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java index 54eef0f8cc..8c01f506e1 100644 --- a/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java @@ -17,7 +17,6 @@ import rx.Observer; import rx.Subscriber; -import rx.operators.SafeObservableSubscription; /** * A thread-safe Observer for transitioning states in operators. @@ -37,9 +36,7 @@ public final class SynchronizedSubscriber extends Subscriber { public SynchronizedSubscriber(Subscriber subscriber, Object lock) { super(subscriber); - SafeObservableSubscription s = new SafeObservableSubscription(); - subscriber.add(s); - this.observer = new SynchronizedObserver(subscriber, s, lock); + this.observer = new SynchronizedObserver(subscriber, lock); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java index d36225a5ed..7a31593d99 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java @@ -151,9 +151,7 @@ public Subscription onSubscribe(Observer actualObserver) { *

* Bug report: https://github.com/Netflix/RxJava/issues/614 */ - SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription); - completeSubscription.add(subscription); - SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, subscription); + SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver); /** * Subscribe to the parent Observable to get to the children Observables diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java index a13fbab9cb..e94411b313 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java @@ -86,14 +86,13 @@ public Synchronize(Observable innerObservable, Object lock) { private Object lock; public Subscription onSubscribe(Observer observer) { - SafeObservableSubscription subscription = new SafeObservableSubscription(); if (lock == null) { - atomicObserver = new SynchronizedObserver(observer, subscription); + atomicObserver = new SynchronizedObserver(observer); } else { - atomicObserver = new SynchronizedObserver(observer, subscription, lock); + atomicObserver = new SynchronizedObserver(observer, lock); } - return subscription.wrap(innerObservable.subscribe(atomicObserver)); + return innerObservable.subscribe(atomicObserver); } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java deleted file mode 100644 index 7f6affae4f..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ /dev/null @@ -1,446 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.SerialSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Func2; -import rx.util.functions.Func3; -import rx.util.functions.Func4; -import rx.util.functions.Func5; -import rx.util.functions.Func6; -import rx.util.functions.Func7; -import rx.util.functions.Func8; -import rx.util.functions.Func9; -import rx.util.functions.FuncN; -import rx.util.functions.Functions; - -/** - * Returns an Observable that emits the results of a function applied to sets of items emitted, in - * sequence, by two or more other Observables. - *

- * - *

- * The zip operation applies this function in strict sequence, so the first item emitted by the new - * Observable will be the result of the function applied to the first item emitted by each zipped - * Observable; the second item emitted by the new Observable will be the result of the function - * applied to the second item emitted by each zipped Observable; and so forth. - *

- * The resulting Observable returned from zip will invoke onNext as many times as the - * number of onNext invocations of the source Observable that emits the fewest items. - */ -public final class OperationZip { - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, final Func2 zipFunction) { - return zip(Arrays.asList(o1, o2), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, final Func3 zipFunction) { - return zip(Arrays.asList(o1, o2, o3), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, Observable o4, final Func4 zipFunction) { - return zip(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, final Func5 zipFunction) { - return zip(Arrays.asList(o1, o2, o3, o4, o5), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, - final Func6 zipFunction) { - return zip(Arrays.asList(o1, o2, o3, o4, o5, o6), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, - final Func7 zipFunction) { - return zip(Arrays.asList(o1, o2, o3, o4, o5, o6, o7), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, - final Func8 zipFunction) { - return zip(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8), Functions.fromFunc(zipFunction)); - } - - @SuppressWarnings("unchecked") - public static OnSubscribeFunc zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, - Observable o9, final Func9 zipFunction) { - return zip(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8, o9), Functions.fromFunc(zipFunction)); - } - - public static OnSubscribeFunc zip(Iterable> ws, final FuncN zipFunction) { - return new ManyObservables(ws, zipFunction); - } - - /** - * Merges the values across multiple sources and applies the selector - * function. - *

The resulting sequence terminates if no more pairs can be - * established, i.e., streams of length 1 and 2 zipped will produce - * only 1 item.

- *

Exception semantics: errors from the source observable are - * propagated as-is.

- * - * @param - * the common element type - * @param - * the result element type - */ - private static final class ManyObservables implements OnSubscribeFunc { - /** */ - protected final Iterable> sources; - /** */ - protected final FuncN selector; - - /** - * Constructor. - * - * @param sources - * the sources - * @param selector - * the result selector - */ - public ManyObservables( - Iterable> sources, - FuncN selector) { - this.sources = sources; - this.selector = selector; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - - final CompositeSubscription composite = new CompositeSubscription(); - - final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); - - final List> all = new ArrayList>(); - - Observer> o2 = new Observer>() { - boolean done; - @Override - public void onCompleted() { - if (!done) { - done = true; - observer.onCompleted(); - } - } - - @Override - public void onError(Throwable t) { - if (!done) { - done = true; - observer.onError(t); - } - } - - @Override - public void onNext(List value) { - if (!done) { - observer.onNext(selector.call(value.toArray(new Object[value.size()]))); - } - } - }; - - for (Observable o : sources) { - - ItemObserver io = new ItemObserver( - rwLock, all, o, o2, composite); - composite.add(io); - all.add(io); - } - - for (ItemObserver io : all) { - io.connect(); - } - - return composite; - } - - /** - * The individual line's observer. - * - * @author akarnokd, 2013.01.14. - * @param - * the element type - */ - private static final class ItemObserver implements Observer, Subscription { - /** Reader-writer lock. */ - protected final ReadWriteLock rwLock; - /** The queue. */ - public final Queue queue = new LinkedList(); - /** The list of the other observers. */ - public final List> all; - /** The global cancel. */ - protected final Subscription cancel; - /** The subscription to the source. */ - protected final SerialSubscription toSource = new SerialSubscription(); - /** Indicate completion of this stream. */ - protected boolean done; - /** The source. */ - protected final Observable source; - /** The observer. */ - protected final Observer> observer; - - /** - * Constructor. - * - * @param rwLock - * the reader-writer lock to use - * @param all - * all observers - * @param source - * the source sequence - * @param observer - * the output observer - * @param cancel - * the cancellation handler - */ - public ItemObserver( - ReadWriteLock rwLock, - List> all, - Observable source, - Observer> observer, - Subscription cancel) { - this.rwLock = rwLock; - this.all = all; - this.source = source; - this.observer = observer; - this.cancel = cancel; - } - - @Override - public void onNext(T value) { - rwLock.readLock().lock(); - try { - if (done) { - return; - } - queue.add(value); - } finally { - rwLock.readLock().unlock(); - } - runCollector(); - } - - @Override - public void onError(Throwable ex) { - rwLock.writeLock().lock(); - try { - if (done) { - return; - } - done = true; - observer.onError(ex); - } finally { - rwLock.writeLock().unlock(); - } - cancel.unsubscribe(); - unsubscribe(); - } - - @Override - public void onCompleted() { - rwLock.readLock().lock(); - try { - done = true; - } finally { - rwLock.readLock().unlock(); - } - runCollector(); - unsubscribe(); - } - - /** Connect to the source observable. */ - public void connect() { - toSource.set(source.subscribe(this)); - } - - @Override - public void unsubscribe() { - toSource.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return toSource.isUnsubscribed(); - } - - private void runCollector() { - if (rwLock.writeLock().tryLock()) { - boolean cu = false; - try { - while (true) { - List values = new ArrayList(all.size()); - for (ItemObserver io : all) { - if (io.queue.isEmpty()) { - if (io.done) { - observer.onCompleted(); - cu = true; - return; - } - } else { - T value = io.queue.peek(); - values.add(value); - } - } - if (values.size() == all.size()) { - for (ItemObserver io : all) { - io.queue.poll(); - } - observer.onNext(values); - } else { - break; - } - } - } finally { - rwLock.writeLock().unlock(); - if (cu) { - cancel.unsubscribe(); - } - } - } - } - - } - } - - /** - * Zips an Observable and an iterable sequence and applies - * a function to the pair of values. - */ - public static OnSubscribeFunc zipIterable(Observable source, Iterable other, Func2 zipFunction) { - return new ZipIterable(source, other, zipFunction); - } - - /** - * Zips an Observable and an iterable sequence and applies - * a function to the pair of values. - */ - private static final class ZipIterable implements OnSubscribeFunc { - final Observable source; - final Iterable other; - final Func2 zipFunction; - - public ZipIterable(Observable source, Iterable other, Func2 zipFunction) { - this.source = source; - this.other = other; - this.zipFunction = zipFunction; - } - - @Override - public Subscription onSubscribe(Observer t1) { - - Iterator it; - boolean first; - try { - it = other.iterator(); - first = it.hasNext(); - } catch (Throwable t) { - t1.onError(t); - return Subscriptions.empty(); - } - - if (!first) { - t1.onCompleted(); - return Subscriptions.empty(); - } - - SerialSubscription ssub = new SerialSubscription(); - - ssub.set(source.subscribe(new SourceObserver(t1, it, zipFunction, ssub))); - - return ssub; - } - - /** Observe the source. */ - private static final class SourceObserver implements Observer { - final Observer observer; - final Iterator other; - final Func2 zipFunction; - final Subscription cancel; - - public SourceObserver(Observer observer, Iterator other, - Func2 zipFunction, Subscription cancel) { - this.observer = observer; - this.other = other; - this.zipFunction = zipFunction; - this.cancel = cancel; - } - - @Override - public void onNext(T args) { - U u = other.next(); - - R r; - try { - r = zipFunction.call(args, u); - } catch (Throwable t) { - onError(t); - return; - } - - observer.onNext(r); - - boolean has; - try { - has = other.hasNext(); - } catch (Throwable t) { - onError(t); - return; - } - - if (!has) { - onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - cancel.unsubscribe(); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - cancel.unsubscribe(); - } - - } - } -} diff --git a/rxjava-core/src/main/java/rx/util/Exceptions.java b/rxjava-core/src/main/java/rx/util/Exceptions.java index 537af0872f..c59f36129b 100644 --- a/rxjava-core/src/main/java/rx/util/Exceptions.java +++ b/rxjava-core/src/main/java/rx/util/Exceptions.java @@ -38,4 +38,19 @@ public static RuntimeException propagate(Throwable t) { } } + public static void throwIfFatal(Throwable t) { + if (t instanceof OnErrorNotImplementedException) { + throw (OnErrorNotImplementedException) t; + } + // values here derived from https://github.com/Netflix/RxJava/issues/748#issuecomment-32471495 + else if (t instanceof StackOverflowError) { + throw (StackOverflowError) t; + } else if (t instanceof VirtualMachineError) { + throw (VirtualMachineError) t; + } else if (t instanceof ThreadDeath) { + throw (ThreadDeath) t; + } else if (t instanceof LinkageError) { + throw (LinkageError) t; + } + } } \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java b/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java index b1bc776ce2..835d09a34d 100644 --- a/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java +++ b/rxjava-core/src/test/java/rx/observers/SynchronizedObserverTest.java @@ -37,6 +37,7 @@ import rx.Subscriber; import rx.Subscription; import rx.operators.SafeObservableSubscription; +import rx.operators.SafeObserver; public class SynchronizedObserverTest { @@ -54,8 +55,7 @@ public void testSingleThreadedBasic() { TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three"); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); - SynchronizedObserver aw = new SynchronizedObserver(observer, as); + SynchronizedObserver aw = new SynchronizedObserver(observer); w.subscribe(aw); onSubscribe.waitToFinish(); @@ -76,9 +76,8 @@ public void testMultiThreadedBasic() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); - SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as); + SynchronizedObserver aw = new SynchronizedObserver(busyObserver); w.subscribe(aw); onSubscribe.waitToFinish(); @@ -102,13 +101,12 @@ public void testMultiThreadedBasicWithLock() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); Object lock = new Object(); ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100); - SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as, lock); + SynchronizedObserver aw = new SynchronizedObserver(busyObserver, lock); externalBusyThread.start(); @@ -142,9 +140,8 @@ public void testMultiThreadedWithNPE() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); - SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as); + SynchronizedObserver aw = new SynchronizedObserver(busyObserver); w.subscribe(aw); onSubscribe.waitToFinish(); @@ -174,13 +171,12 @@ public void testMultiThreadedWithNPEAndLock() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); Object lock = new Object(); ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100); - SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as, lock); + SynchronizedObserver aw = new SynchronizedObserver(busyObserver, lock); externalBusyThread.start(); @@ -220,9 +216,8 @@ public void testMultiThreadedWithNPEinMiddle() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); - SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as); + SynchronizedObserver aw = new SynchronizedObserver(busyObserver); w.subscribe(aw); onSubscribe.waitToFinish(); @@ -250,13 +245,12 @@ public void testMultiThreadedWithNPEinMiddleAndLock() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); Observable w = Observable.create(onSubscribe); - SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); Object lock = new Object(); ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100); - SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as, lock); + SynchronizedObserver aw = new SynchronizedObserver(busyObserver, lock); externalBusyThread.start(); @@ -300,8 +294,8 @@ public void runConcurrencyTest() { ExecutorService tp = Executors.newFixedThreadPool(20); try { TestConcurrencyObserver tw = new TestConcurrencyObserver(); - SafeObservableSubscription s = new SafeObservableSubscription(); - SynchronizedObserver w = new SynchronizedObserver(tw, s); + // we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle + SynchronizedObserver w = new SynchronizedObserver(new SafeSubscriber(tw)); Future f1 = tp.submit(new OnNextThread(w, 12000)); Future f2 = tp.submit(new OnNextThread(w, 5000)); diff --git a/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java index 6a6908698a..60a05fd935 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java @@ -268,33 +268,6 @@ public void testMergeList() { verify(stringObserver, times(2)).onNext("hello"); } - @Test - public void testUnSubscribe() { - TestObservable tA = new TestObservable(); - TestObservable tB = new TestObservable(); - - @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(Observable.create(tA), Observable.create(tB))); - Subscription s = m.subscribe(stringObserver); - - tA.sendOnNext("Aone"); - tB.sendOnNext("Bone"); - s.unsubscribe(); - tA.sendOnNext("Atwo"); - tB.sendOnNext("Btwo"); - tA.sendOnCompleted(); - tB.sendOnCompleted(); - - verify(stringObserver, never()).onError(any(Throwable.class)); - verify(stringObserver, times(1)).onNext("Aone"); - verify(stringObserver, times(1)).onNext("Bone"); - assertTrue(tA.unsubscribed); - assertTrue(tB.unsubscribed); - verify(stringObserver, never()).onNext("Atwo"); - verify(stringObserver, never()).onNext("Btwo"); - verify(stringObserver, never()).onCompleted(); - } - @Test public void testMergeArrayWithThreading() { final TestASynchronousObservable o1 = new TestASynchronousObservable(); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorMapTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMapTest.java index 652ff720ac..17296ac0d3 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorMapTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMapTest.java @@ -28,7 +28,9 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.schedulers.Schedulers; +import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -246,6 +248,53 @@ public Integer call(Integer i) { }).toBlockingObservable().single(); } + @Test(expected = RuntimeException.class) + public void verifyExceptionIsThrownIfThereIsNoExceptionHandler() { + + Observable.OnSubscribe creator = new Observable.OnSubscribe() { + + @Override + public void call(Subscriber observer) { + observer.onNext("a"); + observer.onNext("b"); + observer.onNext("c"); + observer.onCompleted(); + } + }; + + Func1> manyMapper = new Func1>() { + + @Override + public Observable call(Object object) { + + return Observable.from(object); + } + }; + + Func1 mapper = new Func1() { + private int count = 0; + + @Override + public Object call(Object object) { + ++count; + if (count > 2) { + throw new RuntimeException(); + } + return object; + } + }; + + Action1 onNext = new Action1() { + + @Override + public void call(Object object) { + System.out.println(object.toString()); + } + }; + + Observable.create(creator).flatMap(manyMapper).map(mapper).subscribe(onNext); + } + private static Map getMap(String prefix) { Map m = new HashMap(); m.put("firstName", prefix + "First"); diff --git a/rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java b/rxjava-core/src/test/java/rx/operators/SafeSubscriberTest.java similarity index 64% rename from rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java rename to rxjava-core/src/test/java/rx/operators/SafeSubscriberTest.java index c94ba537ca..7f23a8379a 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSynchronizeTest.java +++ b/rxjava-core/src/test/java/rx/operators/SafeSubscriberTest.java @@ -17,7 +17,6 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationSynchronize.*; import org.junit.Test; import org.mockito.Mockito; @@ -25,71 +24,10 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.observers.SafeSubscriber; +import rx.observers.TestSubscriber; -public class OperationSynchronizeTest { - - /** - * Ensure onCompleted can not be called after an Unsubscribe - */ - @Test - public void testOnCompletedAfterUnSubscribe() { - TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); - - @SuppressWarnings("unchecked") - Observer w = mock(Observer.class); - Subscription ws = st.subscribe(w); - - System.out.println("ws: " + ws); - - t.sendOnNext("one"); - ws.unsubscribe(); - System.out.println("send onCompleted"); - t.sendOnCompleted(); - - verify(w, times(1)).onNext("one"); - verify(w, Mockito.never()).onCompleted(); - } - - /** - * Ensure onNext can not be called after an Unsubscribe - */ - @Test - public void testOnNextAfterUnSubscribe() { - TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); - - @SuppressWarnings("unchecked") - Observer w = mock(Observer.class); - Subscription ws = st.subscribe(w); - - t.sendOnNext("one"); - ws.unsubscribe(); - t.sendOnNext("two"); - - verify(w, times(1)).onNext("one"); - verify(w, Mockito.never()).onNext("two"); - } - - /** - * Ensure onError can not be called after an Unsubscribe - */ - @Test - public void testOnErrorAfterUnSubscribe() { - TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); - - @SuppressWarnings("unchecked") - Observer w = mock(Observer.class); - Subscription ws = st.subscribe(w); - - t.sendOnNext("one"); - ws.unsubscribe(); - t.sendOnError(new RuntimeException("bad")); - - verify(w, times(1)).onNext("one"); - verify(w, Mockito.never()).onError(any(Throwable.class)); - } +public class SafeSubscriberTest { /** * Ensure onNext can not be called after onError @@ -97,12 +35,12 @@ public void testOnErrorAfterUnSubscribe() { @Test public void testOnNextAfterOnError() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); + Observable st = Observable.create(t); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @SuppressWarnings("unused") - Subscription ws = st.subscribe(w); + Subscription ws = st.subscribe(new SafeSubscriber(new TestSubscriber(w))); t.sendOnNext("one"); t.sendOnError(new RuntimeException("bad")); @@ -119,12 +57,12 @@ public void testOnNextAfterOnError() { @Test public void testOnCompletedAfterOnError() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); + Observable st = Observable.create(t); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @SuppressWarnings("unused") - Subscription ws = st.subscribe(w); + Subscription ws = st.subscribe(new SafeSubscriber(new TestSubscriber(w))); t.sendOnNext("one"); t.sendOnError(new RuntimeException("bad")); @@ -141,12 +79,12 @@ public void testOnCompletedAfterOnError() { @Test public void testOnNextAfterOnCompleted() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); + Observable st = Observable.create(t); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @SuppressWarnings("unused") - Subscription ws = st.subscribe(w); + Subscription ws = st.subscribe(new SafeSubscriber(new TestSubscriber(w))); t.sendOnNext("one"); t.sendOnCompleted(); @@ -164,12 +102,12 @@ public void testOnNextAfterOnCompleted() { @Test public void testOnErrorAfterOnCompleted() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(Observable.create(t))); + Observable st = Observable.create(t); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @SuppressWarnings("unused") - Subscription ws = st.subscribe(w); + Subscription ws = st.subscribe(new SafeSubscriber(new TestSubscriber(w))); t.sendOnNext("one"); t.sendOnCompleted(); diff --git a/rxjava-core/src/test/java/rx/util/AssertObservable.java b/rxjava-core/src/test/java/rx/util/AssertObservable.java index c41e853733..ca42cba3d5 100644 --- a/rxjava-core/src/test/java/rx/util/AssertObservable.java +++ b/rxjava-core/src/test/java/rx/util/AssertObservable.java @@ -100,7 +100,7 @@ public Notification call(Notification expectedNotfication, Notificati message.append(" ").append(expectedNotfication.getValue()); if (expectedNotfication.hasThrowable()) message.append(" ").append(expectedNotfication.getThrowable()); - return new Notification("equals " + message.toString()); + return Notification.createOnNext("equals " + message.toString()); } else { StringBuilder error = new StringBuilder(); @@ -116,7 +116,7 @@ public Notification call(Notification expectedNotfication, Notificati error.append(" ").append(actualNotification.getThrowable()); error.append(">"); - return new Notification(new AssertionError(error.toString())); + return Notification.createOnError(new AssertionError(error.toString())); } } }; @@ -131,9 +131,9 @@ public Notification call(Notification a, Notification b) fail |= b.isOnError(); if (fail) - return new Notification(new AssertionError(message)); + return Notification.createOnError(new AssertionError(message)); else - return new Notification(message); + return Notification.createOnNext(message); } }; @@ -142,9 +142,9 @@ public Notification call(Notification a, Notification b) public Notification call(Notification outcome) { if (outcome.isOnError()) { String fullMessage = (message != null ? message + ": " : "") + "Observables are different\n\t" + outcome.getThrowable().getMessage(); - return new Notification(new AssertionError(fullMessage)); + return Notification.createOnError(new AssertionError(fullMessage)); } - return new Notification(); + return Notification.createOnCompleted(); } }).dematerialize(); return outcomeObservable; diff --git a/rxjava-core/src/test/java/rx/util/ExceptionsTest.java b/rxjava-core/src/test/java/rx/util/ExceptionsTest.java new file mode 100644 index 0000000000..8029038e0a --- /dev/null +++ b/rxjava-core/src/test/java/rx/util/ExceptionsTest.java @@ -0,0 +1,137 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.subjects.PublishSubject; +import rx.util.functions.Action1; + +public class ExceptionsTest { + + @Test(expected = OnErrorNotImplementedException.class) + public void testOnErrorNotImplementedIsThrown() { + Observable.from(1, 2, 3).subscribe(new Action1() { + + @Override + public void call(Integer t1) { + throw new RuntimeException("hello"); + } + + }); + } + + @Test(expected = StackOverflowError.class) + public void testStackOverflowIsThrown() { + final PublishSubject a = PublishSubject.create(); + final PublishSubject b = PublishSubject.create(); + new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(Integer args) { + System.out.println(args); + } + }; + a.subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(Integer args) { + System.out.println(args); + } + }); + b.subscribe(); + a.subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(Integer args) { + b.onNext(args + 1); + } + }); + b.subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(Integer args) { + a.onNext(args + 1); + } + }); + a.onNext(1); + } + + @Test(expected = ThreadDeath.class) + public void testThreadDeathIsThrown() { + Observable.from(1).subscribe(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(Integer t) { + throw new ThreadDeath(); + } + + }); + } + +}