Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Outer/Inner [Preview] #797

Merged

Conversation

benjchristensen
Copy link
Member

Following are proposed changes to the Scheduler signature based on discussions between @headinthebox and I intended to simplify scheduling and make it easier to do the right thing.

This originates from three findings:

  1. It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.

To solve this the new design explicitly has the outer Scheduler and then Scheduler.Inner for recursion.

  1. The passing of state is not useful since scheduling over network boundaries with this model does not work.

In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler that attempted to use observeOn to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been bound to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.

  1. The number of overloads with different ways of doing the same things were confusing.

This new design removes all but the essential and simplest methods.

This is the new signature for Scheduler:

public abstract class Scheduler {

     // the primary entry point, it immediately schedulers work on a new thread and executes it
     // the return subscription will shut down the Inner scheduler when unsubscribed
     public abstract <T> Subscription schedule(Action1<Scheduler.Inner> action);
     public <T> Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
     public int degreeOfParallelism();
     public long now();

     public abstract class Inner implements Subscription {

         public abstract void schedule(Action1<Scheduler.Inner> action);
         public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
         public void schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
            // I question having schedulePeriodically here, since recursion allow the same behavior, and with conditional logic
         public void schedule(Action1<Scheduler.Inner> action, Date dueTime);
      }
}

The simplest execution of a single task is:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
    }

});

Recursion is easily invoked:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
        inner.schedule(this);
    }

});

The use of Action1<Inner> on both the outer and inner levels makes it so recursion that refer to this and it works easily.

Similar to the new lift/create pattern with Subscriber the Inner is also a Subscription so it allows efficient loops with unsubscribe support:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        while(!inner.isUnsubscribed()) {
            doWork();
        }
    }

});

An action can now unsubscribe the Scheduler.Inner:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        while(!inner.isUnsubscribed()) {
            int i = doOtherWork();
            if(i > 100) {
                // an Action can cause the Scheduler to unsubscribe and stop
                inner.unsubscribe();
            }
        }
    }

});

Typically just stopping is sufficient:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        int i = doOtherWork();
        if (i < 10) {
            // recurse until done 10
            inner.schedule(this);
        }
    }

});

but if other work in other tasks is being done and you want to unsubscribe conditionally you could:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        int i = doOtherWork();
        if (i < 10) {
            // recurse until done 10
            inner.schedule(this);
        } else {
            inner.unsubscribe();
        }
    }

});

and the recursion can be delayed:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed ... but delay the recursion
        inner.schedule(this, 500, TimeUnit.MILLISECONDS);
    }

});

The methods on the Inner never return a Subscription because they are always a single thread/event-loop/actor/etc and controlled by the Subscription returned by the initial Scheduler.schedule method. This is part of clarifying the contract.

Thus an unsubscribe controlled from the outside would be done like this:

Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        while(!inner.isUnsubscribed()) {
            doWork();
        }
    }

});

// unsubscribe from outside
s.unsubscribe();

I'd appreciate feedback on this design direction.


NOTE: This pull request is not yet complete. I have not modified the language adaptors or other modules, and there are 3 unit tests in core failing (related to buffer and window).

@cloudbees-pull-request-builder

RxJava-pull-requests #718 FAILURE
Looks like there's a problem with this pull request

@headinthebox
Copy link
Contributor

Absolutely love this new design, it fixes a lot of the issues that the previous design inherited from the old Rx.NET.

Win 1) The explicit C funcptr style of passing the state separately from the function pointer did not pan out and made everyones brain hurt. Those using the Scala bindings know that there the schedulers where exposed without the separate state parameters already.

Win 2a) The old design as Ben says deeply confused inner and outer schedulers. In .NET it is even worse since there the extension methods for scheduling using async/await also show up on IScheduler. If there is ever a good reason to leverage static typing it is this.

Win 2b) The conceptual model is now much clearer and closer to Java's executors. The additional feature is that you can schedule iteratively.

Win 3) One of the big mistakes in hindsight in .NET IScheduler was to use DateTimeOffset Now { get; } instead of long as here. In true virtual time you just think about "ticks".

That said, there is still a date-related type in public void schedule(Action1<Scheduler.Inner> action, Date dueTime); that I would like to see changed using long (if you always go from a Date to ticks). I propose that we change that to long as well to be consistent with Now.

