Skip to content

Commit

Permalink
Revisit nullability annotations (reactor#864)
Browse files Browse the repository at this point in the history
Revisit nullability annotations
 - adds @nonnull and @nullable annotations which allows to apply
null-safety semantics on a specific type like JSR 305 but allows to
target generic type arguments.
 - @NonNullApi does not apply to ElementType.TYPE anymore
 - Nullability annotations are now also applied to generic type arguments,
varargs and array elements.
 - Package is now reactor.util.annotation
 - Add null-safety reference documentation
  • Loading branch information
sdeleuze authored and smaldini committed Sep 21, 2017
1 parent f3e13bf commit b597815
Show file tree
Hide file tree
Showing 242 changed files with 1,427 additions and 1,196 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ project('reactor-core') {
testCompile "org.reactivestreams:reactive-streams-tck:1.0.1"

// JSR-305 annotations
optional "com.google.code.findbugs:jsr305:3.0.0"
optional "com.google.code.findbugs:jsr305:3.0.2"

//Optional Logging Operator
optional "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down
17 changes: 9 additions & 8 deletions reactor-core/src/main/java/reactor/adapter/JdkFlowAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.util.annotation.NonNull;

/**
* Convert a Java 9+ {@literal Flow.Publisher} to/from a Reactive Streams {@link Publisher}.
Expand All @@ -39,7 +40,7 @@ public abstract class JdkFlowAdapter {
* @param <T> the type of the publisher
* @return a java {@code Flow.Publisher} from the given {@link Publisher}
*/
public static <T> Flow.Publisher<T> publisherToFlowPublisher(final Publisher<T>
public static <T> Flow.Publisher<@NonNull T> publisherToFlowPublisher(final Publisher<@NonNull T>
publisher) {
return new PublisherAsFlowPublisher<>(publisher);
}
Expand All @@ -51,32 +52,32 @@ public static <T> Flow.Publisher<T> publisherToFlowPublisher(final Publisher<T>
* @param <T> the type of the publisher
* @return a {@link Flux} from a java {@code Flow.Publisher}
*/
public static <T> Flux<T> flowPublisherToFlux(Flow.Publisher<T> publisher) {
public static <T> Flux<@NonNull T> flowPublisherToFlux(Flow.Publisher<@NonNull T> publisher) {
return new FlowPublisherAsFlux<>(publisher);
}

private static class FlowPublisherAsFlux<T> extends Flux<T> {
private final java.util.concurrent.Flow.Publisher<T> pub;

private FlowPublisherAsFlux(java.util.concurrent.Flow.Publisher<T> pub) {
private FlowPublisherAsFlux(java.util.concurrent.Flow.Publisher<@NonNull T> pub) {
this.pub = pub;
}

@Override
public void subscribe(final CoreSubscriber<? super T> actual) {
public void subscribe(final CoreSubscriber<? super @NonNull T> actual) {
pub.subscribe(new SubscriberToRS<>(actual));
}
}

private static class PublisherAsFlowPublisher<T> implements Flow.Publisher<T> {
private final Publisher<T> pub;

private PublisherAsFlowPublisher(Publisher<T> pub) {
private PublisherAsFlowPublisher(Publisher<@NonNull T> pub) {
this.pub = pub;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
public void subscribe(Flow.Subscriber<? super @NonNull T> subscriber) {
pub.subscribe(new FlowSubscriber<>(subscriber));
}
}
Expand All @@ -87,7 +88,7 @@ private static class FlowSubscriber<T> implements CoreSubscriber<T>, Flow.Subscr

Subscription subscription;

public FlowSubscriber(Flow.Subscriber<? super T> subscriber) {
public FlowSubscriber(Flow.Subscriber<? super @NonNull T> subscriber) {
this.subscriber = subscriber;
}

Expand Down Expand Up @@ -129,7 +130,7 @@ private static class SubscriberToRS<T> implements Flow.Subscriber<T>, Subscripti

Flow.Subscription subscription;

public SubscriberToRS(Subscriber<? super T> s) {
public SubscriberToRS(Subscriber<? super @NonNull T> s) {
this.s = s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
@NonNullApi
package reactor.adapter;

import reactor.util.lang.NonNullApi;
import reactor.util.annotation.NonNullApi;
3 changes: 2 additions & 1 deletion reactor-core/src/main/java/reactor/core/Disposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import java.util.Collection;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import reactor.util.annotation.Nullable;

/**
* Indicates that a task or resource can be cancelled/disposed.
Expand Down
15 changes: 8 additions & 7 deletions reactor-core/src/main/java/reactor/core/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import javax.annotation.Nullable;

import reactor.util.concurrent.Queues;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
* A support class that offers factory methods for implementations of the specialized
Expand Down Expand Up @@ -55,7 +56,7 @@ public static Disposable.Composite composite() {
*
* @return a pre-filled atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite(Disposable... disposables) {
public static Disposable.Composite composite(Disposable @NonNull ... disposables) {
return new CompositeDisposable(disposables);
}

Expand All @@ -66,7 +67,7 @@ public static Disposable.Composite composite(Disposable... disposables) {
* @return a pre-filled atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite(
Iterable<? extends Disposable> disposables) {
Iterable<? extends @NonNull Disposable> disposables) {
return new CompositeDisposable(disposables);
}

Expand Down Expand Up @@ -146,7 +147,7 @@ static final class CompositeDisposable implements Disposable.Composite, Scannabl
* Creates a {@link CompositeDisposable} with the given array of initial elements.
* @param disposables the array of {@link Disposable} to start with
*/
CompositeDisposable(Disposable... disposables) {
CompositeDisposable(Disposable @NonNull ... disposables) {
Objects.requireNonNull(disposables, "disposables is null");

int capacity = disposables.length + 1;
Expand All @@ -167,7 +168,7 @@ static final class CompositeDisposable implements Disposable.Composite, Scannabl
* initial elements.
* @param disposables the Iterable sequence of {@link Disposable} to start with
*/
CompositeDisposable(Iterable<? extends Disposable> disposables) {
CompositeDisposable(Iterable<? extends @NonNull Disposable> disposables) {
Objects.requireNonNull(disposables, "disposables is null");
this.loadFactor = DEFAULT_LOAD_FACTOR;
int c = DEFAULT_CAPACITY;
Expand Down Expand Up @@ -239,7 +240,7 @@ public boolean add(Disposable d) {
}

@Override
public boolean addAll(Collection<? extends Disposable> ds) {
public boolean addAll(Collection<? extends @NonNull Disposable> ds) {
Objects.requireNonNull(ds, "ds is null");
if (!disposed) {
synchronized (this) {
Expand Down Expand Up @@ -357,7 +358,7 @@ boolean removeEntry(Disposable value) {
}
}

boolean removeEntry(int pos, Disposable[] a, int m) {
boolean removeEntry(int pos, Disposable @Nullable [] a, int m) {
size--;

int last;
Expand Down
8 changes: 5 additions & 3 deletions reactor-core/src/main/java/reactor/core/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
* Global Reactor Core Exception handling and utils to operate on.
Expand Down Expand Up @@ -102,7 +104,7 @@ public static <T> boolean addThrowable(AtomicReferenceFieldUpdater<T, Throwable>
* suppressed exceptions
* @see #addThrowable(AtomicReferenceFieldUpdater, Object, Throwable)
*/
public static RuntimeException multiple(Throwable... throwables) {
public static RuntimeException multiple(Throwable @NonNull ... throwables) {
CompositeException multiple = new CompositeException();
//noinspection ConstantConditions
if (throwables != null) {
Expand All @@ -127,7 +129,7 @@ public static RuntimeException multiple(Throwable... throwables) {
* suppressed exceptions
* @see #addThrowable(AtomicReferenceFieldUpdater, Object, Throwable)
*/
public static RuntimeException multiple(Iterable<Throwable> throwables) {
public static RuntimeException multiple(Iterable<@NonNull Throwable> throwables) {
RuntimeException multiple = new RuntimeException("Multiple exceptions");
//noinspection ConstantConditions
if (throwables != null) {
Expand Down
18 changes: 9 additions & 9 deletions reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

import reactor.util.function.Tuple2;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
* A Scannable component exposes state in a non strictly memory consistent way and
Expand Down Expand Up @@ -214,7 +214,7 @@ public boolean isScanAvailable() {
}
};

static Stream<? extends Scannable> recurse(Scannable _s,
static Stream<? extends @NonNull Scannable> recurse(Scannable _s,
Attr<Scannable> key){
Scannable s = Scannable.from(_s.scan(key));
if(!s.isScanAvailable()) {
Expand Down Expand Up @@ -265,7 +265,7 @@ static Scannable from(@Nullable Object o) {
* @return a {@link Stream} navigating the {@link org.reactivestreams.Subscriber}
* chain (downward)
*/
default Stream<? extends Scannable> actuals() {
default Stream<? extends @NonNull Scannable> actuals() {
return Attr.recurse(this, Attr.ACTUAL);
}

Expand All @@ -274,7 +274,7 @@ default Stream<? extends Scannable> actuals() {
*
* @return a {@link Stream} of referenced inners (flatmap, multicast etc)
*/
default Stream<? extends Scannable> inners() {
default Stream<? extends @NonNull Scannable> inners() {
return Stream.empty();
}

Expand Down Expand Up @@ -330,7 +330,7 @@ default String operatorName() {
* @return a {@link Stream} navigating the {@link org.reactivestreams.Subscription}
* chain (upward)
*/
default Stream<? extends Scannable> parents() {
default Stream<? extends @NonNull Scannable> parents() {
return Attr.recurse(this, Attr.PARENT);
}

Expand Down Expand Up @@ -361,7 +361,7 @@ default Stream<? extends Scannable> parents() {
*
*/
@Nullable
default <T> T scan(Attr<T> key) {
default <T> T scan(Attr<@Nullable T> key) {
@SuppressWarnings("unchecked")
T value = (T) scanUnsafe(key);
if (value == null)
Expand All @@ -379,7 +379,7 @@ default <T> T scan(Attr<T> key) {
*
* @return a value associated to the key or the provided default if unmatched or unresolved
*/
default <T> T scanOrDefault(Attr<T> key, T defaultValue) {
default <T> T scanOrDefault(Attr<@Nullable T> key, T defaultValue) {
@SuppressWarnings("unchecked")
T v = (T) scanUnsafe(key);
if (v == null) {
Expand All @@ -394,7 +394,7 @@ default <T> T scanOrDefault(Attr<T> key, T defaultValue) {
*
* @return the stream of tags for this {@link Scannable} and its parents
*/
default Stream<Tuple2<String, String>> tags() {
default Stream<@NonNull Tuple2<@NonNull String, @NonNull String>> tags() {
Stream<Tuple2<String, String>> parentTags =
parents().flatMap(s -> s.scan(Attr.TAGS));

Expand Down
2 changes: 1 addition & 1 deletion reactor-core/src/main/java/reactor/core/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
@NonNullApi
package reactor.core;

import reactor.util.lang.NonNullApi;
import reactor.util.annotation.NonNullApi;
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.context.Context;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
* An iterable that consumes a Publisher in a blocking fashion.
Expand Down Expand Up @@ -74,7 +74,7 @@ public Object scanUnsafe(Attr key) {
}

@Override
public Iterator<T> iterator() {
public Iterator<@NonNull T> iterator() {
SubscriberIterator<T> it = createIterator();

source.subscribe(it);
Expand All @@ -83,15 +83,15 @@ public Iterator<T> iterator() {
}

@Override
public Spliterator<T> spliterator() {
public Spliterator<@NonNull T> spliterator() {
return stream().spliterator(); // cancellation should be composed through this way
}

/**
* @return a {@link Stream} of unknown size with onClose attached to {@link
* Subscription#cancel()}
*/
public Stream<T> stream() {
public Stream<@NonNull T> stream() {
SubscriberIterator<T> it = createIterator();
source.subscribe(it);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* @see <a href="https://github.com/reactor/reactive-streams-commons">https://github.com/reactor/reactive-streams-commons</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import reactor.core.Fuseable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;

/**
* The abstract base class for connectable publishers that let subscribers pile up
Expand All @@ -42,7 +43,7 @@ public abstract class ConnectableFlux<T> extends Flux<T> {
*
* @return a {@link Flux} that connects to the upstream source when the first {@link org.reactivestreams.Subscriber} subscribes
*/
public final Flux<T> autoConnect() {
public final Flux<@NonNull T> autoConnect() {
return autoConnect(1);
}

Expand All @@ -60,7 +61,7 @@ public final Flux<T> autoConnect() {
*
* @return a {@link Flux} that connects to the upstream source when the given amount of Subscribers subscribed
*/
public final Flux<T> autoConnect(int minSubscribers) {
public final Flux<@NonNull T> autoConnect(int minSubscribers) {
return autoConnect(minSubscribers, NOOP_DISCONNECT);
}

Expand All @@ -75,7 +76,7 @@ public final Flux<T> autoConnect(int minSubscribers) {
*
* @return a {@link Flux} that connects to the upstream source when the given amount of subscribers subscribed
*/
public final Flux<T> autoConnect(int minSubscribers, Consumer<? super Disposable> cancelSupport) {
public final Flux<@NonNull T> autoConnect(int minSubscribers, Consumer<? super Disposable> cancelSupport) {
if (minSubscribers == 0) {
connect(cancelSupport);
return this;
Expand Down Expand Up @@ -111,7 +112,7 @@ public final Disposable connect() {
* @param cancelSupport the callback is called with a Disposable instance that can
* be called to disconnect the source, even synchronously.
*/
public abstract void connect(Consumer<? super Disposable> cancelSupport);
public abstract void connect(Consumer<? super @NonNull Disposable> cancelSupport);

/**
* Connects to the upstream source when the first {@link org.reactivestreams.Subscriber} subscribes and disconnects
Expand Down
Loading

0 comments on commit b597815

Please sign in to comment.