Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#4876 nonnull annotations #5051

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
Expand Down Expand Up @@ -61,6 +62,7 @@ public static long clockDriftTolerance() {
*
* @return a Worker representing a serial queue of actions to be executed
*/
@NonNull
public abstract Worker createWorker();

/**
Expand All @@ -69,7 +71,7 @@ public static long clockDriftTolerance() {
* @return the 'current time'
* @since 2.0
*/
public long now(TimeUnit unit) {
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -105,7 +107,8 @@ public void shutdown() {
* @return the Disposable instance that let's one cancel this particular task.
* @since 2.0
*/
public Disposable scheduleDirect(Runnable run) {
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

Expand All @@ -122,7 +125,8 @@ public Disposable scheduleDirect(Runnable run) {
* @return the Disposable that let's one cancel this particular delayed task.
* @since 2.0
*/
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Expand Down Expand Up @@ -159,7 +163,8 @@ public void run() {
* @return the Disposable that let's one cancel this particular delayed task.
* @since 2.0
*/
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Expand Down Expand Up @@ -249,7 +254,8 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
*/
@SuppressWarnings("unchecked")
@Experimental
public <S extends Scheduler & Disposable> S when(Function<Flowable<Flowable<Completable>>, Completable> combine) {
@NonNull
public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}

Expand All @@ -268,7 +274,8 @@ public abstract static class Worker implements Disposable {
* Runnable to schedule
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
public Disposable schedule(Runnable run) {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

Expand All @@ -287,7 +294,8 @@ public Disposable schedule(Runnable run) {
* the time unit of {@code delayTime}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
Expand All @@ -309,7 +317,8 @@ public Disposable schedule(Runnable run) {
* the time unit of {@code period}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) {
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();

final SequentialDisposable sd = new SequentialDisposable(first);
Expand Down Expand Up @@ -337,7 +346,7 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi
* @return the 'current time'
* @since 2.0
*/
public long now(TimeUnit unit) {
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

Expand All @@ -346,15 +355,17 @@ public long now(TimeUnit unit) {
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;

PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun,
long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) {
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
this.decoratedRun = decoratedRun;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
Expand Down Expand Up @@ -395,12 +406,12 @@ public void run() {
static class PeriodicDirectTask
implements Runnable, Disposable {
final Runnable run;

@NonNull
final Worker worker;

@NonNull
volatile boolean disposed;

PeriodicDirectTask(Runnable run, Worker worker) {
PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
this.run = run;
this.worker = worker;
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/annotations/NonNull.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.CLASS;

/**
* Indicates that a field/parameter/variable/return type is never null.
*/
@Documented
@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE})
@Retention(value = CLASS)
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/io/reactivex/annotations/Nullable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.LOCAL_VARIABLE;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.CLASS;

/**
* Indicates that a field/parameter/variable/return type may be null.
*/
@Documented
@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE})
@Retention(value = CLASS)
public @interface Nullable { }

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.reactivex.internal.schedulers;

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;

Expand Down Expand Up @@ -118,19 +119,22 @@ public ComputationScheduler(ThreadFactory threadFactory) {
start();
}

@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.scheduleDirect(run, delay, unit);
}

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
Expand Down Expand Up @@ -188,16 +192,18 @@ public boolean isDisposed() {
return disposed;
}

@NonNull
@Override
public Disposable schedule(Runnable action) {
public Disposable schedule(@NonNull Runnable action) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}

return poolWorker.scheduleActual(action, 0, null, serial);
}
@NonNull
@Override
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.atomic.*;

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.queue.MpscLinkedQueue;
Expand All @@ -29,21 +30,24 @@
*/
public final class ExecutorScheduler extends Scheduler {

@NonNull
final Executor executor;

static final Scheduler HELPER = Schedulers.single();

public ExecutorScheduler(Executor executor) {
public ExecutorScheduler(@NonNull Executor executor) {
this.executor = executor;
}

@NonNull
@Override
public Worker createWorker() {
return new ExecutorWorker(executor);
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run) {
public Disposable scheduleDirect(@NonNull Runnable run) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
if (executor instanceof ExecutorService) {
Expand All @@ -60,8 +64,9 @@ public Disposable scheduleDirect(Runnable run) {
}
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) {
public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (executor instanceof ScheduledExecutorService) {
try {
Expand All @@ -87,8 +92,9 @@ public void run() {
return dr;
}

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Expand Down Expand Up @@ -118,8 +124,9 @@ public ExecutorWorker(Executor executor) {
this.queue = new MpscLinkedQueue<Runnable>();
}

@NonNull
@Override
public Disposable schedule(Runnable run) {
public Disposable schedule(@NonNull Runnable run) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Expand All @@ -143,8 +150,9 @@ public Disposable schedule(Runnable run) {
return br;
}

@NonNull
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
if (delay <= 0) {
return schedule(run);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;

/**
Expand Down Expand Up @@ -45,22 +46,26 @@ private ImmediateThinScheduler() {
// singleton class
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run) {
public Disposable scheduleDirect(@NonNull Runnable run) {
run.run();
return DISPOSED;
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
}

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
}

@NonNull
@Override
public Worker createWorker() {
return WORKER;
Expand All @@ -78,19 +83,22 @@ public boolean isDisposed() {
return false; // dispose() has no effect
}

@NonNull
@Override
public Disposable schedule(Runnable run) {
public Disposable schedule(@NonNull Runnable run) {
run.run();
return DISPOSED;
}

@NonNull
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
}

@NonNull
@Override
public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodically(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
}
}
Expand Down
Loading