Remark A) I think it would be better to move Now to become a property of Inner, or at least of both so you don't need to close over outer to get access to Now. Anywhere you have time-related operators you will want to access Now.

Remark B) The interface for outer Scheduler only has relative delayed scheduling, more reasons to remove it instead of adding another method.

Remark C) The semantic difference is subtle, but I think we can remove public <T> Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit) from Scheduler. The subtlety is when is a "new thread" allocated when calling this overload of Schedule on outer. At the moment you call Schedule or after delayTime. Since in the latter case outer must maintain some additional administration my guess is that typically this overload is implemented by calling the first overload of Schedule and then using schedule with delay on Inner.

The only price is that when you want to delay schedule you need to write a bit more code, but I think smaller interface trumps that.

Remark D) Wrt to keeping or dropping SchedulePeriodically there is an epic blogpost about the rationale here http://blogs.msdn.com/b/rxteam/archive/2012/06/20/reactive-extensions-v2-0-release-candidate-available-now.aspx.

The post goes down into all the gory details about how hard it is to deal with time in computing. The short story is that a lot of the complication in the current Rx schedulers implementation and the reason for having SchedulePeriodically is to deal with time drift.

If I were to do the .NET thing again, which is really what we are doing with RxJava grin :->, I would not let that pollute the whole implementation and design of schedulers. Instead for the unlikely event that people want to do ultra-precise and super long scheduling, I would build a completely seperate implementation of a special high-precision scheduler (like TestScheduler) that has all the bells and whistles just for that purpose.

@benjchristensen
Copy link
Member Author

Thanks @headinthebox for the feedback.

Good point regarding public void schedule(Action1<Scheduler.Inner> action, Date dueTime). I will remove that since it breaks the model of relying on now().

Remark A) I think it would be better to move Now to become a property of Inner, or at least of both so you don't need to close over outer to get access to Now. Anywhere you have time-related operators you will want to access Now.

I found it became very awkward on some operators to have to schedule something before getting access to now(). So having it on both is perhaps better.

An example is timestamp which only needs Scheduler.now(). It shouldn't need to schedule something just to get now().

Remark B) & Remark C) on outer delayed scheduling.

It does greatly simplify usage of the API keeping this. It also leaves a nice symmetry between outer and inner:

public abstract class Scheduler {
    public abstract <T> Subscription schedule(Action1<Scheduler.Inner> action);
    public <T> Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, final TimeUnit unit);

    public abstract class Inner implements Subscription {
        public abstract void schedule(Action1<Scheduler.Inner> action);
        public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
    }
}

For example, this schedules the first execution 100ms in the future then 500ms each time thereafter and uses this for recursion:

Schedulers.newThread().schedule(new Action1<Inner>() {

    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed ... but delay the recursion
        inner.schedule(this, 500, TimeUnit.MILLISECONDS);
    }

}, 100, TimeUnit.MILLISECONDS);

Remark D) Wrt to keeping or dropping SchedulePeriodically

I'll read that post to better understand. We already support the implementation, and it works fairly well. It takes into account the time taken by the task being executed periodically and schedules in the future for the diff of periodic time - time taken for task.

I'm open for keeping or removing it.

@benjchristensen
Copy link
Member Author

Of note, we do have a handful of operators in rxjava-core that use schedulePeriodically so I plan on leaving it as is.

@cloudbees-pull-request-builder

RxJava-pull-requests #719 FAILURE
Looks like there's a problem with this pull request

@headinthebox
Copy link
Contributor

So having it (Now) on both is perhaps better

Works for me.

It does greatly simplify usage of the API keeping this.

No issues with that, but in that case we should also add absolute time.

We already support the implementation, and it works fairly well.

No real opinion about this, the implementation indeed does useful work, and since other operators rely on it, so let's keep it.

@benjchristensen
Copy link
Member Author

No issues with that, but in that case we should also add absolute time.

Why do you want to add absolute time, I thought we wanted to eliminate absolute time so it was always based off of now()?

These are the only signatures currently:

public abstract class Scheduler {
    public <T> Subscription schedule(Action1<Scheduler.Inner> action);
    public <T> Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, final TimeUnit unit);
    public long now();
    public int degreeOfParallelism();

    public abstract class Inner implements Subscription {
        public void schedule(Action1<Scheduler.Inner> action);
        public void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public void schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
    }
}

