Skip to content

Commit

Permalink
Merge pull request ReactiveX#479 from nullstyle/add_doOnEach
Browse files Browse the repository at this point in the history
Adds doOnEach operator
  • Loading branch information
benjchristensen committed Nov 12, 2013
2 parents 86053d2 + b575042 commit 3e50245
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1807,6 +1807,28 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
def withFilter(p: T => Boolean): WithFilter[T] = {
new WithFilter[T](p, asJava)
}


def doOnEach(observer: Observer[T]): Observable[T] = {
Observable[T](asJava.doOnEach(observer))
}

def doOnEach(onNext: T => Unit): Observable[T] = {
Observable[T](asJava.doOnEach(onNext))
}

def doOnEach(onNext: T => Unit, onComplete: () => Unit): Observable[T] = {
Observable[T](asJava.doOnEach(onNext, onComplete))
}

def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
Observable[T](asJava.doOnEach(onNext, onError))
}

def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Observable[T] = {
Observable[T](asJava.doOnEach(onNext, onError, onComplete))
}


}

Expand Down
149 changes: 149 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationDoOnEach;
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
Expand Down Expand Up @@ -4961,6 +4962,154 @@ public static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>>
return create(OperationAmb.amb(sources));
}


/**
* Invokes an action for each element in the observable sequence.
*
* @param observer
* The action to invoke for each element in the source sequence.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229307(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(Observer<? super T> observer) {
return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
* Invokes an action for each element in the observable sequence.
*
* @param onNext
* The action to invoke for each element in the source sequence.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(final Action1<T> onNext) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(T args) {
onNext.call(args);
}

};


return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
* Invokes an action for each element in the observable sequence.
*
* @param onNext
* The action to invoke for each element in the source sequence.
* @param onCompleted
* The action to invoke when the source sequence is completed.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229659(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(final Action1<T> onNext, final Action0 onCompleted) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
onCompleted.call();
}

@Override
public void onError(Throwable e) {}

@Override
public void onNext(T args) {
onNext.call(args);
}

};


return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
* Invokes an action for each element in the observable sequence.
*
* @param onNext
* The action to invoke for each element in the source sequence.
* @param onError
* The action to invoke when the source sequence calls onError.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229539(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(final Action1<T> onNext, final Action1<Throwable> onError) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onNext(T args) {
onNext.call(args);
}

};


return create(OperationDoOnEach.doOnEach(this, observer));
}


/**
* Invokes an action for each element in the observable sequence.
*
* @param onNext
* The action to invoke for each element in the source sequence.
* @param onError
* The action to invoke when the source sequence calls onError.
* @param onCompleted
* The action to invoke when the source sequence is completed.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229830(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(final Action1<T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
onCompleted.call();
}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onNext(T args) {
onNext.call(args);
}

};


return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
66 changes: 66 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Observable.OnSubscribeFunc;
import rx.Subscription;

/**
* Converts the elements of an observable sequence to the specified type.
*/
public class OperationDoOnEach {
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> sequence, Observer<? super T> observer) {
return new DoOnEachObservable<T>(sequence, observer);
}

private static class DoOnEachObservable<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> sequence;
private final Observer<? super T> doOnEachObserver;

public DoOnEachObservable(Observable<? extends T> sequence, Observer<? super T> doOnEachObserver) {
this.sequence = sequence;
this.doOnEachObserver = doOnEachObserver;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
@Override
public void onCompleted() {
doOnEachObserver.onCompleted();
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
doOnEachObserver.onError(e);
observer.onError(e);
}

@Override
public void onNext(T value) {
doOnEachObserver.onNext(value);
observer.onNext(value);
}
})));
}

}
}
129 changes: 129 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationMap.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observer;
import rx.concurrency.Schedulers;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Action1;

public class OperationDoOnEachTest {

@Mock
Observer<String> subscribedObserver;
@Mock
Observer<String> sideEffectObserver;

@Before
public void before() {
MockitoAnnotations.initMocks(this);
}

@Test
public void testDoOnEach() {
Observable<String> base = Observable.from("a", "b", "c");
Observable<String> doOnEach = base.doOnEach(sideEffectObserver);

doOnEach.subscribe(subscribedObserver);

// ensure the leaf observer is still getting called
verify(subscribedObserver, never()).onError(any(Throwable.class));
verify(subscribedObserver, times(1)).onNext("a");
verify(subscribedObserver, times(1)).onNext("b");
verify(subscribedObserver, times(1)).onNext("c");
verify(subscribedObserver, times(1)).onCompleted();

// ensure our injected observer is getting called
verify(sideEffectObserver, never()).onError(any(Throwable.class));
verify(sideEffectObserver, times(1)).onNext("a");
verify(sideEffectObserver, times(1)).onNext("b");
verify(sideEffectObserver, times(1)).onNext("c");
verify(sideEffectObserver, times(1)).onCompleted();
}



@Test
public void testDoOnEachWithError() {
Observable<String> base = Observable.from("one", "fail", "two", "three", "fail");
Observable<String> errs = base.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
return s;
}
});

Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);


doOnEach.subscribe(subscribedObserver);
verify(subscribedObserver, times(1)).onNext("one");
verify(subscribedObserver, never()).onNext("two");
verify(subscribedObserver, never()).onNext("three");
verify(subscribedObserver, never()).onCompleted();
verify(subscribedObserver, times(1)).onError(any(Throwable.class));


verify(sideEffectObserver, times(1)).onNext("one");
verify(sideEffectObserver, never()).onNext("two");
verify(sideEffectObserver, never()).onNext("three");
verify(sideEffectObserver, never()).onCompleted();
verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
}

@Test
public void testDoOnEachWithErrorInCallback() {
Observable<String> base = Observable.from("one", "two", "fail", "three");
Observable<String> doOnEach = base.doOnEach(new Action1<String>() {
@Override
public void call(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
}
});

doOnEach.subscribe(subscribedObserver);
verify(subscribedObserver, times(1)).onNext("one");
verify(subscribedObserver, times(1)).onNext("two");
verify(subscribedObserver, never()).onNext("three");
verify(subscribedObserver, never()).onCompleted();
verify(subscribedObserver, times(1)).onError(any(Throwable.class));

}

}

0 comments on commit 3e50245

Please sign in to comment.