Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more operators to RxScala #1265

Merged
merged 14 commits into from
May 29, 2014
Merged

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented May 27, 2014

The commits explain this PR.

In addition, the reason that I changed window(closings: () => Observable[Any]) to window(boundary: => Observable[Any]) is window(boundary: => Observable[Any]) can support both two overloads window(Observable[U]) and window(Func0[_ <: Observable[_ <: TClosing]]) in RxJava.

@zsxwing
Copy link
Member Author

zsxwing commented May 27, 2014

/cc @samuelgruetter, @headinthebox

@cloudbees-pull-request-builder

RxJava-pull-requests #1160 FAILURE
Looks like there's a problem with this pull request

@headinthebox
Copy link
Contributor

Thanks @zsxwing.

Probably a good idea to look for more places where we can use =>{} and curry it as the last parameter; but I guess there are not that many places to do that if anay..

@headinthebox
Copy link
Contributor

The code looks fine to me. Can't see quickly howthe cloudbees failure connected to the code in this pull request, though.

}).toMap
}).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// amb 2-9
"amb(" + _ + ")" -> "[unnecessary because we can use `o1 amb o2` instead or `amb(List(o1, o2, o3, ...)`]"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, there's no such thing as o1 amb o2. You could add this. And then, maybe no varargs, but Iterable for the one in the companion object, because the typical use case of 2 Observables would be covered already.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already have this instance method: def amb[U >: T](that: Observable[U]): Observable[U] in RxScala

@samuelgruetter
Copy link
Contributor

The two Java methods

public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)
public final <U> Observable<Observable<T>> window(Observable<U> boundary)

have very different semantics, and both of them are useful and should be present in the RxScala. Since we cannot have an overload which takes an Observable[Any] and one which takes an => Observable[Any], we should use () => Observable[Any] (or rename one of the functions?).

The same applies to buffer, and we should keep buffer and window consistent.

@headinthebox
Copy link
Contributor

Of course, main pitfall of =>T

@zsxwing
Copy link
Member Author

zsxwing commented May 28, 2014

have very different semantics

In RxJava, it's

    public OperatorWindowWithObservable(Func0<? extends Observable<? extends U>> otherFactory) {
        this.otherFactory = otherFactory;
    }
    public OperatorWindowWithObservable(final Observable<U> other) {
        this.otherFactory = new Func0<Observable<U>>() {

            @Override
            public Observable<U> call() {
                return other;
            }

        };
    }

Looks => can cover these two use cases. Any difference I miss?

@zsxwing
Copy link
Member Author

zsxwing commented May 28, 2014

I just noticed that this window method does not what it should do: When I tried to use it, I got a "java.lang.ClassCastException: rx.operators.BufferUntilSubscriber cannot be cast to java.lang.Long". Can you please rewrite this method from scratch? No idea what I was thinking when I wrote this method ;-)
And can you also add an example to RxScalaDemo?

OK.

@cloudbees-pull-request-builder

RxJava-pull-requests #1162 SUCCESS
This pull request looks good

@samuelgruetter
Copy link
Contributor

Looks => can cover these two use cases. Any difference I miss?

I think the javadoc does not correspond to the implementation...

…Any])" and "buffer(Observable[Any])""; Remove the cast for "window(=> Observable[Any])"
@zsxwing
Copy link
Member Author

zsxwing commented May 28, 2014

Probably a good idea to look for more places where we can use =>{} and curry it as the last parameter; but I guess there are not that many places to do that if anay..

The only place I found is "buffer". Already updated it.

@headinthebox
Copy link
Contributor

Cool.

@cloudbees-pull-request-builder

RxJava-pull-requests #1173 SUCCESS
This pull request looks good

@zsxwing
Copy link
Member Author

zsxwing commented May 28, 2014

@DavidMGross, could you help improve the javadoc for buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) and window(Func0<? extends Observable<? extends TClosing>> closingSelector)? The current docs are misleading. Func0 is actually only called once to generate an Observable which is used to determined the boundary of each window/buffer. Also the marble diagrams:) Thank you.

@DavidMGross
Copy link
Collaborator

Will do.

On Wed, May 28, 2014 at 8:43 AM, Shixiong Zhu [email protected]:

@DavidMGross https://github.com/DavidMGross, could you help improve the
javadoc for buffer(Func0<? extends Observable<? extends TClosing>>
bufferClosingSelector) and window(Func0<? extends Observable<? extends
TClosing>> closingSelector)? The current docs are misleading. Func0 is
actually only called once to generate an Observable which is used to
determined the boundary of each window/buffer.


Reply to this email directly or view it on GitHubhttps://github.com//pull/1265#issuecomment-44424293
.

David M. Gross
PLP Consulting

@benjchristensen
Copy link
Member

Is this ready for merging?

@headinthebox
Copy link
Contributor

Looks good to go to me.

benjchristensen added a commit that referenced this pull request May 29, 2014
Add more operators to RxScala
@benjchristensen benjchristensen merged commit 28a9a99 into ReactiveX:master May 29, 2014
@samuelgruetter
Copy link
Contributor

IMHO, it makes more sense to say that the javadoc is correct, but the implementation of window(Func0) is wrong. For future reference, here's the current javadoc that I think is correct:

/**
 * Returns an Observable that emits windows of items it collects from the source Observable. The resulting
 * Observable emits connected, non-overlapping windows. It emits the current window and opens a new one when
 * the Observable produced by the specified {@code closingSelector} emits an item. The {@code closingSelector} 
 * then creates a new Observable to generate the closer of the next window.
 * <p>
 * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window1.png">
 * 
 * @param closingSelector
 *            a {@link Func0} that produces an Observable for every window created. When this Observable
 *            emits an item, {@code window()} emits the associated window and begins a new one.
 * @return an Observable that emits connected, non-overlapping windows of items from the source Observable
 *         when {@code closingSelector} emits an item
 * @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
 */
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)

/**
 * Returns an Observable that emits non-overlapping windows of items it collects from the source Observable
 * where the boundary of each window is determined by the items emitted from a specified boundary-governing
 * Observable.
 * <p>
 * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window8.png">
 * 
 * @param <U>
 *            the window element type (ignored)
 * @param boundary
 *            an Observable whose emitted items close and open windows
 * @return an Observable that emits non-overlapping windows of items it collects from the source Observable
 *         where the boundary of each window is determined by the items emitted from the {@code boundary} Observable
 */
public final <U> Observable<Observable<T>> window(Observable<U> boundary)

@headinthebox @benjchristensen what do you think?

@zsxwing
Copy link
Member Author

zsxwing commented May 29, 2014

I think

public final <A, B> Observable<Observable<T>> window(Func0<? extends Observable<? extends A>> firstClosingSelector, Func1<? super T, ? extends Observable<? extends B>> closingSelector)

is better.

However, from MSDN: http://msdn.microsoft.com/en-us/library/hh229909(v=vs.103).aspx , the current implementation in RxJava is same as Rx.Net.

@benjchristensen benjchristensen mentioned this pull request Jun 1, 2014
@zsxwing zsxwing deleted the rxscala-more branch June 3, 2014 00:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants