From 0dadcdecaf4acc636835ae77b625a336aa5eb6b3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 6 Feb 2015 09:42:00 +0100 Subject: [PATCH] FlatMap overloads with maximum concurrency parameter --- src/main/java/rx/Observable.java | 94 ++++++++++++- .../operators/OperatorFlatMapTest.java | 124 ++++++++++++++++-- 2 files changed, 207 insertions(+), 11 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index d550a16a1b..f68aaf1915 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -4509,7 +4509,34 @@ public final Observable firstOrDefault(T defaultValue, Func1 Observable flatMap(Func1> func) { return merge(map(func)); } - + + /** + * Returns an Observable that emits items based on applying a function that you supply to each item emitted + * by the source Observable, where that function returns an Observable, and then merging those resulting + * Observables and emitting the results of this merger, while limiting the maximum number of concurrent + * subscriptions to these Observables. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function that, when applied to an item emitted by the source Observable, returns an + * Observable + * @param maxConcurrent + * the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits the result of applying the transformation function to each item emitted + * by the source Observable and merging the results of the Observables obtained from this + * transformation + * @see ReactiveX operators documentation: FlatMap + */ + @Beta + public final Observable flatMap(Func1> func, int maxConcurrent) { + return merge(map(func), maxConcurrent); + } + /** * Returns an Observable that applies a function to each item emitted or notification raised by the source * Observable and then flattens the Observables returned from these functions and emits the resulting items. @@ -4540,6 +4567,40 @@ public final Observable flatMap( Func0> onCompleted) { return merge(mapNotification(onNext, onError, onCompleted)); } + /** + * Returns an Observable that applies a function to each item emitted or notification raised by the source + * Observable and then flattens the Observables returned from these functions and emits the resulting items, + * while limiting the maximum number of concurrent subscriptions to these Observables. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the result type + * @param onNext + * a function that returns an Observable to merge for each item emitted by the source Observable + * @param onError + * a function that returns an Observable to merge for an onError notification from the source + * Observable + * @param onCompleted + * a function that returns an Observable to merge for an onCompleted notification from the source + * Observable + * @param maxConcurrent + * the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits the results of merging the Observables returned from applying the + * specified functions to the emissions and notifications of the source Observable + * @see ReactiveX operators documentation: FlatMap + */ + @Beta + public final Observable flatMap( + Func1> onNext, + Func1> onError, + Func0> onCompleted, int maxConcurrent) { + return merge(mapNotification(onNext, onError, onCompleted), maxConcurrent); + } /** * Returns an Observable that emits the results of a specified function to the pair of values emitted by the @@ -4568,6 +4629,37 @@ public final Observable flatMap(final Func1 resultSelector) { return merge(lift(new OperatorMapPair(collectionSelector, resultSelector))); } + /** + * Returns an Observable that emits the results of a specified function to the pair of values emitted by the + * source Observable and a specified collection Observable, while limiting the maximum number of concurrent + * subscriptions to these Observables. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of items emitted by the collection Observable + * @param + * the type of items emitted by the resulting Observable + * @param collectionSelector + * a function that returns an Observable for each item emitted by the source Observable + * @param resultSelector + * a function that combines one item emitted by each of the source and collection Observables and + * returns an item to be emitted by the resulting Observable + * @param maxConcurrent + * the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits the results of applying a function to a pair of values emitted by the + * source Observable and the collection Observable + * @see ReactiveX operators documentation: FlatMap + */ + @Beta + public final Observable flatMap(final Func1> collectionSelector, + final Func2 resultSelector, int maxConcurrent) { + return merge(lift(new OperatorMapPair(collectionSelector, resultSelector)), maxConcurrent); + } /** * Returns an Observable that merges each item emitted by the source Observable with the values in an diff --git a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java index 3a696a0c93..a4635f1512 100644 --- a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java @@ -16,22 +16,20 @@ package rx.internal.operators; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; -import java.util.Arrays; -import java.util.List; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; +import org.junit.*; import rx.Observable; import rx.Observer; import rx.exceptions.TestException; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; +import rx.functions.*; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; public class OperatorFlatMapTest { @Test @@ -312,4 +310,110 @@ public void testFlatMapTransformsMergeException() { verify(o, never()).onNext(any()); verify(o, never()).onCompleted(); } + + private static Observable compose(Observable source, final AtomicInteger subscriptionCount, final int m) { + return source.doOnSubscribe(new Action0() { + @Override + public void call() { + if (subscriptionCount.getAndIncrement() >= m) { + Assert.fail("Too many subscriptions! " + subscriptionCount.get()); + } + } + }).doOnCompleted(new Action0() { + @Override + public void call() { + if (subscriptionCount.decrementAndGet() < 0) { + Assert.fail("Too many unsubscriptionss! " + subscriptionCount.get()); + } + } + }); + } + + @Test + public void testFlatMapMaxConcurrent() { + final int m = 4; + final AtomicInteger subscriptionCount = new AtomicInteger(); + Observable source = Observable.range(1, 10).flatMap(new Func1>() { + @Override + public Observable call(Integer t1) { + return compose(Observable.range(t1 * 10, 2), subscriptionCount, m) + .subscribeOn(Schedulers.computation()); + } + }, m); + + TestSubscriber ts = new TestSubscriber(); + + source.subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + Set expected = new HashSet(Arrays.asList( + 10, 11, 20, 21, 30, 31, 40, 41, 50, 51, 60, 61, 70, 71, 80, 81, 90, 91, 100, 101 + )); + Assert.assertEquals(expected.size(), ts.getOnNextEvents().size()); + Assert.assertTrue(expected.containsAll(ts.getOnNextEvents())); + } + @Test + public void testFlatMapSelectorMaxConcurrent() { + final int m = 4; + final AtomicInteger subscriptionCount = new AtomicInteger(); + Observable source = Observable.range(1, 10).flatMap(new Func1>() { + @Override + public Observable call(Integer t1) { + return compose(Observable.range(t1 * 10, 2), subscriptionCount, m) + .subscribeOn(Schedulers.computation()); + } + }, new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 * 1000 + t2; + } + }, m); + + TestSubscriber ts = new TestSubscriber(); + + source.subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + Set expected = new HashSet(Arrays.asList( + 1010, 1011, 2020, 2021, 3030, 3031, 4040, 4041, 5050, 5051, + 6060, 6061, 7070, 7071, 8080, 8081, 9090, 9091, 10100, 10101 + )); + Assert.assertEquals(expected.size(), ts.getOnNextEvents().size()); + System.out.println("--> testFlatMapSelectorMaxConcurrent: " + ts.getOnNextEvents()); + Assert.assertTrue(expected.containsAll(ts.getOnNextEvents())); + } + @Test + public void testFlatMapTransformsMaxConcurrentNormal() { + final int m = 2; + final AtomicInteger subscriptionCount = new AtomicInteger(); + Observable onNext = + compose(Observable.from(Arrays.asList(1, 2, 3)).observeOn(Schedulers.computation()), subscriptionCount, m) + .subscribeOn(Schedulers.computation()); + Observable onCompleted = compose(Observable.from(Arrays.asList(4)), subscriptionCount, m) + .subscribeOn(Schedulers.computation()); + Observable onError = Observable.from(Arrays.asList(5)); + + Observable source = Observable.from(Arrays.asList(10, 20, 30)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + TestSubscriber ts = new TestSubscriber(o); + + source.flatMap(just(onNext), just(onError), just0(onCompleted), m).subscribe(ts); + + ts.awaitTerminalEvent(1, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + verify(o, times(3)).onNext(1); + verify(o, times(3)).onNext(2); + verify(o, times(3)).onNext(3); + verify(o).onNext(4); + verify(o).onCompleted(); + + verify(o, never()).onNext(5); + verify(o, never()).onError(any(Throwable.class)); + } }