From 9f1bff7fdfff7d714e73a7c8c8a5401bed073cdd Mon Sep 17 00:00:00 2001 From: Scott Fleckenstein Date: Thu, 7 Nov 2013 18:32:36 -0800 Subject: [PATCH 1/4] Adds beginnings of doOnEach operator --- rxjava-core/src/main/java/rx/Observable.java | 17 +++ .../java/rx/operators/OperationDoOnEach.java | 66 +++++++++++ .../rx/operators/OperationDoOnEachTest.java | 108 ++++++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6bcd3e9f77..dd357eeda0 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -43,6 +43,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; +import rx.operators.OperationDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; @@ -4777,6 +4778,22 @@ public static Observable amb(Iterable> return create(OperationAmb.amb(sources)); } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param func + * The action to invoke for each element in the source sequence. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Amb + */ + public Observable doOnEach(Observer observer) { + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java new file mode 100644 index 0000000000..acd841fc2f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java @@ -0,0 +1,66 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Observable.OnSubscribeFunc; +import rx.Subscription; + +/** + * Converts the elements of an observable sequence to the specified type. + */ +public class OperationDoOnEach { + public static OnSubscribeFunc doOnEach(Observable source, Observer observer) { + return new DoOnEachObservable(source, observer); + } + + private static class DoOnEachObservable implements OnSubscribeFunc { + + private final Observable source; + private final Observer doOnEachObserver; + + public DoOnEachObservable(Observable source, Observer doOnEachObserver) { + this.source = source; + this.doOnEachObserver = doOnEachObserver; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + return source.subscribe(new Observer() { + @Override + public void onCompleted() { + doOnEachObserver.onCompleted(); + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + doOnEachObserver.onError(e); + observer.onError(e); + } + + @Override + public void onNext(T value) { + doOnEachObserver.onNext(value); + observer.onNext(value); + } + + }); + } + + } +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java new file mode 100644 index 0000000000..4bf017761c --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java @@ -0,0 +1,108 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.OperationMap.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observer; +import rx.concurrency.Schedulers; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class OperationDoOnEachTest { + + @Mock + Observer subscribedObserver; + @Mock + Observer sideEffectObserver; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testDoOnEach() { + Observable base = Observable.from("a", "b", "c"); + Observable doOnEach = base.doOnEach(sideEffectObserver); + + doOnEach.subscribe(subscribedObserver); + + // ensure the leaf observer is still getting called + verify(subscribedObserver, never()).onError(any(Throwable.class)); + verify(subscribedObserver, times(1)).onNext("a"); + verify(subscribedObserver, times(1)).onNext("b"); + verify(subscribedObserver, times(1)).onNext("c"); + verify(subscribedObserver, times(1)).onCompleted(); + + // ensure our injected observer is getting called + verify(sideEffectObserver, never()).onError(any(Throwable.class)); + verify(sideEffectObserver, times(1)).onNext("a"); + verify(sideEffectObserver, times(1)).onNext("b"); + verify(sideEffectObserver, times(1)).onNext("c"); + verify(sideEffectObserver, times(1)).onCompleted(); + } + + + + @Test + public void testDoOnEachWithError() { + Observable base = Observable.from("one", "fail", "two", "three", "fail"); + Observable errs = base.map(new Func1() { + @Override + public String call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + return s; + } + }); + + Observable doOnEach = errs.doOnEach(sideEffectObserver); + + + doOnEach.subscribe(subscribedObserver); + verify(subscribedObserver, times(1)).onNext("one"); + verify(subscribedObserver, never()).onNext("two"); + verify(subscribedObserver, never()).onNext("three"); + verify(subscribedObserver, never()).onCompleted(); + verify(subscribedObserver, times(1)).onError(any(Throwable.class)); + + + verify(sideEffectObserver, times(1)).onNext("one"); + verify(sideEffectObserver, never()).onNext("two"); + verify(sideEffectObserver, never()).onNext("three"); + verify(sideEffectObserver, never()).onCompleted(); + verify(sideEffectObserver, times(1)).onError(any(Throwable.class)); + } + + +} From 37e1ce37bc1dc6f61979cfdec6e7387b13fceb80 Mon Sep 17 00:00:00 2001 From: Scott Fleckenstein Date: Thu, 7 Nov 2013 18:50:38 -0800 Subject: [PATCH 2/4] Adds other overrides for doOnEach --- rxjava-core/src/main/java/rx/Observable.java | 136 ++++++++++++++++++- 1 file changed, 134 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index dd357eeda0..b4bfd258f1 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4782,17 +4782,149 @@ public static Observable amb(Iterable> /** * Invokes an action for each element in the observable sequence. * - * @param func + * @param observer * The action to invoke for each element in the source sequence. * * @return * The source sequence with the side-effecting behavior applied. - * @see MSDN: Observable.Amb + * @see MSDN: Observable.Do */ public Observable doOnEach(Observer observer) { return create(OperationDoOnEach.doOnEach(this, observer)); } + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext) { + Observer observer = new Observer() { + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable e) {} + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * @param onCompleted + * The action to invoke when the source sequence is completed. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext, final Action0 onCompleted) { + Observer observer = new Observer() { + @Override + public void onCompleted() { + onCompleted.call(); + } + + @Override + public void onError(Throwable e) {} + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * @param onError + * The action to invoke when the source sequence calls onError. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext, final Action1 onError) { + Observer observer = new Observer() { + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * @param onError + * The action to invoke when the source sequence calls onError. + * @param onCompleted + * The action to invoke when the source sequence is completed. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext, final Action1 onError, final Action0 onCompleted) { + Observer observer = new Observer() { + @Override + public void onCompleted() { + onCompleted.call(); + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. From b39d032bcc8c465b940b180df0622b1f05e24115 Mon Sep 17 00:00:00 2001 From: Scott Fleckenstein Date: Thu, 7 Nov 2013 19:20:07 -0800 Subject: [PATCH 3/4] Adds scala adapters for doOnEach operator --- .../main/scala/rx/lang/scala/Observable.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d845a65fa5..8dd5d1b9b2 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1807,6 +1807,28 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) def withFilter(p: T => Boolean): WithFilter[T] = { new WithFilter[T](p, asJava) } + + + def doOnEach(observer: Observer[T]): Observable[T] = { + Observable[T](asJava.doOnEach(observer)) + } + + def doOnEach(onNext: T => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext)) + } + + def doOnEach(onNext: T => Unit, onComplete: () => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext, onComplete)) + } + + def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext, onError)) + } + + def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext, onError, onComplete)) + } + } From b57504252a7f1d351ccb75c3f0ee35849dec753c Mon Sep 17 00:00:00 2001 From: Scott Fleckenstein Date: Mon, 11 Nov 2013 15:01:09 -0800 Subject: [PATCH 4/4] Wraps DoOnEach in a SafeObserver This commit leverages the SafeObserver facility to get the desired behavior in the face of exceptions. Specifically, if any of the operations performed within the doOnEach handler raises an exception, that exception will propagate through the observable chain. --- .../java/rx/operators/OperationDoOnEach.java | 16 +++++++------- .../rx/operators/OperationDoOnEachTest.java | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java index acd841fc2f..1b0aafb578 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java @@ -24,23 +24,24 @@ * Converts the elements of an observable sequence to the specified type. */ public class OperationDoOnEach { - public static OnSubscribeFunc doOnEach(Observable source, Observer observer) { - return new DoOnEachObservable(source, observer); + public static OnSubscribeFunc doOnEach(Observable sequence, Observer observer) { + return new DoOnEachObservable(sequence, observer); } private static class DoOnEachObservable implements OnSubscribeFunc { - private final Observable source; + private final Observable sequence; private final Observer doOnEachObserver; - public DoOnEachObservable(Observable source, Observer doOnEachObserver) { - this.source = source; + public DoOnEachObservable(Observable sequence, Observer doOnEachObserver) { + this.sequence = sequence; this.doOnEachObserver = doOnEachObserver; } @Override public Subscription onSubscribe(final Observer observer) { - return source.subscribe(new Observer() { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(sequence.subscribe(new SafeObserver(subscription, new Observer() { @Override public void onCompleted() { doOnEachObserver.onCompleted(); @@ -58,8 +59,7 @@ public void onNext(T value) { doOnEachObserver.onNext(value); observer.onNext(value); } - - }); + }))); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java index 4bf017761c..6c1407ebea 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java @@ -36,6 +36,7 @@ import rx.concurrency.Schedulers; import rx.util.functions.Func1; import rx.util.functions.Func2; +import rx.util.functions.Action1; public class OperationDoOnEachTest { @@ -104,5 +105,25 @@ public String call(String s) { verify(sideEffectObserver, times(1)).onError(any(Throwable.class)); } + @Test + public void testDoOnEachWithErrorInCallback() { + Observable base = Observable.from("one", "two", "fail", "three"); + Observable doOnEach = base.doOnEach(new Action1() { + @Override + public void call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + } + }); + + doOnEach.subscribe(subscribedObserver); + verify(subscribedObserver, times(1)).onNext("one"); + verify(subscribedObserver, times(1)).onNext("two"); + verify(subscribedObserver, never()).onNext("three"); + verify(subscribedObserver, never()).onCompleted(); + verify(subscribedObserver, times(1)).onError(any(Throwable.class)); + + } }