Skip to content

Commit

Permalink
2.x: Introduce property rx2.scheduler.use-nanotime (#7154) (#7170)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergej Isbrecht <[email protected]>
  • Loading branch information
SergejIsbrecht and Sergej Isbrecht authored Jan 28, 2021
1 parent f31aed3 commit 776f0e5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
42 changes: 36 additions & 6 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@
* interface which can grant access to the original or hooked {@code Runnable}, thus, a repeated {@code RxJavaPlugins.onSchedule}
* can detect the earlier hook and not apply a new one over again.
* <p>
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Scheduler} implementations can override this
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx2.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Scheduler} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand All @@ -89,6 +90,34 @@
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
*/
public abstract class Scheduler {
/**
* Value representing whether to use {@link System#nanoTime()}, or default as clock for {@link #now(TimeUnit)}
* and {@link Scheduler.Worker#now(TimeUnit)}
* <p>
* Associated system parameter:
* <ul>
* <li>{@code rx2.scheduler.use-nanotime}, boolean, default {@code false}
* </ul>
*/
static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx2.scheduler.use-nanotime");

/**
* Returns the current clock time depending on state of {@link Scheduler#IS_DRIFT_USE_NANOTIME} in given {@code unit}
* <p>
* By default {@link System#currentTimeMillis()} will be used as the clock. When the property is set
* {@link System#nanoTime()} will be used.
* <p>
* @param unit the time unit
* @return the 'current time' in given unit
* @throws NullPointerException if {@code unit} is {@code null}
*/
static long computeNow(TimeUnit unit) {
if(!IS_DRIFT_USE_NANOTIME) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
}

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
Expand Down Expand Up @@ -131,7 +160,7 @@ public static long clockDriftTolerance() {
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down Expand Up @@ -332,8 +361,9 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
* track the individual {@code Runnable} tasks while they are waiting to be executed (with or without delay) so that
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
* <p>
* The default implementation of the {@link #now(TimeUnit)} method returns current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Worker} implementations can override this
* The default implementation of the {@link #now(TimeUnit)} method returns current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx2.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Worker} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand Down Expand Up @@ -448,7 +478,7 @@ public Disposable schedulePeriodically(@NonNull Runnable run, final long initial
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
* <li>{@code rx2.single-priority} (int): sets the thread priority of the {@link #single()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.purge-enabled} (boolean): enables periodic purging of all Scheduler's backing thread pools, default is false</li>
* <li>{@code rx2.purge-period-seconds} (int): specifies the periodic purge interval of all Scheduler's backing thread pools, default is 1 second</li>
* <li>{@code rx2.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
*/
public final class Schedulers {
Expand Down
61 changes: 61 additions & 0 deletions src/test/java/io/reactivex/SchedulerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

/**
* Same as {@link io.reactivex.schedulers.SchedulerTest}, but different package, to access
* package-private fields.
*/
public class SchedulerTest {
private static final String DRIFT_USE_NANOTIME = "rx2.scheduler.use-nanotime";

@After
public void cleanup() {
// reset value to default in order to not influence other tests
Scheduler.IS_DRIFT_USE_NANOTIME = false;
}

@Test
public void driftUseNanoTimeNotSetByDefault() {
assertFalse(Scheduler.IS_DRIFT_USE_NANOTIME);
assertFalse(Boolean.getBoolean(DRIFT_USE_NANOTIME));
}

@Test
public void computeNow_currentTimeMillis() {
TimeUnit unit = TimeUnit.MILLISECONDS;
assertTrue(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
}

@Test
public void computeNow_nanoTime() {
TimeUnit unit = TimeUnit.NANOSECONDS;
Scheduler.IS_DRIFT_USE_NANOTIME = true;

assertFalse(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
assertTrue(isInRange(System.nanoTime(), Scheduler.computeNow(unit), TimeUnit.NANOSECONDS, 250, TimeUnit.MILLISECONDS));
}

private boolean isInRange(long start, long stop, TimeUnit source, long maxDiff, TimeUnit diffUnit) {
long diff = Math.abs(stop - start);
return diffUnit.convert(diff, source) <= maxDiff;
}
}

0 comments on commit 776f0e5

Please sign in to comment.