From 30f855b021a8feb673a34cda2c5022c8f74e9541 Mon Sep 17 00:00:00 2001 From: dcapwell Date: Thu, 7 Feb 2013 22:32:10 -0800 Subject: [PATCH 1/4] Added first draft of forEach operator --- rxjava-core/src/main/java/rx/Observable.java | 62 ++++++ .../java/rx/operators/OperationForEach.java | 203 ++++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationForEach.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 83f49dc512..e049ec0ac8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -34,6 +34,7 @@ import rx.operators.OperationConcat; import rx.operators.OperationFilter; +import rx.operators.OperationForEach; import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -1692,6 +1693,37 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { }); } + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public static void forEach(final Observable sequence, final Action1 onNext) { + OperationForEach.forEach(sequence, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { + OperationForEach.forEach(sequence, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted, + final Action1 onError) { + OperationForEach.forEach(sequence, onNext, onCompleted, onError); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

@@ -2213,6 +2245,36 @@ public Observable take(final int num) { return take(this, num); } + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public void forEach(final Action1 onNext) { + forEach(this, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public void forEach(final Action1 onNext, final Action0 onCompleted) { + forEach(this, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public void forEach(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { + forEach(this, onNext, onCompleted, onError); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationForEach.java b/rxjava-core/src/main/java/rx/operators/OperationForEach.java new file mode 100644 index 0000000000..2a3c63eb53 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationForEach.java @@ -0,0 +1,203 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public final class OperationForEach { + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + */ + public static void forEach(final Observable sequence, final Action1 onNext) { + forEach(sequence, onNext, null, null); + } + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + * @param onCompleted + * a action to run when sequence completes. + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { + forEach(sequence, onNext, onCompleted, null); + } + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + * @param onCompleted + * a action to run when sequence completes. + * @param onError + * a action to run when an exception is thrown. + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted, + final Action1 onError) { + ForEachObserver fe = new ForEachObserver(onNext, onCompleted, onError); + sequence.subscribe(fe); + } + + private static final class ForEachObserver implements Observer { + private final Action1 onNext; + private final Action0 onCompleted; + private final Action1 onError; + + private boolean running = true; + + private ForEachObserver(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { + if (onNext == null) + throw new NullPointerException(); + this.onNext = onNext; + this.onCompleted = onCompleted; + this.onError = onError; + } + + @Override + public void onCompleted() { + running = false; + if (onCompleted != null) { + onCompleted.call(); + } + } + + @Override + public void onError(final Exception e) { + running = false; + if (onError != null) { + onError.call(e); + } + } + + @Override + public void onNext(final T args) { + if (running) { + try { + onNext.call(args); + } catch (Exception e) { + onError(e); + } + } + } + } + + public static class UnitTest { + + @Test + public void testForEach() { + Map m1 = getMap("One"); + Map m2 = getMap("Two"); + + Observable> observable = Observable.toObservable(m1, m2); + + final AtomicInteger counter = new AtomicInteger(); + forEach(observable, new Action1>() { + @Override + public void call(final Map stringStringMap) { + switch (counter.getAndIncrement()) { + case 0: + assertEquals("firstName doesn't match", "OneFirst", stringStringMap.get("firstName")); + assertEquals("lastName doesn't match", "OneLast", stringStringMap.get("lastName")); + break; + case 1: + assertEquals("firstName doesn't match", "TwoFirst", stringStringMap.get("firstName")); + assertEquals("lastName doesn't match", "TwoLast", stringStringMap.get("lastName")); + break; + default: + fail("Unknown increment"); + } + } + }); + assertEquals("Number of executions didn't match expected.", 2, counter.get()); + } + + @Test + public void testForEachEmptyObserver() { + Observable> observable = Observable.empty(); + + final AtomicInteger counter = new AtomicInteger(); + forEach(observable, new Action1>() { + @Override + public void call(final Map stringStringMap) { + counter.incrementAndGet(); + fail("Should not have called action"); + } + }); + assertEquals("Number of executions didn't match expected.", 0, counter.get()); + } + + @Test + public void testForEachWithException() { + Observable observable = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + final AtomicInteger counter = new AtomicInteger(); + final AtomicReference exception = new AtomicReference(); + forEach(observable, new Action1() { + @Override + public void call(final Integer integer) { + counter.incrementAndGet(); + if (integer.equals(5)) { + // fail half way through + throw new RuntimeException("testForEachWithException"); + } + } + }, null, new Action1() { + @Override + public void call(final Exception e) { + exception.set(e); + } + }); + assertEquals("Number of executions didn't match expected.", 5, counter.get()); + assertNotNull(exception.get()); + } + + private Map getMap(String prefix) { + Map m = new HashMap(); + m.put("firstName", prefix + "First"); + m.put("lastName", prefix + "Last"); + return m; + } + + } +} From 09079f84f6e3a57344b1eb1e332067eda1589c45 Mon Sep 17 00:00:00 2001 From: dcapwell Date: Fri, 8 Feb 2013 00:33:39 -0800 Subject: [PATCH 2/4] added Object to each forEach method to validate that it works properly with groovy --- .../java/rx/lang/groovy/GroovyAdaptor.java | 10 ++ rxjava-core/src/main/java/rx/Observable.java | 115 ++++++++++++++++++ .../java/rx/operators/OperationForEach.java | 6 +- 3 files changed, 128 insertions(+), 3 deletions(-) diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java index 1aba69ff93..9c303e9d13 100644 --- a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java @@ -201,6 +201,16 @@ public void testToSortedListWithFunctionStatic() { runGroovyScript("o.toSortedList(o.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});"); verify(assertion, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } + + @Test + public void testForEach() { + runGroovyScript("o.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)});"); + verify(assertion, times(1)).received(1); + verify(assertion, times(1)).received(3); + verify(assertion, times(1)).received(2); + verify(assertion, times(1)).received(5); + verify(assertion, times(1)).received(4); + } private void runGroovyScript(String script) { ClassLoader parent = getClass().getClassLoader(); diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e049ec0ac8..a66f2a016d 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1701,6 +1701,25 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { public static void forEach(final Observable sequence, final Action1 onNext) { OperationForEach.forEach(sequence, onNext); } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public static void forEach(final Observable sequence, final Object onNext) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(onNext); + OperationForEach.forEach(sequence, + new Action1() { + + @Override + public void call(T t1) { + _f.call(t1); + + } + }); + } /** * Invokes an action for each element in the sequence. @@ -1711,6 +1730,34 @@ public static void forEach(final Observable sequence, final Action1 on public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { OperationForEach.forEach(sequence, onNext, onCompleted); } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public static void forEach(final Observable sequence, final Object onNext, final Object onCompleted) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(onNext); + @SuppressWarnings("rawtypes") + final FuncN _f2 = Functions.from(onCompleted); + OperationForEach.forEach(sequence, + new Action1() { + + @Override + public void call(T t1) { + _f.call(t1); + + } + }, new Action0() { + + @Override + public void call() { + _f2.call(); + } + }); + } /** * Invokes an action for each element in the sequence. @@ -1723,6 +1770,44 @@ public static void forEach(final Observable sequence, final Action1 on final Action1 onError) { OperationForEach.forEach(sequence, onNext, onCompleted, onError); } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public static void forEach(final Observable sequence, final Object onNext, final Object onCompleted, + final Object onError) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(onNext); + @SuppressWarnings("rawtypes") + final FuncN _f2 = Functions.from(onCompleted); + @SuppressWarnings("rawtypes") + final FuncN _f3 = Functions.from(onError); + OperationForEach.forEach(sequence, + new Action1() { + + @Override + public void call(T t1) { + _f.call(t1); + + } + }, new Action0() { + + @Override + public void call() { + _f2.call(); + } + }, new Action1() { + + @Override + public void call(Exception t1) { + _f3.call(t1); + } + }); + } /** * Filters an Observable by discarding any of its emissions that do not meet some test. @@ -2253,6 +2338,15 @@ public Observable take(final int num) { public void forEach(final Action1 onNext) { forEach(this, onNext); } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public void forEach(final Object onNext) { + forEach(this, onNext); + } /** * Invokes an action for each element in the sequence. @@ -2263,6 +2357,16 @@ public void forEach(final Action1 onNext) { public void forEach(final Action1 onNext, final Action0 onCompleted) { forEach(this, onNext, onCompleted); } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public void forEach(final Object onNext, final Object onCompleted) { + forEach(this, onNext, onCompleted); + } /** * Invokes an action for each element in the sequence. @@ -2274,6 +2378,17 @@ public void forEach(final Action1 onNext, final Action0 onCompleted) { public void forEach(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { forEach(this, onNext, onCompleted, onError); } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public void forEach(final Object onNext, final Object onCompleted, final Object onError) { + forEach(this, onNext, onCompleted, onError); + } /** * Returns an Observable that emits a single item, a list composed of all the items emitted by diff --git a/rxjava-core/src/main/java/rx/operators/OperationForEach.java b/rxjava-core/src/main/java/rx/operators/OperationForEach.java index 2a3c63eb53..e9b229b31f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationForEach.java +++ b/rxjava-core/src/main/java/rx/operators/OperationForEach.java @@ -35,7 +35,7 @@ public final class OperationForEach { /** * Accepts a sequence and a action. Applies the action to each element in * the sequence. - * + * * @param sequence * the input sequence. * @param onNext @@ -48,7 +48,7 @@ public static void forEach(final Observable sequence, final Action1 on /** * Accepts a sequence and a action. Applies the action to each element in * the sequence. - * + * * @param sequence * the input sequence. * @param onNext @@ -63,7 +63,7 @@ public static void forEach(final Observable sequence, final Action1 on /** * Accepts a sequence and a action. Applies the action to each element in * the sequence. - * + * * @param sequence * the input sequence. * @param onNext From 30fffbd569a173e2c0153ad8dff967f6fbb67edf Mon Sep 17 00:00:00 2001 From: dcapwell Date: Fri, 8 Feb 2013 00:42:07 -0800 Subject: [PATCH 3/4] added a groovy test for each overloaded forEach method. Fixed a bug in forEach where onError and onCompleted could both be called. --- .../java/rx/lang/groovy/GroovyAdaptor.java | 23 +++++++++++++++++++ .../java/rx/operators/OperationForEach.java | 16 ++++++++----- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java index 9c303e9d13..c0e1ab4dac 100644 --- a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java @@ -211,6 +211,29 @@ public void testForEach() { verify(assertion, times(1)).received(5); verify(assertion, times(1)).received(4); } + + @Test + public void testForEachWithComplete() { + runGroovyScript("o.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}, {a.received('done')});"); + verify(assertion, times(1)).received(1); + verify(assertion, times(1)).received(3); + verify(assertion, times(1)).received(2); + verify(assertion, times(1)).received(5); + verify(assertion, times(1)).received(4); + verify(assertion, times(1)).received("done"); + } + + @Test + public void testForEachWithCompleteAndError() { + runGroovyScript("o.toObservable(1, 3, 2, 5, 4).forEach({ result -> throw new RuntimeException('err')}, {a.received('done')}, {err -> a.received(err.message)});"); + verify(assertion, times(0)).received(1); + verify(assertion, times(0)).received(3); + verify(assertion, times(0)).received(2); + verify(assertion, times(0)).received(5); + verify(assertion, times(0)).received(4); + verify(assertion, times(1)).received("err"); + verify(assertion, times(0)).received("done"); + } private void runGroovyScript(String script) { ClassLoader parent = getClass().getClassLoader(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationForEach.java b/rxjava-core/src/main/java/rx/operators/OperationForEach.java index e9b229b31f..fc696f8c47 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationForEach.java +++ b/rxjava-core/src/main/java/rx/operators/OperationForEach.java @@ -96,17 +96,21 @@ private ForEachObserver(final Action1 onNext, final Action0 onCompleted, fina @Override public void onCompleted() { - running = false; - if (onCompleted != null) { - onCompleted.call(); + if(running) { + running = false; + if (onCompleted != null) { + onCompleted.call(); + } } } @Override public void onError(final Exception e) { - running = false; - if (onError != null) { - onError.call(e); + if(running) { + running = false; + if (onError != null) { + onError.call(e); + } } } From c252fafae53c87a518e904f512d03b7228ec7d86 Mon Sep 17 00:00:00 2001 From: dcapwell Date: Tue, 12 Feb 2013 20:49:38 -0800 Subject: [PATCH 4/4] readded groovy tests for foreach --- .../java/rx/lang/groovy/GroovyAdaptor.java | 31 +++++++++++++++++ .../rx/lang/groovy/ObservableTests.groovy | 34 +++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java new file mode 100644 index 0000000000..70cef9c18e --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyAdaptor.java @@ -0,0 +1,31 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.groovy; + +import groovy.lang.Closure; +import rx.util.functions.FunctionLanguageAdaptor; + +public class GroovyAdaptor implements FunctionLanguageAdaptor { + + @Override + public Object call(Object function, Object[] args) { + return ((Closure) function).call(args); + } + + public Class[] getFunctionClass() { + return new Class[] { Closure.class }; + } +} diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 8f50ec7c9e..9b2f86b807 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -197,6 +197,40 @@ def class ObservableTests { Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } + + @Test + public void testForEach() { + Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(3); + verify(a, times(1)).received(2); + verify(a, times(1)).received(5); + verify(a, times(1)).received(4); + } + + @Test + public void testForEachWithComplete() { + Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}, {a.received('done')}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(3); + verify(a, times(1)).received(2); + verify(a, times(1)).received(5); + verify(a, times(1)).received(4); + verify(a, times(1)).received("done"); + } + + @Test + public void testForEachWithCompleteAndError() { + Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> throw new RuntimeException('err')}, {a.received('done')}, {err -> a.received(err.message)}); + verify(a, times(0)).received(1); + verify(a, times(0)).received(3); + verify(a, times(0)).received(2); + verify(a, times(0)).received(5); + verify(a, times(0)).received(4); + verify(a, times(1)).received("err"); + verify(a, times(0)).received("done"); + } + def class TestFactory { int counter = 1;