From 96939a7dfd2d07488fabcc36fa95a48a6233604e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 25 Apr 2014 20:07:32 +0200 Subject: [PATCH 1/2] OperatorMergeDelayError --- rxjava-core/src/main/java/rx/Observable.java | 34 +- .../operators/OperationMergeDelayError.java | 333 ------------------ .../rx/operators/OperatorMergeDelayError.java | 133 +++++++ ....java => OperatorMergeDelayErrorTest.java} | 108 ++++-- 4 files changed, 217 insertions(+), 391 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java rename rxjava-core/src/test/java/rx/operators/{OperationMergeDelayErrorTest.java => OperatorMergeDelayErrorTest.java} (83%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..200e52d606 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -54,7 +54,6 @@ import rx.operators.OperationGroupJoin; import rx.operators.OperationInterval; import rx.operators.OperationJoin; -import rx.operators.OperationMergeDelayError; import rx.operators.OperationMergeMaxConcurrent; import rx.operators.OperationMulticast; import rx.operators.OperationOnErrorResumeNextViaObservable; @@ -103,6 +102,7 @@ import rx.operators.OperatorMap; import rx.operators.OperatorMaterialize; import rx.operators.OperatorMerge; +import rx.operators.OperatorMergeDelayError; import rx.operators.OperatorMergeMapPair; import rx.operators.OperatorMergeMapTransform; import rx.operators.OperatorObserveOn; @@ -2001,7 +2001,7 @@ public final static Observable merge(Observable[] sequences, * @see MSDN: Observable.Merge */ public final static Observable mergeDelayError(Observable> source) { - return create(OperationMergeDelayError.mergeDelayError(source)); + return source.lift(new OperatorMergeDelayError()); } /** @@ -2025,10 +2025,8 @@ public final static Observable mergeDelayError(ObservableRxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2)); + return mergeDelayError(from(t1, t2)); } /** @@ -2055,10 +2053,8 @@ public final static Observable mergeDelayError(Observable t1 * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3)); + return mergeDelayError(from(t1, t2, t3)); } /** @@ -2087,10 +2083,8 @@ public final static Observable mergeDelayError(Observable t1 * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4)); + return mergeDelayError(from(t1, t2, t3, t4)); } /** @@ -2120,10 +2114,8 @@ public final static Observable mergeDelayError(Observable t1 * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5)); + return mergeDelayError(from(t1, t2, t3, t4, t5)); } /** @@ -2155,10 +2147,8 @@ public final static Observable mergeDelayError(Observable t1 * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6)); + return mergeDelayError(from(t1, t2, t3, t4, t5, t6)); } /** @@ -2192,10 +2182,8 @@ public final static Observable mergeDelayError(Observable t1 * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7)); + return mergeDelayError(from(t1, t2, t3, t4, t5, t6, t7)); } /** @@ -2234,7 +2222,7 @@ public final static Observable mergeDelayError(Observable t1 @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7, t8)); + return mergeDelayError(from(t1, t2, t3, t4, t5, t6, t7, t8)); } /** @@ -2272,10 +2260,8 @@ public final static Observable mergeDelayError(Observable t1 * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ - @SuppressWarnings("unchecked") - // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { - return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7, t8, t9)); + return mergeDelayError(from(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java deleted file mode 100644 index d5eacf649a..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java +++ /dev/null @@ -1,333 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.exceptions.CompositeException; -import rx.observers.SerializedObserver; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.CompositeSubscription; - -/** - * This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of - * an error via onError, mergeDelayError will refrain from propagating that error - * notification until all of the merged Observables have finished emitting items. - *

- * - *

- * Even if multiple merged Observables send onError notifications, mergeDelayError will - * only invoke the onError method of its Observers once. - *

- * This operation allows an Observer to receive all successfully emitted items from all of the - * source Observables without being interrupted by an error notification from one of them. - *

- * NOTE: If this is used on an Observable that never completes, it will never call - * onError and will effectively swallow errors. - */ -public final class OperationMergeDelayError { - - /** - * Flattens the observable sequences from the list of Observables into one observable sequence - * without any transformation and delays any onError calls until after all sequences have called - * onError or onComplete so as to allow all successful onNext calls to be received. - * - * @param sequences - * An observable sequence of elements to project. - * @return An observable sequence whose elements are the result of flattening the output from the list of Observables. - * @see Observable.Merge(TSource) Method (IObservable(TSource)[]) - */ - public static OnSubscribeFunc mergeDelayError(final Observable> sequences) { - // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. - return new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(Observer observer) { - return new MergeDelayErrorObservable(sequences).onSubscribe(observer); - } - }; - } - - public static OnSubscribeFunc mergeDelayError(final Observable... sequences) { - return mergeDelayError(Observable.create(new OnSubscribeFunc>() { - private final BooleanSubscription s = new BooleanSubscription(); - - @Override - public Subscription onSubscribe(Observer> observer) { - for (Observable o : sequences) { - if (!s.isUnsubscribed()) { - observer.onNext(o); - } else { - // break out of the loop if we are unsubscribed - break; - } - } - if (!s.isUnsubscribed()) { - observer.onCompleted(); - } - return s; - } - })); - } - - public static OnSubscribeFunc mergeDelayError(final List> sequences) { - return mergeDelayError(Observable.create(new OnSubscribeFunc>() { - - private final BooleanSubscription s = new BooleanSubscription(); - - @Override - public Subscription onSubscribe(Observer> observer) { - for (Observable o : sequences) { - if (!s.isUnsubscribed()) { - observer.onNext(o); - } else { - // break out of the loop if we are unsubscribed - break; - } - } - if (!s.isUnsubscribed()) { - observer.onCompleted(); - } - - return s; - } - })); - } - - /** - * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. - *

- * It IS thread-safe from within it while receiving onNext events from multiple threads. - *

- * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. - *

- * Note how the take() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. - * - * @param - */ - private static final class MergeDelayErrorObservable implements OnSubscribeFunc { - private final Observable> sequences; - private final MergeSubscription ourSubscription = new MergeSubscription(); - private AtomicBoolean stopped = new AtomicBoolean(false); - private volatile boolean parentCompleted = false; - private final ConcurrentHashMap childObservers = new ConcurrentHashMap(); - private final ConcurrentHashMap childSubscriptions = new ConcurrentHashMap(); - // onErrors we received that will be delayed until everything is completed and then sent - private ConcurrentLinkedQueue onErrorReceived = new ConcurrentLinkedQueue(); - - private MergeDelayErrorObservable(Observable> sequences) { - this.sequences = sequences; - } - - public Subscription onSubscribe(Observer actualObserver) { - CompositeSubscription completeSubscription = new CompositeSubscription(); - SerializedObserver synchronizedObserver = new SerializedObserver(actualObserver); - - /** - * Subscribe to the parent Observable to get to the children Observables - */ - completeSubscription.add(sequences.unsafeSubscribe(new ParentObserver(synchronizedObserver))); - - /* return our subscription to allow unsubscribing */ - return completeSubscription; - } - - /** - * Manage the internal subscription with a thread-safe means of stopping/unsubscribing so we don't unsubscribe twice. - *

- * Also has the stop() method returning a boolean so callers know if their thread "won" and should perform further actions. - */ - private class MergeSubscription implements Subscription { - - @Override - public void unsubscribe() { - stop(); - } - - public boolean stop() { - // try setting to false unless another thread beat us - boolean didSet = stopped.compareAndSet(false, true); - if (didSet) { - // this thread won the race to stop, so unsubscribe from the actualSubscription - for (Subscription _s : childSubscriptions.values()) { - _s.unsubscribe(); - } - return true; - } else { - // another thread beat us - return false; - } - } - - @Override - public boolean isUnsubscribed() { - return stopped.get(); - } - } - - /** - * Subscribe to the top level Observable to receive the sequence of Observable children. - * - * @param - */ - private class ParentObserver extends Subscriber> { - private final Observer actualObserver; - - public ParentObserver(Observer actualObserver) { - this.actualObserver = actualObserver; - } - - @Override - public void onCompleted() { - parentCompleted = true; - // this *can* occur before the children are done, so if it does we won't send onCompleted - // but will let the child worry about it - // if however this completes and there are no children processing, then we will send onCompleted - - if (childObservers.size() == 0) { - if (!stopped.get()) { - if (ourSubscription.stop()) { - if (onErrorReceived.size() == 1) { - // an onError was received from 1 ChildObserver so we now send it as a delayed error - actualObserver.onError(onErrorReceived.peek()); - } else if (onErrorReceived.size() > 1) { - // an onError was received from more than 1 ChildObserver so we now send it as a delayed error - actualObserver.onError(new CompositeException(onErrorReceived)); - } else { - // no delayed error so send onCompleted - actualObserver.onCompleted(); - } - } - } - } - } - - @Override - public void onError(Throwable e) { - actualObserver.onError(e); - } - - @Override - public void onNext(Observable childObservable) { - if (stopped.get()) { - // we won't act on any further items - return; - } - - if (childObservable == null) { - throw new IllegalArgumentException("Observable can not be null."); - } - - /** - * For each child Observable we receive we'll subscribe with a separate Observer - * that will each then forward their sequences to the actualObserver. - *

- * We use separate child Observers for each sequence to simplify the onComplete/onError handling so each sequence has its own lifecycle. - */ - ChildObserver _w = new ChildObserver(actualObserver); - childObservers.put(_w, _w); - Subscription _subscription = childObservable.unsafeSubscribe(_w); - // remember this Observer and the subscription from it - childSubscriptions.put(_w, _subscription); - } - } - - /** - * Subscribe to each child Observable and forward their sequence of data to the actualObserver - * - */ - private class ChildObserver extends Subscriber { - - private final Observer actualObserver; - private volatile boolean finished = false; - - public ChildObserver(Observer actualObserver) { - this.actualObserver = actualObserver; - } - - @Override - public void onCompleted() { - // remove self from map of Observers - childObservers.remove(this); - // if there are now 0 Observers left, so if the parent is also completed we send the onComplete to the actualObserver - // if the parent is not complete that means there is another sequence (and child Observer) to come - if (!stopped.get()) { - finishObserver(); - } - } - - @Override - public void onError(Throwable e) { - if (!stopped.get()) { - onErrorReceived.add(e); - // mark this ChildObserver as done - childObservers.remove(this); - // but do NOT forward to actualObserver as we want other ChildObservers to continue until completion - // and we'll delay the sending of onError until all others are done - - // we mark finished==true as a safety to ensure that if further calls to onNext occur we ignore them - finished = true; - - // check for whether the parent is completed and if so then perform the 'finishing' actions - finishObserver(); - } - } - - /** - * onComplete and onError when called need to check for the parent being complete and if so send the onCompleted or onError to the actualObserver. - *

- * This does NOT get invoked if synchronous execution occurs, but will when asynchronously executing. - *

- * TestCase testErrorDelayed4WithThreading specifically tests this use case. - */ - private void finishObserver() { - if (childObservers.size() == 0 && parentCompleted) { - if (ourSubscription.stop()) { - // this thread 'won' the race to unsubscribe/stop so let's send onError or onCompleted - if (onErrorReceived.size() == 1) { - // an onError was received from 1 ChildObserver so we now send it as a delayed error - actualObserver.onError(onErrorReceived.peek()); - } else if (onErrorReceived.size() > 1) { - // an onError was received from more than 1 ChildObserver so we now send it as a delayed error - actualObserver.onError(new CompositeException(onErrorReceived)); - } else { - // no delayed error so send onCompleted - actualObserver.onCompleted(); - } - } - } - } - - @Override - public void onNext(T args) { - // in case the Observable is poorly behaved and doesn't listen to the unsubscribe request - // we'll ignore anything that comes in after we've unsubscribed or an onError has been received and delayed - if (!stopped.get() && !finished) { - actualObserver.onNext(args); - } - } - - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java new file mode 100644 index 0000000000..fd25aa544f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java @@ -0,0 +1,133 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.exceptions.CompositeException; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.CompositeSubscription; + +/** + * This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of + * an error via onError, mergeDelayError will refrain from propagating that error + * notification until all of the merged Observables have finished emitting items. + *

+ * + *

+ * Even if multiple merged Observables send onError notifications, mergeDelayError will + * only invoke the onError method of its Observers once. + *

+ * This operation allows an Observer to receive all successfully emitted items from all of the + * source Observables without being interrupted by an error notification from one of them. + *

+ * NOTE: If this is used on an Observable that never completes, it will never call + * onError and will effectively swallow errors. + * + * @param the source and result value type + */ +public final class OperatorMergeDelayError implements Operator> { + + @Override + public Subscriber> call(Subscriber child) { + final SerializedSubscriber s = new SerializedSubscriber(child); + final CompositeSubscription csub = new CompositeSubscription(); + child.add(csub); + final AtomicInteger wip = new AtomicInteger(1); + final ConcurrentLinkedQueue exceptions = new ConcurrentLinkedQueue(); + + return new Subscriber>() { + + @Override + public void onNext(Observable t) { + wip.incrementAndGet(); + + Subscriber itemSub = new Subscriber() { + /** Make sure terminal events are handled once to avoid wip problems. */ + boolean once = true; + @Override + public void onNext(T t) { + // prevent misbehaving source to emit past the error + if (once) { + try { + s.onNext(t); + } catch (Throwable e) { + // in case the source doesn't properly handle exceptions + onError(e); + } + } + } + + @Override + public void onError(Throwable e) { + if (once) { + once = false; + error(e); + } + } + + @Override + public void onCompleted() { + if (once) { + once = false; + try { + complete(); + } finally { + csub.remove(this); + } + } + } + + }; + csub.add(itemSub); + + t.unsafeSubscribe(itemSub); + } + + @Override + public void onError(Throwable e) { + error(e); + } + + @Override + public void onCompleted() { + complete(); + } + void error(Throwable e) { + exceptions.add(e); + complete(); + } + void complete() { + if (wip.decrementAndGet() == 0) { + if (exceptions.isEmpty()) { + s.onCompleted(); + } else + if (exceptions.size() > 1) { + s.onError(new CompositeException(exceptions)); + } else { + s.onError(exceptions.peek()); + } + exceptions.clear(); + unsubscribe(); + } + } + + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java similarity index 83% rename from rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java index 5d18afd363..1c340d4deb 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java @@ -18,27 +18,26 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static rx.operators.OperationMergeDelayError.mergeDelayError; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.List; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Subscriber; import rx.Subscription; import rx.exceptions.CompositeException; import rx.subscriptions.Subscriptions; -public class OperationMergeDelayErrorTest { +public class OperatorMergeDelayErrorTest { @Mock Observer stringObserver; @@ -54,7 +53,7 @@ public void testErrorDelayed1() { final Observable o2 = Observable.create(new TestErrorObservable("one", "two", "three")); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2)); + Observable m = Observable.mergeDelayError(o1, o2); m.subscribe(stringObserver); verify(stringObserver, times(1)).onError(any(NullPointerException.class)); @@ -75,7 +74,7 @@ public void testErrorDelayed2() { final Observable o4 = Observable.create(new TestErrorObservable("nine")); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); + Observable m = Observable.mergeDelayError(o1, o2, o3, o4); m.subscribe(stringObserver); verify(stringObserver, times(1)).onError(any(NullPointerException.class)); @@ -99,7 +98,7 @@ public void testErrorDelayed3() { final Observable o4 = Observable.create(new TestErrorObservable("nine")); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); + Observable m = Observable.mergeDelayError(o1, o2, o3, o4); m.subscribe(stringObserver); verify(stringObserver, times(1)).onError(any(NullPointerException.class)); @@ -123,7 +122,7 @@ public void testErrorDelayed4() { final Observable o4 = Observable.create(new TestErrorObservable("nine", null)); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); + Observable m = Observable.mergeDelayError(o1, o2, o3, o4); m.subscribe(stringObserver); verify(stringObserver, times(1)).onError(any(NullPointerException.class)); @@ -148,7 +147,7 @@ public void testErrorDelayed4WithThreading() { final TestAsyncErrorObservable o4 = new TestAsyncErrorObservable("nine", null); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(Observable.create(o1), Observable.create(o2), Observable.create(o3), Observable.create(o4))); + Observable m = Observable.mergeDelayError(Observable.create(o1), Observable.create(o2), Observable.create(o3), Observable.create(o4)); m.subscribe(stringObserver); try { @@ -179,7 +178,7 @@ public void testCompositeErrorDelayed1() { final Observable o2 = Observable.create(new TestErrorObservable("one", "two", null)); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2)); + Observable m = Observable.mergeDelayError(o1, o2); m.subscribe(stringObserver); verify(stringObserver, times(1)).onError(any(CompositeException.class)); @@ -198,7 +197,7 @@ public void testCompositeErrorDelayed2() { final Observable o2 = Observable.create(new TestErrorObservable("one", "two", null)); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2)); + Observable m = Observable.mergeDelayError(o1, o2); CaptureObserver w = new CaptureObserver(); m.subscribe(w); @@ -234,7 +233,7 @@ public Subscription onSubscribe(Observer> observer) { } }); - Observable m = Observable.create(mergeDelayError(observableOfObservables)); + Observable m = Observable.mergeDelayError(observableOfObservables); m.subscribe(stringObserver); verify(stringObserver, never()).onError(any(Throwable.class)); @@ -248,7 +247,7 @@ public void testMergeArray() { final Observable o2 = Observable.create(new TestSynchronousObservable()); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2)); + Observable m = Observable.mergeDelayError(o1, o2); m.subscribe(stringObserver); verify(stringObserver, never()).onError(any(Throwable.class)); @@ -264,7 +263,7 @@ public void testMergeList() { listOfObservables.add(o1); listOfObservables.add(o2); - Observable m = Observable.create(mergeDelayError(listOfObservables)); + Observable m = Observable.mergeDelayError(Observable.from(listOfObservables)); m.subscribe(stringObserver); verify(stringObserver, never()).onError(any(Throwable.class)); @@ -278,7 +277,7 @@ public void testMergeArrayWithThreading() { final TestASynchronousObservable o2 = new TestASynchronousObservable(); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(Observable.create(o1), Observable.create(o2))); + Observable m = Observable.mergeDelayError(Observable.create(o1), Observable.create(o2)); m.subscribe(stringObserver); try { @@ -293,23 +292,20 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } - private static class TestSynchronousObservable implements Observable.OnSubscribeFunc { + private static class TestSynchronousObservable implements Observable.OnSubscribe { @Override - public Subscription onSubscribe(Observer observer) { - + public void call(Subscriber observer) { observer.onNext("hello"); observer.onCompleted(); - - return Subscriptions.empty(); } } - private static class TestASynchronousObservable implements Observable.OnSubscribeFunc { + private static class TestASynchronousObservable implements Observable.OnSubscribe { Thread t; @Override - public Subscription onSubscribe(final Observer observer) { + public void call(final Subscriber observer) { t = new Thread(new Runnable() { @Override @@ -320,8 +316,6 @@ public void run() { }); t.start(); - - return Subscriptions.empty(); } } @@ -370,7 +364,7 @@ public Subscription onSubscribe(final Observer observer) { } } - private static class TestErrorObservable implements Observable.OnSubscribeFunc { + private static class TestErrorObservable implements Observable.OnSubscribe { String[] valuesToReturn; @@ -379,7 +373,7 @@ private static class TestErrorObservable implements Observable.OnSubscribeFunc observer) { + public void call(Subscriber observer) { boolean errorThrown = false; for (String s : valuesToReturn) { if (s == null) { @@ -395,12 +389,10 @@ public Subscription onSubscribe(Observer observer) { if (!errorThrown) { observer.onCompleted(); } - - return Subscriptions.empty(); } } - private static class TestAsyncErrorObservable implements Observable.OnSubscribeFunc { + private static class TestAsyncErrorObservable implements Observable.OnSubscribe { String[] valuesToReturn; @@ -411,7 +403,7 @@ private static class TestAsyncErrorObservable implements Observable.OnSubscribeF Thread t; @Override - public Subscription onSubscribe(final Observer observer) { + public void call(final Subscriber observer) { t = new Thread(new Runnable() { @Override @@ -436,8 +428,6 @@ public void run() { }); t.start(); - - return Subscriptions.empty(); } } @@ -460,4 +450,54 @@ public void onNext(String args) { } } + @Test + public void testMergeSourceWhichDoesntPropagateExceptionBack() { + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t1) { + try { + t1.onNext(0); + t1.onNext(1); + } catch (Throwable swallow) { + + } + t1.onCompleted(); + } + }); + + Observable result = Observable.mergeDelayError(source, Observable.just(2)); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.unsafeSubscribe(new Subscriber() { + int calls; + @Override + public void onNext(Integer t) { + if (calls++ == 0) { + throw new OperationReduceTest.CustomException(); + } + o.onNext(t); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onCompleted() { + o.onCompleted(); + } + + }); + + inOrder.verify(o).onNext(2); + inOrder.verify(o, never()).onNext(0); + inOrder.verify(o, never()).onNext(1); + inOrder.verify(o, never()).onNext(anyInt()); + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + verify(o, never()).onCompleted(); + } } From 10956f3094cd6d59a207059ed3bca590c9e7acba Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 25 Apr 2014 20:11:19 +0200 Subject: [PATCH 2/2] Fix last test case. --- .../src/test/java/rx/operators/OperatorMergeDelayErrorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java b/rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java index 1c340d4deb..87b6ba8144 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorMergeDelayErrorTest.java @@ -457,10 +457,10 @@ public void testMergeSourceWhichDoesntPropagateExceptionBack() { public void call(Subscriber t1) { try { t1.onNext(0); - t1.onNext(1); } catch (Throwable swallow) { } + t1.onNext(1); t1.onCompleted(); } });