Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FlatMap overloads with maximum concurrency parameter #2627

Merged
merged 1 commit into from
Feb 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4509,7 +4509,34 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func));
}


/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source Observable, where that function returns an Observable, and then merging those resulting
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
* subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* a function that, when applied to an item emitted by the source Observable, returns an
* Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source Observable and merging the results of the Observables obtained from this
* transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
return merge(map(func), maxConcurrent);
}

/**
* Returns an Observable that applies a function to each item emitted or notification raised by the source
* Observable and then flattens the Observables returned from these functions and emits the resulting items.
Expand Down Expand Up @@ -4540,6 +4567,40 @@ public final <R> Observable<R> flatMap(
Func0<? extends Observable<? extends R>> onCompleted) {
return merge(mapNotification(onNext, onError, onCompleted));
}
/**
* Returns an Observable that applies a function to each item emitted or notification raised by the source
* Observable and then flattens the Observables returned from these functions and emits the resulting items,
* while limiting the maximum number of concurrent subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.nce.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the result type
* @param onNext
* a function that returns an Observable to merge for each item emitted by the source Observable
* @param onError
* a function that returns an Observable to merge for an onError notification from the source
* Observable
* @param onCompleted
* a function that returns an Observable to merge for an onCompleted notification from the source
* Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the results of merging the Observables returned from applying the
* specified functions to the emissions and notifications of the source Observable
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@Beta
public final <R> Observable<R> flatMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Func0<? extends Observable<? extends R>> onCompleted, int maxConcurrent) {
return merge(mapNotification(onNext, onError, onCompleted), maxConcurrent);
}

/**
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
Expand Down Expand Up @@ -4568,6 +4629,37 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
final Func2<? super T, ? super U, ? extends R> resultSelector) {
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)));
}
/**
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
* source Observable and a specified collection Observable, while limiting the maximum number of concurrent
* subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of items emitted by the collection Observable
* @param <R>
* the type of items emitted by the resulting Observable
* @param collectionSelector
* a function that returns an Observable for each item emitted by the source Observable
* @param resultSelector
* a function that combines one item emitted by each of the source and collection Observables and
* returns an item to be emitted by the resulting Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
* source Observable and the collection Observable
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@Beta
public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
final Func2<? super T, ? super U, ? extends R> resultSelector, int maxConcurrent) {
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)), maxConcurrent);
}

/**
* Returns an Observable that merges each item emitted by the source Observable with the values in an
Expand Down
124 changes: 114 additions & 10 deletions src/test/java/rx/internal/operators/OperatorFlatMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,20 @@
package rx.internal.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.junit.*;

import rx.Observable;
import rx.Observer;
import rx.exceptions.TestException;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorFlatMapTest {
@Test
Expand Down Expand Up @@ -312,4 +310,110 @@ public void testFlatMapTransformsMergeException() {
verify(o, never()).onNext(any());
verify(o, never()).onCompleted();
}

private static <T> Observable<T> compose(Observable<T> source, final AtomicInteger subscriptionCount, final int m) {
return source.doOnSubscribe(new Action0() {
@Override
public void call() {
if (subscriptionCount.getAndIncrement() >= m) {
Assert.fail("Too many subscriptions! " + subscriptionCount.get());
}
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
if (subscriptionCount.decrementAndGet() < 0) {
Assert.fail("Too many unsubscriptionss! " + subscriptionCount.get());
}
}
});
}

@Test
public void testFlatMapMaxConcurrent() {
final int m = 4;
final AtomicInteger subscriptionCount = new AtomicInteger();
Observable<Integer> source = Observable.range(1, 10).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
return compose(Observable.range(t1 * 10, 2), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
}
}, m);

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Set<Integer> expected = new HashSet<Integer>(Arrays.asList(
10, 11, 20, 21, 30, 31, 40, 41, 50, 51, 60, 61, 70, 71, 80, 81, 90, 91, 100, 101
));
Assert.assertEquals(expected.size(), ts.getOnNextEvents().size());
Assert.assertTrue(expected.containsAll(ts.getOnNextEvents()));
}
@Test
public void testFlatMapSelectorMaxConcurrent() {
final int m = 4;
final AtomicInteger subscriptionCount = new AtomicInteger();
Observable<Integer> source = Observable.range(1, 10).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
return compose(Observable.range(t1 * 10, 2), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
}
}, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer t1, Integer t2) {
return t1 * 1000 + t2;
}
}, m);

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
Set<Integer> expected = new HashSet<Integer>(Arrays.asList(
1010, 1011, 2020, 2021, 3030, 3031, 4040, 4041, 5050, 5051,
6060, 6061, 7070, 7071, 8080, 8081, 9090, 9091, 10100, 10101
));
Assert.assertEquals(expected.size(), ts.getOnNextEvents().size());
System.out.println("--> testFlatMapSelectorMaxConcurrent: " + ts.getOnNextEvents());
Assert.assertTrue(expected.containsAll(ts.getOnNextEvents()));
}
@Test
public void testFlatMapTransformsMaxConcurrentNormal() {
final int m = 2;
final AtomicInteger subscriptionCount = new AtomicInteger();
Observable<Integer> onNext =
compose(Observable.from(Arrays.asList(1, 2, 3)).observeOn(Schedulers.computation()), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
Observable<Integer> onCompleted = compose(Observable.from(Arrays.asList(4)), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
Observable<Integer> onError = Observable.from(Arrays.asList(5));

Observable<Integer> source = Observable.from(Arrays.asList(10, 20, 30));

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
TestSubscriber<Object> ts = new TestSubscriber<Object>(o);

source.flatMap(just(onNext), just(onError), just0(onCompleted), m).subscribe(ts);

ts.awaitTerminalEvent(1, TimeUnit.SECONDS);
ts.assertNoErrors();
ts.assertTerminalEvent();

verify(o, times(3)).onNext(1);
verify(o, times(3)).onNext(2);
verify(o, times(3)).onNext(3);
verify(o).onNext(4);
verify(o).onCompleted();

verify(o, never()).onNext(5);
verify(o, never()).onError(any(Throwable.class));
}
}