Fixes on #284 (groupBy) #289

Closed
wants to merge 6 commits into from

Treora
Contributor

@Treora Treora commented Jun 5, 2013

See my previous comment.

@cloudbees-pull-request-builder

RxJava-pull-requests #163 SUCCESS
This pull request looks good

@benjchristensen
Member

I apologize for not being able to act on this right now. I've been swamped with other work, but I will definitely return to this once I get through it!

return;
}
/*
* Technically the source should be single-threaded so we shouldn't need to do this but I am

fix indent

@codefromthecrypt

squash into a single commit then check with @benjchristensen. LGTM

@cloudbees-pull-request-builder

RxJava-pull-requests #170 SUCCESS
This pull request looks good

@Treora
Contributor Author

Treora commented Jun 30, 2013

Hi Ben et alii,

Regarding the issues at the end of the previous discussion, I mentioned the quirks I found to Erik, who actually considered the current terminating behaviour of groupBy in Rx.Net (which is similar to RxJava's) to be undesirable. His opinion was that when one unsubscribes from the GroupBy observable, one should still be able to subscribe to the GroupedObservables that were emitted before, because otherwise seemingly unrelated actions can cause unexpected changes in behaviour. His example was like this:

source.groupBy(x -> x%2)
    .take(2)
    .subscribe(group -> {
        new Thread(() -> {
            group.subscribe(someObserver);
        }).start();
    });

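In this example, take(2) unsubscribes from the groupBy observable once two groups have been emitted. If the spawned threads have not yet subscribed to their GroupedObservables at that point, the groups still have no subscribers, so groupBy may terminate its upstream subscription. This actually creates a race between the spawned and the spawning thread.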

I guess the solution to this would be to not automatically terminate the upstream subscription when one might still connect to a GroupedObservable. This would lead to some memory pollution if the stream is infinite and not terminated manually (for example using takeWhile).
A nice solution could perhaps be to do some juggling with weak references in order to detect when all emitted GroupedObservables have been forgotten about by the rest of the program, in which case the upstream subscription may be ended too (if the primary observer already unsubscribed, of course).
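
To make the weak-reference idea a bit more concrete, here is a minimal sketch in plain Java. It is only an illustration under assumptions: `GroupTracker` and its method names are hypothetical and not part of RxJava, and it assumes the operator itself holds no strong reference to the emitted group objects.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not RxJava API): tracks emitted groups through weak references. */
final class GroupTracker<G> {
    private final List<WeakReference<G>> emitted = new ArrayList<>();
    private boolean primaryUnsubscribed = false;

    /** Remember a group that was handed to the downstream observer. */
    synchronized void onGroupEmitted(G group) {
        emitted.add(new WeakReference<>(group));
    }

    /** Called when the observer of the groupBy observable itself unsubscribes. */
    synchronized void onPrimaryUnsubscribed() {
        primaryUnsubscribed = true;
    }

    /** True once the primary observer is gone and the GC has cleared every emitted group. */
    synchronized boolean mayUnsubscribeUpstream() {
        if (!primaryUnsubscribed) {
            return false;
        }
        for (WeakReference<G> ref : emitted) {
            if (ref.get() != null) {
                return false; // somebody could still subscribe to this group
            }
        }
        return true;
    }
}
```

Note that the check only turns true after the garbage collector has actually cleared the references, so the upstream subscription could stay alive for an unpredictable time after the last group was forgotten.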

@benjchristensen
Member

I tried looking at this again this morning but again ran out of time trying to understand what the issue is and what is being fixed ... it's still on my list for a more focused review at some point.

@samuelgruetter
Contributor

Here's another problem with groupBy:

This code

val firstOnly = false
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)

(for ((modulo, numbers) <- numbersByModulo3) yield {
  println("Observable for modulo" + modulo + " started")

  if (firstOnly) numbers.take(1) else numbers
}).merge.toBlockingObservable.foreach(println(_)) 

outputs this if val firstOnly = false (expected):

Observable for modulo0 started
0
Observable for modulo1 started
1
Observable for modulo2 started
2
3
4
5
6
7
8

and if val firstOnly = true, it outputs this (unexpected):

Observable for modulo0 started
0

But I would expect that it takes the first element of each modulo class:

Observable for modulo0 started
0
Observable for modulo1 started
1
Observable for modulo2 started
2
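
For reference, the expected semantics (the first element of each modulo class, in order of first appearance) can be sketched without Rx at all. This is plain Java over a list standing in for the nine interval ticks; `FirstPerGroup` and `firstPerKey` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class FirstPerGroup {
    /** Returns the first element seen for each key, in order of first appearance. */
    static <T, K> List<T> firstPerKey(List<T> source, Function<T, K> keySelector) {
        Map<K, T> firstByKey = new LinkedHashMap<>();
        for (T item : source) {
            // Only the first item per key is kept; later items of the same group are ignored.
            firstByKey.putIfAbsent(keySelector.apply(item), item);
        }
        return new ArrayList<>(firstByKey.values());
    }

    public static void main(String[] args) {
        // Stand-in for Observable.interval(...).take(9): the values 0..8.
        List<Long> ticks = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
        List<Long> firsts = firstPerKey(ticks, x -> x % 3);
        System.out.println(firsts); // prints [0, 1, 2]
    }
}
```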

samuelgruetter added a commit to samuelgruetter/RxJava that referenced this pull request Sep 19, 2013
and try to make Olympics groupBy work with timing, but did not
work due to problems with RxJava groupBy, see pull ReactiveX#289
@benjchristensen
Member

Can anyone start from scratch on reviewing this and providing feedback?

@headinthebox Can you also provide input on what the behavior should be compared to Rx.Net? You've mentioned to both @Treora and myself that you feel something is wrong with the Rx.Net implementation, so I'd like to know what unit tests we should have so we can change RxJava to match the correct functionality.

@Treora
Contributor Author

Treora commented Oct 8, 2013

Hi,
As @headinthebox appears to not have replied yet I will try to answer. From what I know, the only issue is that it is impossible to subscribe to GroupedObservables after their producer has terminated. I think I explained the whole issue earlier in this thread: #289 (comment)

The reason it is like this in Rx.Net is that there seems to be no elegant solution to this issue. The point is that the GroupBy observable cannot know whether, at some point in the future, somebody will subscribe to one of the GroupedObservables that it has emitted. Therefore it cannot know whether it can unsubscribe from its source. The easy solution would be to simply not unsubscribe at all, with the drawback that unused streams will also stay alive for eternity if they are not terminated further upstream.

Let me know if you would like me to explain more. My work involving RxJava has finished, but it would be nice if this issue were fixed. Also, it seems the fixes I wrote (#289) have still not been merged, so merging them could already fix some bugs. If the small issue mentioned in point 2 in #282 (comment) is then also fixed, the RxJava implementation should be at least as good as the current Rx.Net implementation (or probably a bit better, as their code looks somewhat buggy too, as mentioned before).

@benjchristensen
Member

@zsxwing Since you have become familiar with a lot of the codebase, if you want to help tackle this I'd appreciate it.

@zsxwing
Member

zsxwing commented Oct 9, 2013

@benjchristensen I'll try to solve it.

@benjchristensen
Member

Thanks @zsxwing

@akarnokd
Member

akarnokd commented Dec 4, 2013

I've implemented GroupByUntil (#528), which can be turned into GroupBy by using Observable.never() as its group duration selector. With the recent PublishSubject fixes, any "escaped" group will be terminated with either onCompleted() or onError() if the main source has been terminated. So given the example:

source.groupBy(x -> x%2)
    .take(2)
    .subscribe(group -> {
        new Thread(() -> {
            group.subscribe(someObserver);
        }).start();
    });

someObserver will likely get just an onCompleted().
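
A toy model of that behaviour, in plain Java, might look like the sketch below. `ToyGroup` is a hypothetical stand-in and not the real PublishSubject; it only assumes that a subscriber arriving after termination is handed the terminal event and nothing else.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for an emitted group; NOT the real rx.subjects.PublishSubject. */
final class ToyGroup<T> {
    interface Listener<T> {
        void onNext(T value);
        void onCompleted();
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean completed = false;

    synchronized void subscribe(Listener<T> listener) {
        if (completed) {
            listener.onCompleted(); // late subscriber: only the terminal event is delivered
        } else {
            listeners.add(listener);
        }
    }

    synchronized void onNext(T value) {
        if (completed) {
            return;
        }
        for (Listener<T> l : listeners) {
            l.onNext(value);
        }
    }

    synchronized void onCompleted() {
        if (completed) {
            return;
        }
        completed = true;
        for (Listener<T> l : listeners) {
            l.onCompleted();
        }
        listeners.clear();
    }
}

final class LateSubscriberDemo {
    /** Records what a subscriber sees when it arrives after the group has terminated. */
    static List<String> eventsSeenByLateSubscriber() {
        ToyGroup<Integer> group = new ToyGroup<>();
        group.onNext(1);     // dropped: nobody is subscribed yet
        group.onCompleted(); // e.g. the main source was terminated because of take(2)

        List<String> events = new ArrayList<>();
        group.subscribe(new ToyGroup.Listener<Integer>() {
            @Override public void onNext(Integer value) { events.add("onNext(" + value + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }
}
```

Here the spawned thread losing the race corresponds to calling subscribe only after onCompleted has already happened.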

If I understand the problem correctly, you'd want to receive Xs for the first two groups until they cancel, even if there won't be any new groups due to take(2). How about:

source.groupBy(x -> x % 2).take(2)
   .subscribe(group ->
     new Thread(() ->
        source.where(a -> a % 2 == group.getKey())
        .subscribe(someObserver)).start());

This way we'll know what the first two group keys are, and then filter the source by that key for each group. However, let's assume source emits System.currentTimeMillis(), in which case someObserver receives different Xs than those that were used in creating the groups. To fix that, we would need source to be replayable:

ConnectableObservable co = source.replay();
co.groupBy(x -> x % 2).take(2)
   .subscribe(group ->
     new Thread(() ->
        co.where(a -> a % 2 == group.getKey())
        .subscribe(someObserver)).start());

Subscription s0 = co.connect();
Thread.sleep(1000); // or other wait mechanism
s0.unsubscribe();
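
The effect of replaying can be sketched in plain Java, with a list standing in for the replay buffer. This is an illustration only: `ReplayFilterSketch` and its methods are hypothetical names and not RxJava API. The point is that the source is read exactly once, so a late subscriber that filters by key sees the same values that determined the groups.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

final class ReplayFilterSketch {
    /** Pulls `count` values from a possibly non-repeatable source exactly once. */
    static List<Long> record(Supplier<Long> source, int count) {
        List<Long> buffer = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            buffer.add(source.get());
        }
        return buffer;
    }

    /** First `maxGroups` distinct keys (x % 2) in order of appearance, like groupBy(...).take(n). */
    static Set<Long> firstKeys(List<Long> recorded, int maxGroups) {
        Set<Long> keys = new LinkedHashSet<>();
        for (Long x : recorded) {
            if (keys.size() == maxGroups) {
                break;
            }
            keys.add(x % 2);
        }
        return keys;
    }

    /** What a late subscriber for `key` sees: the recorded values filtered by key. */
    static List<Long> valuesForKey(List<Long> recorded, long key) {
        List<Long> out = new ArrayList<>();
        for (Long x : recorded) {
            if (x % 2 == key) {
                out.add(x);
            }
        }
        return out;
    }
}
```

Like replay() itself, this keeps every recorded value in memory, so it is only reasonable for a bounded source.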

@benjchristensen
Member

Closing this; it will be addressed via #570.

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
and try to make Olympics groupBy work with timing, but did not
work due to problems with RxJava groupBy, see pull ReactiveX#289