Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert generic type arguments nullability #875

Merged
merged 3 commits into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions reactor-core/src/main/java/reactor/adapter/JdkFlowAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.util.annotation.NonNull;

/**
* Convert a Java 9+ {@literal Flow.Publisher} to/from a Reactive Streams {@link Publisher}.
Expand All @@ -40,7 +39,7 @@ public abstract class JdkFlowAdapter {
* @param <T> the type of the publisher
* @return a java {@code Flow.Publisher} from the given {@link Publisher}
*/
public static <T> Flow.Publisher<@NonNull T> publisherToFlowPublisher(final Publisher<@NonNull T>
public static <T> Flow.Publisher<T> publisherToFlowPublisher(final Publisher<T>
publisher) {
return new PublisherAsFlowPublisher<>(publisher);
}
Expand All @@ -52,32 +51,32 @@ public abstract class JdkFlowAdapter {
* @param <T> the type of the publisher
* @return a {@link Flux} from a java {@code Flow.Publisher}
*/
public static <T> Flux<@NonNull T> flowPublisherToFlux(Flow.Publisher<@NonNull T> publisher) {
public static <T> Flux<T> flowPublisherToFlux(Flow.Publisher<T> publisher) {
return new FlowPublisherAsFlux<>(publisher);
}

private static class FlowPublisherAsFlux<T> extends Flux<T> {
private final java.util.concurrent.Flow.Publisher<T> pub;

private FlowPublisherAsFlux(java.util.concurrent.Flow.Publisher<@NonNull T> pub) {
private FlowPublisherAsFlux(java.util.concurrent.Flow.Publisher<T> pub) {
this.pub = pub;
}

@Override
public void subscribe(final CoreSubscriber<? super @NonNull T> actual) {
public void subscribe(final CoreSubscriber<? super T> actual) {
pub.subscribe(new SubscriberToRS<>(actual));
}
}

private static class PublisherAsFlowPublisher<T> implements Flow.Publisher<T> {
private final Publisher<T> pub;

private PublisherAsFlowPublisher(Publisher<@NonNull T> pub) {
private PublisherAsFlowPublisher(Publisher<T> pub) {
this.pub = pub;
}

@Override
public void subscribe(Flow.Subscriber<? super @NonNull T> subscriber) {
public void subscribe(Flow.Subscriber<? super T> subscriber) {
pub.subscribe(new FlowSubscriber<>(subscriber));
}
}
Expand All @@ -88,7 +87,7 @@ private static class FlowSubscriber<T> implements CoreSubscriber<T>, Flow.Subscr

Subscription subscription;

public FlowSubscriber(Flow.Subscriber<? super @NonNull T> subscriber) {
public FlowSubscriber(Flow.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

Expand Down Expand Up @@ -130,7 +129,7 @@ private static class SubscriberToRS<T> implements Flow.Subscriber<T>, Subscripti

Flow.Subscription subscription;

public SubscriberToRS(Subscriber<? super @NonNull T> s) {
public SubscriberToRS(Subscriber<? super T> s) {
this.s = s;
}

Expand Down
15 changes: 7 additions & 8 deletions reactor-core/src/main/java/reactor/core/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

import reactor.util.concurrent.Queues;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/**
* A support class that offers factory methods for implementations of the specialized
Expand Down Expand Up @@ -56,7 +55,7 @@ public static Disposable.Composite composite() {
*
* @return a pre-filled atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite(Disposable @NonNull ... disposables) {
public static Disposable.Composite composite(Disposable... disposables) {
return new CompositeDisposable(disposables);
}

Expand All @@ -67,7 +66,7 @@ public static Disposable.Composite composite(Disposable @NonNull ... disposables
* @return a pre-filled atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite(
Iterable<? extends @NonNull Disposable> disposables) {
Iterable<? extends Disposable> disposables) {
return new CompositeDisposable(disposables);
}

Expand Down Expand Up @@ -147,7 +146,7 @@ static final class CompositeDisposable implements Disposable.Composite, Scannabl
* Creates a {@link CompositeDisposable} with the given array of initial elements.
* @param disposables the array of {@link Disposable} to start with
*/
CompositeDisposable(Disposable @NonNull ... disposables) {
CompositeDisposable(Disposable... disposables) {
Objects.requireNonNull(disposables, "disposables is null");

int capacity = disposables.length + 1;
Expand All @@ -168,7 +167,7 @@ static final class CompositeDisposable implements Disposable.Composite, Scannabl
* initial elements.
* @param disposables the Iterable sequence of {@link Disposable} to start with
*/
CompositeDisposable(Iterable<? extends @NonNull Disposable> disposables) {
CompositeDisposable(Iterable<? extends Disposable> disposables) {
Objects.requireNonNull(disposables, "disposables is null");
this.loadFactor = DEFAULT_LOAD_FACTOR;
int c = DEFAULT_CAPACITY;
Expand Down Expand Up @@ -240,7 +239,7 @@ public boolean add(Disposable d) {
}

@Override
public boolean addAll(Collection<? extends @NonNull Disposable> ds) {
public boolean addAll(Collection<? extends Disposable> ds) {
Objects.requireNonNull(ds, "ds is null");
if (!disposed) {
synchronized (this) {
Expand Down Expand Up @@ -358,7 +357,7 @@ boolean removeEntry(Disposable value) {
}
}

boolean removeEntry(int pos, Disposable @Nullable [] a, int m) {
boolean removeEntry(int pos, Disposable[] a, int m) {
size--;

int last;
Expand Down
5 changes: 2 additions & 3 deletions reactor-core/src/main/java/reactor/core/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
Expand Down Expand Up @@ -104,7 +103,7 @@ public static <T> boolean addThrowable(AtomicReferenceFieldUpdater<T, Throwable>
* suppressed exceptions
* @see #addThrowable(AtomicReferenceFieldUpdater, Object, Throwable)
*/
public static RuntimeException multiple(Throwable @NonNull ... throwables) {
public static RuntimeException multiple(Throwable... throwables) {
CompositeException multiple = new CompositeException();
//noinspection ConstantConditions
if (throwables != null) {
Expand All @@ -129,7 +128,7 @@ public static RuntimeException multiple(Throwable @NonNull ... throwables) {
* suppressed exceptions
* @see #addThrowable(AtomicReferenceFieldUpdater, Object, Throwable)
*/
public static RuntimeException multiple(Iterable<@NonNull Throwable> throwables) {
public static RuntimeException multiple(Iterable<Throwable> throwables) {
RuntimeException multiple = new RuntimeException("Multiple exceptions");
//noinspection ConstantConditions
if (throwables != null) {
Expand Down
18 changes: 9 additions & 9 deletions reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import reactor.util.function.Tuple2;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

/**
* A Scannable component exposes state in a non strictly memory consistent way and
Expand Down Expand Up @@ -214,7 +214,7 @@ public boolean isScanAvailable() {
}
};

static Stream<? extends @NonNull Scannable> recurse(Scannable _s,
static Stream<? extends Scannable> recurse(Scannable _s,
Attr<Scannable> key){
Scannable s = Scannable.from(_s.scan(key));
if(!s.isScanAvailable()) {
Expand Down Expand Up @@ -265,7 +265,7 @@ static Scannable from(@Nullable Object o) {
* @return a {@link Stream} navigating the {@link org.reactivestreams.Subscriber}
* chain (downward)
*/
default Stream<? extends @NonNull Scannable> actuals() {
default Stream<? extends Scannable> actuals() {
return Attr.recurse(this, Attr.ACTUAL);
}

Expand All @@ -274,7 +274,7 @@ static Scannable from(@Nullable Object o) {
*
* @return a {@link Stream} of referenced inners (flatmap, multicast etc)
*/
default Stream<? extends @NonNull Scannable> inners() {
default Stream<? extends Scannable> inners() {
return Stream.empty();
}

Expand Down Expand Up @@ -330,7 +330,7 @@ default String operatorName() {
* @return a {@link Stream} navigating the {@link org.reactivestreams.Subscription}
* chain (upward)
*/
default Stream<? extends @NonNull Scannable> parents() {
default Stream<? extends Scannable> parents() {
return Attr.recurse(this, Attr.PARENT);
}

Expand Down Expand Up @@ -361,7 +361,7 @@ default String operatorName() {
*
*/
@Nullable
default <T> T scan(Attr<@Nullable T> key) {
default <T> T scan(Attr<T> key) {
@SuppressWarnings("unchecked")
T value = (T) scanUnsafe(key);
if (value == null)
Expand All @@ -379,7 +379,7 @@ default <T> T scan(Attr<@Nullable T> key) {
*
* @return a value associated to the key or the provided default if unmatched or unresolved
*/
default <T> T scanOrDefault(Attr<@Nullable T> key, T defaultValue) {
default <T> T scanOrDefault(Attr<T> key, T defaultValue) {
@SuppressWarnings("unchecked")
T v = (T) scanUnsafe(key);
if (v == null) {
Expand All @@ -394,7 +394,7 @@ default <T> T scanOrDefault(Attr<@Nullable T> key, T defaultValue) {
*
* @return the stream of tags for this {@link Scannable} and its parents
*/
default Stream<@NonNull Tuple2<@NonNull String, @NonNull String>> tags() {
default Stream<Tuple2<String, String>> tags() {
Stream<Tuple2<String, String>> parentTags =
parents().flatMap(s -> s.scan(Attr.TAGS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* An iterable that consumes a Publisher in a blocking fashion.
Expand Down Expand Up @@ -74,7 +74,7 @@ public Object scanUnsafe(Attr key) {
}

@Override
public Iterator<@NonNull T> iterator() {
public Iterator<T> iterator() {
SubscriberIterator<T> it = createIterator();

source.subscribe(it);
Expand All @@ -83,15 +83,15 @@ public Object scanUnsafe(Attr key) {
}

@Override
public Spliterator<@NonNull T> spliterator() {
public Spliterator<T> spliterator() {
return stream().spliterator(); // cancellation should be composed through this way
}

/**
* @return a {@link Stream} of unknown size with onClose attached to {@link
* Subscription#cancel()}
*/
public Stream<@NonNull T> stream() {
public Stream<T> stream() {
SubscriberIterator<T> it = createIterator();
source.subscribe(it);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import reactor.core.Fuseable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;

/**
* The abstract base class for connectable publishers that let subscribers pile up
Expand All @@ -43,7 +42,7 @@ public abstract class ConnectableFlux<T> extends Flux<T> {
*
* @return a {@link Flux} that connects to the upstream source when the first {@link org.reactivestreams.Subscriber} subscribes
*/
public final Flux<@NonNull T> autoConnect() {
public final Flux<T> autoConnect() {
return autoConnect(1);
}

Expand All @@ -61,7 +60,7 @@ public abstract class ConnectableFlux<T> extends Flux<T> {
*
* @return a {@link Flux} that connects to the upstream source when the given amount of Subscribers subscribed
*/
public final Flux<@NonNull T> autoConnect(int minSubscribers) {
public final Flux<T> autoConnect(int minSubscribers) {
return autoConnect(minSubscribers, NOOP_DISCONNECT);
}

Expand All @@ -76,7 +75,7 @@ public abstract class ConnectableFlux<T> extends Flux<T> {
*
* @return a {@link Flux} that connects to the upstream source when the given amount of subscribers subscribed
*/
public final Flux<@NonNull T> autoConnect(int minSubscribers, Consumer<? super Disposable> cancelSupport) {
public final Flux<T> autoConnect(int minSubscribers, Consumer<? super Disposable> cancelSupport) {
if (minSubscribers == 0) {
connect(cancelSupport);
return this;
Expand Down Expand Up @@ -112,7 +111,7 @@ public final Disposable connect() {
* @param cancelSupport the callback is called with a Disposable instance that can
* be called to disconnect the source, even synchronously.
*/
public abstract void connect(Consumer<? super @NonNull Disposable> cancelSupport);
public abstract void connect(Consumer<? super Disposable> cancelSupport);

/**
* Connects to the upstream source when the first {@link org.reactivestreams.Subscriber} subscribes and disconnects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.FluxOnAssembly.AssemblySnapshotException;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
Expand Down Expand Up @@ -51,18 +50,18 @@ final class ConnectableFluxOnAssembly<T> extends ConnectableFlux<T> implements

final AssemblySnapshotException stacktrace;

ConnectableFluxOnAssembly(ConnectableFlux<@NonNull T> source) {
ConnectableFluxOnAssembly(ConnectableFlux<T> source) {
this.source = source;
this.stacktrace = new AssemblySnapshotException();
}

@Override
public void subscribe(CoreSubscriber<? super @NonNull T> actual) {
public void subscribe(CoreSubscriber<? super T> actual) {
FluxOnAssembly.subscribe(actual, source, stacktrace);
}

@Override
public void connect(Consumer<? super @NonNull Disposable> cancelSupport) {
public void connect(Consumer<? super Disposable> cancelSupport) {
source.connect(cancelSupport);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.context.Context;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* @author Stephane Maldini
Expand Down
Loading