Re-implement GroupBy Operator #570

Closed
benjchristensen opened this issue Dec 6, 2013 · 9 comments

@benjchristensen
Member

The GroupBy operator almost certainly needs to be rewritten. The history of the discussions and attempts at fixing it can be seen in #282 and #289.

@samuelgruetter
Contributor

And please, when you do so, also make it work with multiple subscribers. @manojo and I ran into this issue yesterday, and I was about to open another issue when I saw this one ;-)

@akarnokd
Member

akarnokd commented Dec 6, 2013

I've read through both issues but still don't quite understand the problem. @benjchristensen and/or @samuelgruetter, could you please restate the requirements and corner cases?

Edit: Somewhat related issue in Rx.NET.

@samuelgruetter
Contributor

> Edit: Somewhat related issue in Rx.NET.

This should be added to the documentation.

There's a problem I remember, but I don't have a proper test illustrating it right now. I used groupBy on a source Observable like Observable.interval(...).someTransformationsWhichMakeItFinite(), but when the source Observable completed, the returned Observable<Observable<T>> or some of the inner Observable<T>s did not complete.
Having a good unit test that exercises this with concurrency and multiple subscribers would IMHO be very useful: even if it does not reveal a bug, it would make the operator more trustworthy.
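A minimal sketch of the shape such a test could take, written as a self-contained toy model rather than against RxJava. `GroupByCompletionTest`, `ToyGroup` and `ToyGroupBy` are hypothetical names, and the model is single-threaded; it only checks the invariant at stake here, namely that when the source completes, the outer Observable of groups and every inner group complete too. A real test would additionally drive the source from another thread and attach several subscribers per group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical test harness for a toy model of groupBy; no RxJava involved.
class GroupByCompletionTest {
    // Feeds 1..6 into a parity-keyed groupBy model, then completes the source.
    static ToyGroupBy<Boolean, Integer> run() {
        ToyGroupBy<Boolean, Integer> g = new ToyGroupBy<Boolean, Integer>(x -> x % 2 == 0);
        for (int i = 1; i <= 6; i++) {
            g.onNext(i);
        }
        g.onCompleted();
        return g;
    }

    public static void main(String[] args) {
        ToyGroupBy<Boolean, Integer> g = run();
        boolean allDone = g.outerCompleted
                && g.groups.values().stream().allMatch(grp -> grp.completed);
        System.out.println(allDone ? "all groups completed" : "FAILED: a group did not complete");
    }
}

// One inner group: records what it received and whether it was completed.
class ToyGroup<T> {
    final List<T> received = new ArrayList<>();
    boolean completed = false;

    void onNext(T item) { received.add(item); }

    void onCompleted() { completed = true; }
}

// Routes each item to the group for its key, creating groups on first sight.
class ToyGroupBy<K, T> {
    private final Function<T, K> keySelector;
    final Map<K, ToyGroup<T>> groups = new LinkedHashMap<>();
    boolean outerCompleted = false;

    ToyGroupBy(Function<T, K> keySelector) { this.keySelector = keySelector; }

    void onNext(T item) {
        groups.computeIfAbsent(keySelector.apply(item), k -> new ToyGroup<>()).onNext(item);
    }

    // The invariant under test: completing the source must complete every
    // inner group as well as the outer Observable of groups.
    void onCompleted() {
        for (ToyGroup<T> group : groups.values()) {
            group.onCompleted();
        }
        outerCompleted = true;
    }
}
```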

@DavidMGross
Collaborator

What do you think of the explanation/illustration here:

https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil

Note that when groupBy( ) or groupByUntil( ) splits up the source Observable into an Observable that emits Observables, it begins to emit items from the source Observable onto these emitted Observables immediately. That is to say, it does not wait for any Observers to subscribe. So if you want to ensure that you see all of the items that are emitted on these new Observables, you should take care to subscribe to them right away.

The following illustration shows how this can cause unexpected behavior. In this illustration, groupBy( ) is used to separate a source Observable that emits the numbers 1 through 6 into an Observable (in red) that emits two Observables: one that emits the odd numbers from the source Observable, and one that emits the even numbers. This Observable of Observables is then zipped with another Observable (in blue) that emits the strings "odd" and "even", and the zip function applies each string label to all of the items emitted by the Observable it is paired with.

However, zip( ) does not apply this zip function until it has observed an item from each of the red and the blue source Observables. Because "odd" arrives only after the first of the grouped Observables has already emitted "1", zip( ) has not yet subscribed to that Observable at that point, so it never observes "1" and never applies the zip function to it. The same thing happens with "even", which arrives after the second grouped Observable has already emitted both "2" and "4". For this reason, the transformed Observable emitted by zip( ) is missing some of the data from the original Observables.

(Sent by email in reply to samuelgruetter's comment of Sun, Dec 8, 2013, 7:08 AM.)

David M. Gross
PLP Consulting
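To make the timing concrete, here is a hypothetical plain-Java model of the odd/even scenario (no RxJava involved; `LateSubscriberDemo` and `HotGroup` are illustrative names). Each group drops items emitted while it has no subscriber, as a PublishSubject-backed group would, and the subscriptions happen at the points described above: after "1" for the odd group, and after "2" and "4" for the even group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class LateSubscriberDemo {
    // Replays the odd/even scenario: the label for each group arrives late,
    // so the zip-like consumer only subscribes to that group at that point.
    static Map<String, List<Integer>> run() {
        Map<Boolean, HotGroup<Integer>> groups = new LinkedHashMap<>();
        Map<String, List<Integer>> seen = new LinkedHashMap<>();
        seen.put("odd", new ArrayList<>());
        seen.put("even", new ArrayList<>());

        for (int i = 1; i <= 6; i++) {
            boolean even = i % 2 == 0;
            HotGroup<Integer> g = groups.computeIfAbsent(even, k -> new HotGroup<>());
            g.emit(i); // emitted immediately, whether or not anyone is listening
            // Timing assumed from the illustration: "odd" arrives after 1 is
            // emitted, "even" arrives after 2 and 4 are emitted.
            if (i == 1) groups.get(false).subscribe(seen.get("odd")::add);
            if (i == 4) groups.get(true).subscribe(seen.get("even")::add);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical "hot" group with PublishSubject-like semantics: items emitted
// while nobody is subscribed are simply dropped.
class HotGroup<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> s) { subscribers.add(s); }

    void emit(T item) {
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }
}
```

Running `main` prints `{odd=[3, 5], even=[6]}`: the 1, 2 and 4 emitted before a subscriber existed are lost, which matches the items missing from the zipped output in the illustration.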

@akarnokd
Member

You can fix the odd-even issue by using ReplaySubject in groupBy instead of the default PublishSubject. I think this is an unnecessarily strong, memory-hungry solution whose cost would be paid by every groupBy use case, not just the ones that don't consume their groups immediately. Where immediate consumption is not possible, one can manually map each group through replay():

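import rx.Observable;
Observable<Integer> source = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Observable.concat(
    source.groupBy(x -> x % 2 == 0)
        .map(group -> group.replay())               // cache each group's items
        .doOnNext(replayed -> replayed.connect()))  // connect as soon as the group is emitted
    .subscribe(System.out::println);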

@ghost assigned benjchristensen Jan 2, 2014
@akarnokd
Member

akarnokd commented May 8, 2014

Are we happy with the current groupBy behavior? Note that groupByUntil does not use the BufferUntilSubscriber "subject". I guess we want the two operators to be consistent.
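A rough sketch of what a buffer-until-subscriber group could look like, assuming a single subscriber per group. `BufferingGroup` and `BufferUntilSubscriberDemo` are hypothetical toys, not RxJava's `BufferUntilSubscriber`; they only model the intended behavior: items emitted before the first subscriber are buffered, drained on subscription, and then forwarded live.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BufferUntilSubscriberDemo {
    static List<Integer> run() {
        BufferingGroup<Integer> odd = new BufferingGroup<>();
        List<Integer> seen = new ArrayList<>();
        odd.emit(1);              // emitted before anyone subscribes
        odd.subscribe(seen::add); // the late subscriber still sees 1
        odd.emit(3);
        odd.emit(5);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical toy of the buffer-until-subscriber idea (single subscriber).
class BufferingGroup<T> {
    private final List<T> buffer = new ArrayList<>();
    private Consumer<T> subscriber;

    synchronized void emit(T item) {
        if (subscriber == null) {
            buffer.add(item); // nobody listening yet: hold on to the item
        } else {
            subscriber.accept(item);
        }
    }

    synchronized void subscribe(Consumer<T> s) {
        if (subscriber != null) {
            throw new IllegalStateException("only one subscriber allowed");
        }
        for (T item : buffer) {
            s.accept(item); // drain what arrived early, in order
        }
        buffer.clear();     // release the buffered items once drained
        subscriber = s;     // from now on, forward items directly
    }
}
```

Unlike wrapping every group in a ReplaySubject, this buffer is cleared once drained, so the extra memory is bounded by what arrives before the subscriber shows up. Supporting multiple subscribers per group would need more machinery than this sketch has.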

@benjchristensen
Member Author

From what I can tell, groupBy is finally working correctly. The groupByUntil implementation has not been updated, though, and I imagine it needs to be brought in line.

@headinthebox
Contributor

> From what I can tell groupBy is finally working correctly.

Nice! I'll try some edge cases ...

@benjchristensen
Member Author

It was fully re-implemented and I know of no problems at this time, so I'm closing it.
