Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergeMap with Iterable and resultSelector overloads #736

Merged
merged 2 commits into from
Jan 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
Expand Down Expand Up @@ -3856,6 +3857,67 @@ public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extend
return merge(map(func));
}

/**
* Create an Observable that applies a function to the pair of values from the source
* Observable and the collection Observable.
* @param <U> the element type of the collection Observable
* @param <R> the result type
* @param collectionSelector function that returns an Observable sequence for each value in the source Observable
* @param resultSelector function that combines the values of the source and collection Observable
* @return an Observable that applies a function to the pair of values from the source
* Observable and the collection Observable.
*/
public <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks it can be implemented by a simple class Pair, for example:

    public <U, R> Observable<R> mergeMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
                       final Func2<? super T, ? super U, ? extends R> resultSelector) {
        return flatMap(new Func1<T, Observable<Pair<T, U>>>() {
            @Override
            public Observable<Pair<T, U>> call(final T t) {
                return collectionSelector.call(t).map(new Func1<U, Pair<T, U>>() {

                    @Override
                    public Pair<T, U> call(U u) {
                        return new Pair<T, U>(t, u);
                    }
                });
            }
        }).map(new Func1<Pair<T, U>, R>() {
            @Override
            public R call(Pair<T, U> pair) {
                return resultSelector.call(pair._1, pair._2);
            }
        });
   }

    private static class Pair<T1, T2> {
        T1 _1;
        T2 _2;
        Pair(T1 _1, T2 _2) {
            this._1 = _1;
            this._2 = _2;
        }
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to introduce Pair for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use any better class to replace Pair. However, I think we do not need to reimplement flatMap again here.

}

/**
* Create an Observable that merges the values of the iterables returned by the
* collectionSelector for each source value.
* @param <R> the result value type
* @param collectionSelector function that returns an Iterable sequence of values for
* each source value.
* @return an Observable that merges the values of the iterables returned by the
* collectionSelector for each source value.
*/
public <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer an anonymous Func1, for example,

    public <R> Observable<R> mergeMapIterable(final Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
      return flatMap(new Func1<T, Observable<? extends R>>() {
        @Override
        public Observable<? extends R> call(T t1) {
            return Observable.from(collectionSelector.call(t1));
        }
      });
    }

    public <U, R> Observable<R> mergeMapIterable(
            final Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
            Func2<? super T, ? super U, ? extends R> resultSelector) {
        return mergeMap(new Func1<T, Observable<? extends U>>() {
            @Override
            public Observable<? extends U> call(T t1) {
                return Observable.from(collectionSelector.call(t1));
            }
        }, resultSelector);
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a recurring issue it seems. If you do this and the Iterable returned is infinite or very long, you have a problem.

return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector)));
}

/**
* Create an Observable that applies a function to the pair of values from the source
* Observable and the collection Iterable sequence.
* @param <U> the collection element type
* @param <R> the result type
* @param collectionSelector function that returns an Iterable sequence of values for
* each source value.
* @param resultSelector function that combines the values of the source and collection Iterable
* @return n Observable that applies a function to the pair of values from the source
* Observable and the collection Iterable sequence.
*/
public <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector);
}

/**
* Create an Observable that projects the notification of an observable sequence to an observable
* sequence and merges the results into one.
* @param <R> the result type
* @param onNext function returning a collection to merge for each onNext event of the source
* @param onError function returning a collection to merge for an onError event
* @param onCompleted function returning a collection to merge for an onCompleted event
* @return an Observable that projects the notification of an observable sequence to an observable
* sequence and merges the results into one.
*/
public <R> Observable<R> mergeMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Func0<? extends Observable<? extends R>> onCompleted) {
return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It the following implementation better?

    public <R> Observable<R> mergeMap(
            final Func1<? super T, ? extends Observable<? extends R>> onNext,
            final Func1<? super Throwable, ? extends Observable<? extends R>> onError,
            final Func0<? extends Observable<? extends R>> onCompleted) {
        return materialize().flatMap(
                new Func1<Notification<T>, Observable<? extends R>>() {
                    @Override
                    public Observable<? extends R> call(Notification<T> t1) {
                        if (t1.isOnNext()) {
                            return onNext.call(t1.getValue());
                        }
                        if (t1.isOnError()) {
                            return onError.call(t1.getThrowable());
                        }
                        return onCompleted.call();
                    }
                });
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears to do the same thing so I guess this is simpler.

}

/**
* Creates a new Observable by applying a function that you supply to each
* item emitted by the source Observable, where that function returns an
Expand Down
Loading