@headinthebox
Copy link
Contributor

I was assuming the absolute time one would look like

public void schedule(Action1<Scheduler.Inner> action, long dueTime);

@benjchristensen
Copy link
Member Author

I was assuming the absolute time one would look like
public void schedule(Action1<Scheduler.Inner> action, long dueTime)

In Java the time value is always paired with TimeUnit, hence the signature like this: long delayTime, TimeUnit unit.

All of these are absolute times that are then scheduled relative to now() which always returns timeInMilliseconds.

@benjchristensen
Copy link
Member Author

As of commit 4bdf08d I now have all rxjava-core unit tests passing, and tests in most other modules.

The Scala bindings are currently broken ... I'll try looking at them this evening but if anyone else has time and interest to submit a PR to my branch fixing them that would help as Scala is not my strength.

@cloudbees-pull-request-builder

RxJava-pull-requests #724 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member Author

Yup ... fixing the Scala bindings is complex enough with traits, inner classes/traits, Java interop etc that I'm not going to learn Scala fast enough tonight to fix that. I would appreciate someone submitting a PR to me with the Scala changes.

/cc @headinthebox @samuelgruetter @mattrjacobs @jmhofer

@akarnokd
Copy link
Member

akarnokd commented Feb 4, 2014

Is there a reason only Inner has the schedulePeriodically and the outer doesn't?

@benjchristensen
Copy link
Member Author

I left it out for simplicity as it seems to only make sense calling it on the outer. The point is to start it and let it run. The inner is used for manual recursion. All use cases for schedulePeriodically I refactored or considered only made sense on the outer.

Since it's easy to add and hard to remove I left it out until a use case is shown. Do you have one?

@headinthebox
Copy link
Contributor

Yup ... fixing the Scala bindings is complex enough with traits, inner classes/traits, Java interop etc that I'm not going to learn Scala fast enough tonight to fix that. I would appreciate someone submitting a PR to me with the Scala changes.

No worries, working on it.

(One day I hope to convince you to switch from Groovy to Scala ....)

@headinthebox
Copy link
Contributor

@akarnokd We should be careful with that, before you know it outer is as complex as it was before ... Erik's razor is actually still a bit eager to cut out the delay one from outer.

@headinthebox
Copy link
Contributor

Wrt to now, should we let now return http://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime() instead of milliSeconds? i.e. Now should reflect the most accurate time you can measure.

In any case, since now has an implicit unit, then absolute time scheduling would not need a time unit, but always be the same as that of now.

The latter is not a big deal, but I do think we should seriously look at nanoTime. Interestingly, if you read the description it has the same goal as now in a scheduler, to provide local time, that is only relevant in a closed context.

@akarnokd
Copy link
Member

akarnokd commented Feb 4, 2014

@benjchristensen I don't have any use case for that, but moving the schedulePeriodically to the outer is okay. I guess the reduced surface again means that there won't be any Action0 overloads, right?

@headinthebox The delayed version is necessary for most timed operators (take, skip, delay, etc.). Nanotime would be great (at least for the time-windowed replay).

@headinthebox
Copy link
Contributor

@akarnokd My proposal is to always go via Action0 on outer. That is what delay must do anyway, so it feel purer to make that explicit.

@akarnokd
Copy link
Member

akarnokd commented Feb 4, 2014

@headinthebox but that way, you can't use the same action in the outer and inner as the inner has to get the Inner scheduler.

While we are at it, some operators use Timestamped<T> and compare against now(). If now changes unit and origin, we need to rethink/redocument Timestamped usage as well.

@headinthebox
Copy link
Contributor

but that way, you can't use the same action in the outer and inner

That's fine with me; the outer just dispatches to inner.

Agree with Timestamped.

@benjchristensen
Copy link
Member Author

It already has an implicit unit, milliseconds, but in Java duration always goes with TimeUnit and is hen used to convert to whatever is desired.

In any case, since now has an implicit unit, then absolute time scheduling would not need a time unit, but always be the same as that of now.

The latter is not a big deal, but I do think we should seriously look at nanoTime. Interestingly, if you read the description it has the same goal as now in a scheduler, to provide local time, that is only relevant in a closed context.

We didn't go with nanos originally because:

a) you can only get relative time since the JVM started, not real time
b) no scheduler I'm aware of can schedule at that speed. (My understanding is the OS ticks at 1ms, some slower than that, I've seen them tick at 10ms)

