diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d845a65fa5..8dd5d1b9b2 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1807,6 +1807,28 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) def withFilter(p: T => Boolean): WithFilter[T] = { new WithFilter[T](p, asJava) } + + + def doOnEach(observer: Observer[T]): Observable[T] = { + Observable[T](asJava.doOnEach(observer)) + } + + def doOnEach(onNext: T => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext)) + } + + def doOnEach(onNext: T => Unit, onComplete: () => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext, onComplete)) + } + + def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext, onError)) + } + + def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Observable[T] = { + Observable[T](asJava.doOnEach(onNext, onError, onComplete)) + } + } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 06fd2b53f2..ae3796a8d5 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -44,6 +44,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; +import rx.operators.OperationDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; @@ -4961,6 +4962,154 @@ public static Observable amb(Iterable> return create(OperationAmb.amb(sources)); } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param observer + * The action to invoke for each element in the source sequence. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(Observer observer) { + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext) { + Observer observer = new Observer() { + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable e) {} + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * @param onCompleted + * The action to invoke when the source sequence is completed. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext, final Action0 onCompleted) { + Observer observer = new Observer() { + @Override + public void onCompleted() { + onCompleted.call(); + } + + @Override + public void onError(Throwable e) {} + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * @param onError + * The action to invoke when the source sequence calls onError. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext, final Action1 onError) { + Observer observer = new Observer() { + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + + + /** + * Invokes an action for each element in the observable sequence. + * + * @param onNext + * The action to invoke for each element in the source sequence. + * @param onError + * The action to invoke when the source sequence calls onError. + * @param onCompleted + * The action to invoke when the source sequence is completed. + * + * @return + * The source sequence with the side-effecting behavior applied. + * @see MSDN: Observable.Do + */ + public Observable doOnEach(final Action1 onNext, final Action1 onError, final Action0 onCompleted) { + Observer observer = new Observer() { + @Override + public void onCompleted() { + onCompleted.call(); + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onNext(T args) { + onNext.call(args); + } + + }; + + + return create(OperationDoOnEach.doOnEach(this, observer)); + } + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java new file mode 100644 index 0000000000..1b0aafb578 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java @@ -0,0 +1,66 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Observable.OnSubscribeFunc; +import rx.Subscription; + +/** + * Converts the elements of an observable sequence to the specified type. + */ +public class OperationDoOnEach { + public static OnSubscribeFunc doOnEach(Observable sequence, Observer observer) { + return new DoOnEachObservable(sequence, observer); + } + + private static class DoOnEachObservable implements OnSubscribeFunc { + + private final Observable sequence; + private final Observer doOnEachObserver; + + public DoOnEachObservable(Observable sequence, Observer doOnEachObserver) { + this.sequence = sequence; + this.doOnEachObserver = doOnEachObserver; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(sequence.subscribe(new SafeObserver(subscription, new Observer() { + @Override + public void onCompleted() { + doOnEachObserver.onCompleted(); + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + doOnEachObserver.onError(e); + observer.onError(e); + } + + @Override + public void onNext(T value) { + doOnEachObserver.onNext(value); + observer.onNext(value); + } + }))); + } + + } +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java new file mode 100644 index 0000000000..6c1407ebea --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java @@ -0,0 +1,129 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.OperationMap.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observer; +import rx.concurrency.Schedulers; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Action1; + +public class OperationDoOnEachTest { + + @Mock + Observer subscribedObserver; + @Mock + Observer sideEffectObserver; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testDoOnEach() { + Observable base = Observable.from("a", "b", "c"); + Observable doOnEach = base.doOnEach(sideEffectObserver); + + doOnEach.subscribe(subscribedObserver); + + // ensure the leaf observer is still getting called + verify(subscribedObserver, never()).onError(any(Throwable.class)); + verify(subscribedObserver, times(1)).onNext("a"); + verify(subscribedObserver, times(1)).onNext("b"); + verify(subscribedObserver, times(1)).onNext("c"); + verify(subscribedObserver, times(1)).onCompleted(); + + // ensure our injected observer is getting called + verify(sideEffectObserver, never()).onError(any(Throwable.class)); + verify(sideEffectObserver, times(1)).onNext("a"); + verify(sideEffectObserver, times(1)).onNext("b"); + verify(sideEffectObserver, times(1)).onNext("c"); + verify(sideEffectObserver, times(1)).onCompleted(); + } + + + + @Test + public void testDoOnEachWithError() { + Observable base = Observable.from("one", "fail", "two", "three", "fail"); + Observable errs = base.map(new Func1() { + @Override + public String call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + return s; + } + }); + + Observable doOnEach = errs.doOnEach(sideEffectObserver); + + + doOnEach.subscribe(subscribedObserver); + verify(subscribedObserver, times(1)).onNext("one"); + verify(subscribedObserver, never()).onNext("two"); + verify(subscribedObserver, never()).onNext("three"); + verify(subscribedObserver, never()).onCompleted(); + verify(subscribedObserver, times(1)).onError(any(Throwable.class)); + + + verify(sideEffectObserver, times(1)).onNext("one"); + verify(sideEffectObserver, never()).onNext("two"); + verify(sideEffectObserver, never()).onNext("three"); + verify(sideEffectObserver, never()).onCompleted(); + verify(sideEffectObserver, times(1)).onError(any(Throwable.class)); + } + + @Test + public void testDoOnEachWithErrorInCallback() { + Observable base = Observable.from("one", "two", "fail", "three"); + Observable doOnEach = base.doOnEach(new Action1() { + @Override + public void call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + } + }); + + doOnEach.subscribe(subscribedObserver); + verify(subscribedObserver, times(1)).onNext("one"); + verify(subscribedObserver, times(1)).onNext("two"); + verify(subscribedObserver, never()).onNext("three"); + verify(subscribedObserver, never()).onCompleted(); + verify(subscribedObserver, times(1)).onError(any(Throwable.class)); + + } + +}