Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator Repeat and other operator fixes #807

Closed
wants to merge 1 commit into from

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Feb 5, 2014

Reimplemented repeat() which exposed some other problems.

  • Added QueueDrain based on this.
  • Added overload of repeat(long n) to repeat a source n times (which is not the same as repeat().take(n) generally). Note that Ben's prototype was not working correctly because it run the re-subscription code in a loop: if subscribeOn was used, it was constantly resubscribing before the old even finished.
  • For some reason, observeOn+take didn't work as expected, causing repeat tests to fail as well. I couldn't determine the cause. I've noticed that take was unable to unsubscribe the upstream as there was nothing in its cs.
  • To fix the problem above, I've reimplemented observeOn as well. It does not use recursive scheduling but the QueueDrain.
  • This exposed another problem with GroupBy: in some cases, childObserver.onCompleted was sent out twice, breaking merge/flatMap. I've added a once check, but I'm not 100% certain there isn't anything else wrong with the GroupBy's staggered behavior.
  • Unfortunately, OperationParallelMergeTest is broken with this PR. I don't quite understand why it doesn't work nor have a clue how to fix it or the new observeOn. Maybe the parallel tests relied on thread timing and not expiring too fast so they could continue one them.

Performance

from+repeat: 3.8MOps/s
from+repeat+observeOn: 1.5MOps/s
range: 31.8MOps/s
from+observeOn+repeat: 15 kOps/s

@cloudbees-pull-request-builder

RxJava-pull-requests #733 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

I've noticed that take was unable to unsubscribe the upstream as there was nothing in its cs.

What is the unit test for this scenario?

* submit this instance to a {@code Scheduler.schedule()} method.
* @param action the action to enqueue, not null
*/
public void enqueue(Action0 action) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about this accepting Notification instead of Action0? The reason is that we now have 2 wrappers ... the Notification and Action0 around a type T and 2 object allocations for each onNext.

The use of Action0 is definitely more generic, but as we've seen by your CompositeSubscription changes, we're at the point where we're moving away from generic to achieve performance and memory gains, and this class will be involved in very high throughput scenarios.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible. I can't think of any scenario right now where Action0 is necessary. I'll update the code.

@benjchristensen
Copy link
Member

Unfortunately, OperationParallelMergeTest is broken with this PR

It breaks because observeOn is now hopping threads as it always schedulers on the outer Scheduler.

The fix I pasted above resolves the OperationParallelMergeTest tests.

return new Subscriber<T>(t1) {
/** Dispatch the notification value. */
void run(final Notification<T> nt) {
qd.enqueue(new Action0() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how the QueueDrain class abstracts this away. The concern as stated in another comment is the double wrapping of Action0 and Notification we now have.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will lock into the specialization.

@akarnokd
Copy link
Member Author

akarnokd commented Feb 6, 2014

The test case with take that caused problems was something like this:

Observable.range(1, 20000000).observeOn(Schedulers.newThread()).take(10).subscribe(System.out::println);

It prints 1..10, but then it doesn't stop and spins up a lot of threads. Basically, the OperationRepeatTest failed for me with timeouts.

benjchristensen pushed a commit to benjchristensen/RxJava that referenced this pull request Feb 6, 2014
- merging changes from ReactiveX#807
@benjchristensen
Copy link
Member

I merged repeat based on the discussion above. I'll await your work on observeOn.

@benjchristensen
Copy link
Member

then it doesn't stop and spins up a lot of threads

Isn't that because we haven't re-implemented observeOn yet?

This works:

        Observable.range(1, 20000000).take(10).toBlockingObservable().forEach(new Action1<Integer>() {

            @Override
            public void call(Integer t1) {
                System.out.println(t1);
            }

        });

@benjchristensen
Copy link
Member

Do you intend on submitting updated code soon, or should I merge the observeOn changes as discussed above?

@akarnokd
Copy link
Member Author

akarnokd commented Feb 6, 2014

I skip on this one.

@akarnokd akarnokd closed this Feb 6, 2014
@benjchristensen
Copy link
Member

Okay, thanks @akarnokd

@benjchristensen
Copy link
Member

The test case with take that caused problems was something like this:

Is the take issue resolved for you now based on #833 ?

@akarnokd
Copy link
Member Author

akarnokd commented Feb 7, 2014

The issue was with observeOn+take together. This fixes take, but observeOn in its current form doesn't work. This spins:

Observable.range(1, 20000000)
                .observeOn(Schedulers.newThread())
                .take(10).toBlockingObservable().forEach(System.out::println);

I guess I need to revisit the issue once ObserveOn has been updated.

@benjchristensen
Copy link
Member

That code works on the new observeOn implementation I just submitted: #835

@akarnokd
Copy link
Member Author

akarnokd commented Feb 7, 2014

Excellent. I'm looking at the new observeOn right now for speedup options.

@benjchristensen
Copy link
Member

Thanks @akarnokd I would appreciate that. My brain is done for the night!

@akarnokd akarnokd deleted the OperatorRepeat2 branch May 6, 2014 13:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants