-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MergeMap with Iterable and resultSelector overloads #736
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,7 @@ | |
import rx.operators.OperationElementAt; | ||
import rx.operators.OperationFilter; | ||
import rx.operators.OperationFinally; | ||
import rx.operators.OperationFlatMap; | ||
import rx.operators.OperationGroupBy; | ||
import rx.operators.OperationGroupByUntil; | ||
import rx.operators.OperationGroupJoin; | ||
|
@@ -3856,6 +3857,67 @@ public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extend | |
return merge(map(func)); | ||
} | ||
|
||
/** | ||
* Create an Observable that applies a function to the pair of values from the source | ||
* Observable and the collection Observable. | ||
* @param <U> the element type of the collection Observable | ||
* @param <R> the result type | ||
* @param collectionSelector function that returns an Observable sequence for each value in the source Observable | ||
* @param resultSelector function that combines the values of the source and collection Observable | ||
* @return an Observable that applies a function to the pair of values from the source | ||
* Observable and the collection Observable. | ||
*/ | ||
public <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector, | ||
Func2<? super T, ? super U, ? extends R> resultSelector) { | ||
return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector)); | ||
} | ||
|
||
/** | ||
* Create an Observable that merges the values of the iterables returned by the | ||
* collectionSelector for each source value. | ||
* @param <R> the result value type | ||
* @param collectionSelector function that returns an Iterable sequence of values for | ||
* each source value. | ||
* @return an Observable that merges the values of the iterables returned by the | ||
* collectionSelector for each source value. | ||
*/ | ||
public <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer an anonymous Func1, for example, public <R> Observable<R> mergeMapIterable(final Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
return flatMap(new Func1<T, Observable<? extends R>>() {
@Override
public Observable<? extends R> call(T t1) {
return Observable.from(collectionSelector.call(t1));
}
});
}
public <U, R> Observable<R> mergeMapIterable(
final Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return mergeMap(new Func1<T, Observable<? extends U>>() {
@Override
public Observable<? extends U> call(T t1) {
return Observable.from(collectionSelector.call(t1));
}
}, resultSelector);
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a recurring issue it seems. If you do this and the Iterable returned is infinite or very long, you have a problem. |
||
return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector))); | ||
} | ||
|
||
/** | ||
* Create an Observable that applies a function to the pair of values from the source | ||
* Observable and the collection Iterable sequence. | ||
* @param <U> the collection element type | ||
* @param <R> the result type | ||
* @param collectionSelector function that returns an Iterable sequence of values for | ||
* each source value. | ||
* @param resultSelector function that combines the values of the source and collection Iterable | ||
* @return n Observable that applies a function to the pair of values from the source | ||
* Observable and the collection Iterable sequence. | ||
*/ | ||
public <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector, | ||
Func2<? super T, ? super U, ? extends R> resultSelector) { | ||
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector); | ||
} | ||
|
||
/** | ||
* Create an Observable that projects the notification of an observable sequence to an observable | ||
* sequence and merges the results into one. | ||
* @param <R> the result type | ||
* @param onNext function returning a collection to merge for each onNext event of the source | ||
* @param onError function returning a collection to merge for an onError event | ||
* @param onCompleted function returning a collection to merge for an onCompleted event | ||
* @return an Observable that projects the notification of an observable sequence to an observable | ||
* sequence and merges the results into one. | ||
*/ | ||
public <R> Observable<R> mergeMap( | ||
Func1<? super T, ? extends Observable<? extends R>> onNext, | ||
Func1<? super Throwable, ? extends Observable<? extends R>> onError, | ||
Func0<? extends Observable<? extends R>> onCompleted) { | ||
return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It the following implementation better? public <R> Observable<R> mergeMap(
final Func1<? super T, ? extends Observable<? extends R>> onNext,
final Func1<? super Throwable, ? extends Observable<? extends R>> onError,
final Func0<? extends Observable<? extends R>> onCompleted) {
return materialize().flatMap(
new Func1<Notification<T>, Observable<? extends R>>() {
@Override
public Observable<? extends R> call(Notification<T> t1) {
if (t1.isOnNext()) {
return onNext.call(t1.getValue());
}
if (t1.isOnError()) {
return onError.call(t1.getThrowable());
}
return onCompleted.call();
}
});
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It appears to do the same thing so I guess this is simpler. |
||
} | ||
|
||
/** | ||
* Creates a new Observable by applying a function that you supply to each | ||
* item emitted by the source Observable, where that function returns an | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks it can be implemented by a simple class
Pair
, for example:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to introduce Pair for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use any better class to replace Pair. However, I think we do not need to reimplement
flatMap
again here.