If we change from millis to nanos it will break any code assuming millis.

Issue (a) is the one I'm most concerned with as you could no longer take Scheduler.now() and turn it into a real date. This also means historical schedulers would be useless because the values from Scheduler.now() would not be meaningful on a Calendar.

@benjchristensen
Copy link
Member Author

I guess the reduced surface again means that there won't be any Action0 overloads, right?

Correct. I'm keeping it as simple as can be. Without all of the Func2 and state overloads it is very simple now.

Also, this Scheduler interface doesn't have type-erasure issues so the methods can be targeted with lambdas. Overloads with Action0/Action1 conflict.

More importantly though, we want the Action1 interface common on all of them for two reasons:

  1. make it very difficult to not correctly use a inner for recursion
  2. use the same type on outer and inner so recursion is as simple as inner.schedule(this).

@headinthebox
Copy link
Contributor

This also means historical schedulers would be useless because the values
from Scheduler.now() would not be meaningful on a Calendar.

That should be no problem, if you try that, you are going against the spirit of now; historical schedulers implement now themselves, either by explicitly advancing (test schedulers) or by using timestamps in the input. Time is virtual, especially in historical schedulers.

@mttkay
Copy link
Contributor

mttkay commented Feb 4, 2014

I really like the changes I'm seeing. The scheduling code was quite difficult to follow and it's a lot cleaner now. One thing I'd still like to see (but might be well outside the scope of this PR) is a way to skip to first wrap around for recursive scheduling when going through observeOn. Maybe I just don't understand why it's implemented the way it is.

What's happening is that for the first notification that arrives, instead of processing that notification directly using the given scheduler, it's deferred into a function which is then piped through that scheduler again. That means, it's not the notification that will get processed during the first call to schedule, but instead another call to schedule which then again schedules the actual notification.

For schedulers like the one we use for Android, this means an extra (unnecessary) round through the message loop, thus prolonging the RTT for the first notification.

Any idea why it is implemented the way it is? I fail to see the point I guess.

@benjchristensen
Copy link
Member Author

either by explicitly advancing (test schedulers) or by using timestamps in the input. Time is virtual, especially in historical schedulers.

Agreed that the passage of time is virtual, not sure we want the value to be virtual though.

What should the timestamp operator use? It currently uses Scheduler.now() to create timestamps. If converted to nanoTime it could no longer do this.

@benjchristensen
Copy link
Member Author

Here is a broader use case on the timestamp and nanoTime subject...

Say I have multiple machines all processing events and writing an event log. They use timestamp when generating that log. A historical scheduler is then used to pull in multiple event logs from multiple machines and combine them in further offline processing. If the timestamps are real, all of the events can be merged together and time in meaningful. If each machine has its own relative representation, then the timestamps in the event logs are meaningless when compared against each other.

We could make timestamp not use Scheduler.now() but that seems contrary to the entire purpose of Scheduler.now() and would mean that timestamps generated when using virtual time (such as TestScheduler or a HistoricalScheduler) would have nothing to do with the virtual time and would incorrectly represent actual time.

In short, if we're not going to use actual time, then I think we would need to eliminate timestamp and anything else representing actual time values and user code would have to decide what "time" means if they want to do something like write out timestamped event logs.

@benjchristensen
Copy link
Member Author

@mttkay Glad you like the changes.

a way to skip to first wrap around for recursive scheduling when going through observeOn.

Interesting point, I hadn't paid attention to that before. I'm pretty sure we can eliminate that. It's more about the code organization for invoking 'processQueue` than a reason for re-scheduling on the first pass.

We can pursue fixing this in a separate PR. Thanks for pointing that out!

@benjchristensen
Copy link
Member Author

I'm proceeding with the merge as this is a major change that will conflict with almost all other pull requests. The Scala fixes (and any others needed) can be done against master.

benjchristensen added a commit that referenced this pull request Feb 4, 2014
@benjchristensen benjchristensen merged commit 3bac5b9 into ReactiveX:master Feb 4, 2014
@benjchristensen benjchristensen deleted the scheduler-inner-outer branch February 4, 2014 17:19

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @benjchristensen , you forgot t.setDaemon(true); for NewThreadScheduler. I just find NewThreadScheduler can not exit automatically.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants