Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Expand the JavaDocs of the Scheduler API #5843

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 133 additions & 25 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,72 @@

/**
* A {@code Scheduler} is an object that specifies an API for scheduling
* units of work with or without delays or periodically.
* You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
* units of work provided in the form of {@link Runnable}s to be
* executed without delay (effectively as soon as possible), after a specified time delay or periodically
* and represents an abstraction over an asynchronous boundary that ensures
* these units of work get executed by some underlying task-execution scheme
* (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system)
* with some uniform properties and guarantees regardless of the particular underlying
* scheme.
* <p>
* You can get various standard, RxJava-specific instances of this class via
* the static methods of the {@link io.reactivex.schedulers.Schedulers} utility class.
* <p>
* The so-called {@link Worker}s of a {@code Scheduler} can be created via the {@link #createWorker()} method which allow the scheduling
* of multiple {@link Runnable} tasks in an isolated manner. {@code Runnable} tasks scheduled on a {@code Worker} are guaranteed to be
* executed sequentially and in a non-overlapping fashion. Non-delayed {@code Runnable} tasks are guaranteed to execute in a
* First-In-First-Out order but their execution may be interleaved with delayed tasks.
* In addition, outstanding or running tasks can be cancelled together via
* {@link Worker#dispose()} without affecting any other {@code Worker} instances of the same {@code Scheduler}.
* <p>
* Implementations of the {@link #scheduleDirect} and {@link Worker#schedule} methods are encouraged to call the {@link io.reactivex.plugins.RxJavaPlugins#onSchedule(Runnable)}
* method to allow a scheduler hook to manipulate (wrap or replace) the original {@code Runnable} task before it is submitted to the
* underlying task-execution scheme.
* <p>
* The default implementations of the {@code scheduleDirect} methods provided by this abstract class
* delegate to the respective {@code schedule} methods in the {@link Worker} instance created via {@link #createWorker()}
* for each individual {@link Runnable} task submitted. Implementors of this class are encouraged to provide
* a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such {@code Worker}s
* for every task.
* This delegation is done via special wrapper instances around the original {@code Runnable} before calling the respective
* {@code Worker.schedule} method. Note that this can lead to multiple {@code RxJavaPlugins.onSchedule} calls and potentially
* multiple hooks applied. Therefore, the default implementations of {@code scheduleDirect} (and the {@link Worker#schedulePeriodically(Runnable, long, long, TimeUnit)})
* wrap the incoming {@code Runnable} into a class that implements the {@link io.reactivex.schedulers.SchedulerRunnableIntrospection}
* interface which can grant access to the original or hooked {@code Runnable}, thus, a repeated {@code RxJavaPlugins.onSchedule}
* can detect the earlier hook and not apply a new one over again.
* <p>
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Scheduler} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
* consistent source of the current time.
* <p>
* The default implementation of the {@link Worker#schedulePeriodically(Runnable, long, long, TimeUnit)} method uses
* the {@link Worker#schedule(Runnable, long, TimeUnit)} for scheduling the {@code Runnable} task periodically.
* The algorithm calculates the next absolute time when the task should run again and schedules this execution
* based on the relative time between it and {@link Worker#now(TimeUnit)}. However, drifts or changes in the
* system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart.
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
* {@code rx2.scheduler.drift-tolerance} in minutes) to detect a drift in {@link Worker#now(TimeUnit)} and
* re-adjust the absolute/relative time calculation accordingly.
* <p>
* The default implementations of {@link #start()} and {@link #shutdown()} do nothing and should be overridden if the
* underlying task-execution scheme supports stopping and restarting itself.
* <p>
* If the {@code Scheduler} is shut down or a {@code Worker} is disposed, the {@code schedule} methods
* should return the {@link io.reactivex.disposables.Disposables#disposed()} singleton instance indicating the shut down/disposed
* state to the caller. Since the shutdown or dispose can happen from any thread, the {@code schedule} implementations
* should make best effort to cancel tasks immediately after those tasks have been submitted to the
* underlying task-execution scheme if the shutdown/dispose was detected after this submission.
* <p>
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
*/
public abstract class Scheduler {
/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
* The associated system parameter, {@code rx2.scheduler.drift-tolerance}, expects its value in minutes.
*/
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
static {
Expand All @@ -44,7 +102,7 @@ public abstract class Scheduler {

/**
* Returns the clock drift tolerance in nanoseconds.
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes.
* @return the tolerance in nanoseconds
* @since 2.0
*/
Expand All @@ -54,11 +112,13 @@ public static long clockDriftTolerance() {


/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
* Retrieves or creates a new {@link Scheduler.Worker} that represents sequential execution of actions.
* <p>
* When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
* When work is completed, the {@code Worker} instance should be released
* by calling {@link Scheduler.Worker#dispose()} to avoid potential resource leaks in the
* underlying task-execution scheme.
* <p>
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential and non-overlapping.
*
* @return a Worker representing a serial queue of actions to be executed
*/
Expand All @@ -78,29 +138,37 @@ public long now(@NonNull TimeUnit unit) {
/**
* Allows the Scheduler instance to start threads
* and accept tasks on them.
* <p>Implementations should make sure the call is idempotent and thread-safe.
* <p>
* Implementations should make sure the call is idempotent, thread-safe and
* should not throw any {@code RuntimeException} if it doesn't support this
* functionality.
*
* @since 2.0
*/
public void start() {

}

/**
* Instructs the Scheduler instance to stop threads
* and stop accepting tasks on any outstanding Workers.
* <p>Implementations should make sure the call is idempotent and thread-safe.
* Instructs the Scheduler instance to stop threads,
* stop accepting tasks on any outstanding {@link Worker} instances
* and clean up any associated resources with this Scheduler.
* <p>
* Implementations should make sure the call is idempotent, thread-safe and
* should not throw any {@code RuntimeException} if it doesn't support this
* functionality.
* @since 2.0
*/
public void shutdown() {

}

/**
* Schedules the given task on this scheduler non-delayed execution.
* Schedules the given task on this Scheduler without any time delay.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
* ordering or non-overlapping guarantees between tasks.
*
* @param run the task to execute
*
Expand All @@ -113,7 +181,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
}

/**
* Schedules the execution of the given task with the given delay amount.
* Schedules the execution of the given task with the given time delay.
*
* <p>
* This method is safe to be called from multiple threads but there are no
Expand All @@ -139,15 +207,16 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull Tim
}

/**
* Schedules a periodic execution of the given task with the given initial delay and period.
* Schedules a periodic execution of the given task with the given initial time delay and repeat period.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* <p>
* The periodic execution is at a fixed rate, that is, the first execution will be after the initial
* delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
* The periodic execution is at a fixed rate, that is, the first execution will be after the
* {@code initialDelay}, the second after {@code initialDelay + period}, the third after
* {@code initialDelay + 2 * period}, and so on.
*
* @param run the task to schedule
* @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
Expand Down Expand Up @@ -254,13 +323,43 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
}

/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on
* an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system).
* <p>
* Disposing the {@link Worker} should cancel all outstanding work and allows resource cleanup.
* <p>
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
* The default implementations of {@link #schedule(Runnable)} and {@link #schedulePeriodically(Runnable, long, long, TimeUnit)}
* delegate to the abstract {@link #schedule(Runnable, long, TimeUnit)} method. Its implementation is encouraged to
* track the individual {@code Runnable} tasks while they are waiting to be executed (with or without delay) so that
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
* <p>
* The default implementation of the {@link #now(TimeUnit)} method returns current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Worker} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
* consistent source of the current time.
* <p>
* The default implementation of the {@link #schedulePeriodically(Runnable, long, long, TimeUnit)} method uses
* the {@link #schedule(Runnable, long, TimeUnit)} for scheduling the {@code Runnable} task periodically.
* The algorithm calculates the next absolute time when the task should run again and schedules this execution
* based on the relative time between it and {@link #now(TimeUnit)}. However, drifts or changes in the
* system clock would affect this calculation either by scheduling subsequent runs too frequently or too far apart.
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
* {@code rx2.scheduler.drift-tolerance} in minutes) to detect a drift in {@link #now(TimeUnit)} and
* re-adjust the absolute/relative time calculation accordingly.
* <p>
* If the {@code Worker} is disposed, the {@code schedule} methods
* should return the {@link io.reactivex.disposables.Disposables#disposed()} singleton instance indicating the disposed
* state to the caller. Since the {@link #dispose()} call can happen on any thread, the {@code schedule} implementations
* should make best effort to cancel tasks immediately after those tasks have been submitted to the
* underlying task-execution scheme if the dispose was detected after this submission.
* <p>
* All methods on the {@code Worker} class should be thread safe.
*/
public abstract static class Worker implements Disposable {
/**
* Schedules a Runnable for execution without delay.
* Schedules a Runnable for execution without any time delay.
*
* <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
*
Expand All @@ -274,15 +373,16 @@ public Disposable schedule(@NonNull Runnable run) {
}

/**
* Schedules an Runnable for execution at some point in the future.
* Schedules an Runnable for execution at some point in the future specified by a time delay
* relative to the current time.
* <p>
* Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
* as if the {@link #schedule(Runnable)} was called.
*
* @param run
* the Runnable to schedule
* @param delay
* time to wait before executing the action; non-positive values indicate an non-delayed
* time to "wait" before executing the action; non-positive values indicate an non-delayed
* schedule
* @param unit
* the time unit of {@code delayTime}
Expand All @@ -292,12 +392,20 @@ public Disposable schedule(@NonNull Runnable run) {
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
* Schedules a periodic execution of the given task with the given initial time delay and repeat period.
* <p>
* The default implementation schedules and reschedules the {@code Runnable} task via the
* {@link #schedule(Runnable, long, TimeUnit)}
* method over and over and at a fixed rate, that is, the first execution will be after the
* {@code initialDelay}, the second after {@code initialDelay + period}, the third after
* {@code initialDelay + 2 * period}, and so on.
* <p>
* Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
* non-delayed scheduling of the first and any subsequent executions.
* In addition, a more specific {@code Worker} implementation should override this method
* if it can perform the periodic task execution with less overhead (such as by avoiding the
* creation of the wrapper and tracker objects upon each periodic invocation of the
* common {@link #schedule(Runnable, long, TimeUnit)} method).
*
* @param run
* the Runnable to execute periodically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
import io.reactivex.plugins.RxJavaPlugins;

/**
* Interface to wrap an action inside internal scheduler's task.
*
* You can check if runnable implements this interface and unwrap original runnable.
* For example inside of the {@link RxJavaPlugins#setScheduleHandler(Function)}
* Interface to indicate the implementor class wraps a {@code Runnable} that can
* be accessed via {@link #getWrappedRunnable()}.
* <p>
* You can check if a {@link Runnable} task submitted to a {@link io.reactivex.Scheduler Scheduler} (or its
* {@link io.reactivex.Scheduler.Worker Scheduler.Worker}) implements this interface and unwrap the
* original {@code Runnable} instance. This could help to avoid hooking the same underlying {@code Runnable}
* task in a custom {@link RxJavaPlugins#onSchedule(Runnable)} hook set via
* the {@link RxJavaPlugins#setScheduleHandler(Function)} method multiple times due to internal delegation
* of the default {@code Scheduler.scheduleDirect} or {@code Scheduler.Worker.schedule} methods.
*
* @since 2.1.7 - experimental
*/
Expand Down