diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 8f50ec7c9e..9b2f86b807 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -197,6 +197,40 @@ def class ObservableTests { Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } + + @Test + public void testForEach() { + Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(3); + verify(a, times(1)).received(2); + verify(a, times(1)).received(5); + verify(a, times(1)).received(4); + } + + @Test + public void testForEachWithComplete() { + Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}, {a.received('done')}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(3); + verify(a, times(1)).received(2); + verify(a, times(1)).received(5); + verify(a, times(1)).received(4); + verify(a, times(1)).received("done"); + } + + @Test + public void testForEachWithCompleteAndError() { + Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> throw new RuntimeException('err')}, {a.received('done')}, {err -> a.received(err.message)}); + verify(a, times(0)).received(1); + verify(a, times(0)).received(3); + verify(a, times(0)).received(2); + verify(a, times(0)).received(5); + verify(a, times(0)).received(4); + verify(a, times(1)).received("err"); + verify(a, times(0)).received("done"); + } + def class TestFactory { int counter = 1; diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b16753001d..8be5489094 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -34,6 +34,7 @@ import rx.operators.OperationConcat; import rx.operators.OperationFilter; +import rx.operators.OperationForEach; import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -1781,6 +1782,122 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { }); } + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public static void forEach(final Observable sequence, final Action1 onNext) { + OperationForEach.forEach(sequence, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public static void forEach(final Observable sequence, final Object onNext) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(onNext); + OperationForEach.forEach(sequence, + new Action1() { + + @Override + public void call(T t1) { + _f.call(t1); + + } + }); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { + OperationForEach.forEach(sequence, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public static void forEach(final Observable sequence, final Object onNext, final Object onCompleted) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(onNext); + @SuppressWarnings("rawtypes") + final FuncN _f2 = Functions.from(onCompleted); + OperationForEach.forEach(sequence, + new Action1() { + + @Override + public void call(T t1) { + _f.call(t1); + + } + }, new Action0() { + + @Override + public void call() { + _f2.call(); + } + }); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted, + final Action1 onError) { + OperationForEach.forEach(sequence, onNext, onCompleted, onError); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public static void forEach(final Observable sequence, final Object onNext, final Object onCompleted, + final Object onError) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(onNext); + @SuppressWarnings("rawtypes") + final FuncN _f2 = Functions.from(onCompleted); + @SuppressWarnings("rawtypes") + final FuncN _f3 = Functions.from(onError); + OperationForEach.forEach(sequence, + new Action1() { + + @Override + public void call(T t1) { + _f.call(t1); + + } + }, new Action0() { + + @Override + public void call() { + _f2.call(); + } + }, new Action1() { + + @Override + public void call(Exception t1) { + _f3.call(t1); + } + }); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

@@ -2303,6 +2420,66 @@ public Observable take(final int num) { } /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public void forEach(final Action1 onNext) { + forEach(this, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public void forEach(final Object onNext) { + forEach(this, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public void forEach(final Action1 onNext, final Action0 onCompleted) { + forEach(this, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public void forEach(final Object onNext, final Object onCompleted) { + forEach(this, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public void forEach(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { + forEach(this, onNext, onCompleted, onError); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public void forEach(final Object onNext, final Object onCompleted, final Object onError) { + forEach(this, onNext, onCompleted, onError); + } + + /* * Returns an Observable that emits the last count items emitted by the source * Observable. * diff --git a/rxjava-core/src/main/java/rx/operators/OperationForEach.java b/rxjava-core/src/main/java/rx/operators/OperationForEach.java new file mode 100644 index 0000000000..fc696f8c47 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationForEach.java @@ -0,0 +1,207 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public final class OperationForEach { + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + */ + public static void forEach(final Observable sequence, final Action1 onNext) { + forEach(sequence, onNext, null, null); + } + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + * @param onCompleted + * a action to run when sequence completes. + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { + forEach(sequence, onNext, onCompleted, null); + } + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + * @param onCompleted + * a action to run when sequence completes. + * @param onError + * a action to run when an exception is thrown. + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted, + final Action1 onError) { + ForEachObserver fe = new ForEachObserver(onNext, onCompleted, onError); + sequence.subscribe(fe); + } + + private static final class ForEachObserver implements Observer { + private final Action1 onNext; + private final Action0 onCompleted; + private final Action1 onError; + + private boolean running = true; + + private ForEachObserver(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { + if (onNext == null) + throw new NullPointerException(); + this.onNext = onNext; + this.onCompleted = onCompleted; + this.onError = onError; + } + + @Override + public void onCompleted() { + if(running) { + running = false; + if (onCompleted != null) { + onCompleted.call(); + } + } + } + + @Override + public void onError(final Exception e) { + if(running) { + running = false; + if (onError != null) { + onError.call(e); + } + } + } + + @Override + public void onNext(final T args) { + if (running) { + try { + onNext.call(args); + } catch (Exception e) { + onError(e); + } + } + } + } + + public static class UnitTest { + + @Test + public void testForEach() { + Map m1 = getMap("One"); + Map m2 = getMap("Two"); + + Observable> observable = Observable.toObservable(m1, m2); + + final AtomicInteger counter = new AtomicInteger(); + forEach(observable, new Action1>() { + @Override + public void call(final Map stringStringMap) { + switch (counter.getAndIncrement()) { + case 0: + assertEquals("firstName doesn't match", "OneFirst", stringStringMap.get("firstName")); + assertEquals("lastName doesn't match", "OneLast", stringStringMap.get("lastName")); + break; + case 1: + assertEquals("firstName doesn't match", "TwoFirst", stringStringMap.get("firstName")); + assertEquals("lastName doesn't match", "TwoLast", stringStringMap.get("lastName")); + break; + default: + fail("Unknown increment"); + } + } + }); + assertEquals("Number of executions didn't match expected.", 2, counter.get()); + } + + @Test + public void testForEachEmptyObserver() { + Observable> observable = Observable.empty(); + + final AtomicInteger counter = new AtomicInteger(); + forEach(observable, new Action1>() { + @Override + public void call(final Map stringStringMap) { + counter.incrementAndGet(); + fail("Should not have called action"); + } + }); + assertEquals("Number of executions didn't match expected.", 0, counter.get()); + } + + @Test + public void testForEachWithException() { + Observable observable = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + final AtomicInteger counter = new AtomicInteger(); + final AtomicReference exception = new AtomicReference(); + forEach(observable, new Action1() { + @Override + public void call(final Integer integer) { + counter.incrementAndGet(); + if (integer.equals(5)) { + // fail half way through + throw new RuntimeException("testForEachWithException"); + } + } + }, null, new Action1() { + @Override + public void call(final Exception e) { + exception.set(e); + } + }); + assertEquals("Number of executions didn't match expected.", 5, counter.get()); + assertNotNull(exception.get()); + } + + private Map getMap(String prefix) { + Map m = new HashMap(); + m.put("firstName", prefix + "First"); + m.put("lastName", prefix + "Last"); + return m; + } + + } +}