Skip to content

Commit

Permalink
follow-up to #1050 Let operators report their Scheduler via RUN_ON
Browse files Browse the repository at this point in the history
The new `Attr` RUN_ON allows operators that take a Scheduler or a Worker
(except aliases like `Flux#delayElements`) to report which Scheduler or
Worker they run on.
  • Loading branch information
simonbasle committed Feb 14, 2018
1 parent 08d55d4 commit 60d7219
Show file tree
Hide file tree
Showing 39 changed files with 513 additions and 56 deletions.
11 changes: 11 additions & 0 deletions reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

Expand Down Expand Up @@ -144,6 +146,15 @@ class Attr<T> {
public static final Attr<Scannable> PARENT = new Attr<>(null,
Scannable::from);

/**
* A key that links a {@link Scannable} to another {@link Scannable} it runs on.
* Usually exposes a link between an operator/subscriber and its {@link Worker} or
* {@link Scheduler}, provided these are {@link Scannable}. Will return
* {@link Attr#UNAVAILABLE_SCAN} if the supporting execution is not Scannable or
* {@link Attr#NULL_SCAN} if the operator doesn't define a specific runtime.
*/
public static final Attr<Scannable> RUN_ON = new Attr<>(null, Scannable::from);

/**
* Prefetch is an {@link Integer} attribute defining the rate of processing in a
* component which has capacity to request and hold a backlog of data. It
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public void subscribe(CoreSubscriber<? super C> actual) {
bufferSupplier));
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return timer;

return super.scanUnsafe(key);
}

final static class BufferTimeoutSubscriber<T, C extends Collection<? super T>>
implements InnerOperator<T, C> {
Expand Down Expand Up @@ -206,6 +211,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested;
if (key == Attr.CAPACITY) return batchSize;
if (key == Attr.BUFFERED) return batchSize - index;
if (key == Attr.RUN_ON) return timer;

return InnerOperator.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new CancelSubscriber<T>(actual, scheduler));
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return super.scanUnsafe(key);
}

static final class CancelSubscriber<T>
implements InnerOperator<T, T>, Runnable {

Expand Down Expand Up @@ -69,6 +76,7 @@ public void onSubscribe(Subscription s) {
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.CANCELLED) return cancelled == 1;
if (key == Attr.RUN_ON) return scheduler;

return InnerOperator.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.reactivestreams.*;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;

Expand All @@ -46,6 +46,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new DelaySubscriber<T>(actual, delay, w));
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return super.scanUnsafe(key);
}

static final class DelaySubscriber<T> implements InnerOperator<T, T> {

final CoreSubscriber<? super T> actual;
Expand Down Expand Up @@ -152,6 +159,16 @@ public void cancel() {
w.dispose();
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.RUN_ON) return w;
if (key == Attr.TERMINATED) return done;
if (key == Attr.CANCELLED) return w.isDisposed() && !done;

return InnerOperator.super.scanUnsafe(key);
}

final class OnError implements Runnable {
private final Throwable t;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public void subscribe(CoreSubscriber<? super Tuple2<Long, T>> actual) {
source.subscribe(new ElapsedSubscriber<>(actual, scheduler));
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return super.scanUnsafe(key);
}

static final class ElapsedSubscriber<T>
implements InnerOperator<T, Tuple2<Long, T>>,
QueueSubscription<Tuple2<Long, T>> {
Expand All @@ -65,6 +72,7 @@ static final class ElapsedSubscriber<T>
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.RUN_ON) return scheduler;

return InnerOperator.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
Expand All @@ -32,7 +33,7 @@
* or a custom async callback function
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxInterval extends Flux<Long> {
final class FluxInterval extends Flux<Long> implements Scannable {

final Scheduler timedScheduler;

Expand Down Expand Up @@ -75,18 +76,25 @@ public void subscribe(CoreSubscriber<? super Long> actual) {
}
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return timedScheduler;

return null;
}

static final class IntervalRunnable implements Runnable, Subscription,
InnerProducer<Long> {
final CoreSubscriber<? super Long> actual;

final Worker worker;

volatile long requested;
static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");

long count;

volatile boolean cancelled;

IntervalRunnable(CoreSubscriber<? super Long> actual, Worker worker) {
Expand All @@ -103,6 +111,7 @@ public CoreSubscriber<? super Long> actual() {
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.CANCELLED) return cancelled;
if (key == Attr.RUN_ON) return worker;

return InnerProducer.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public int getPrefetch() {
return Integer.MAX_VALUE;
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return ttlScheduler;

return super.scanUnsafe(key);
}

static final class BackpressureBufferTimeoutSubscriber<T> extends ArrayDeque<Object>
implements InnerOperator<T, T>, Runnable {

Expand Down Expand Up @@ -148,6 +155,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.DELAY_ERROR) {
return false;
}
if (key == Attr.RUN_ON) return ttlScheduler;

return InnerOperator.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ final class FluxPublishOn<T> extends FluxOperator<T, T> implements Fuseable {
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return super.scanUnsafe(key);
}

@Override
public int getPrefetch() {
return prefetch;
Expand Down Expand Up @@ -520,6 +527,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.ERROR) return error;
if (key == Attr.DELAY_ERROR) return delayError;
if (key == Attr.PREFETCH) return prefetch;
if (key == Attr.RUN_ON) return worker;

return InnerOperator.super.scanUnsafe(key);
}
Expand Down Expand Up @@ -941,6 +949,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.ERROR) return error;
if (key == Attr.DELAY_ERROR) return delayError;
if (key == Attr.PREFETCH) return prefetch;
if (key == Attr.RUN_ON) return worker;

return InnerOperator.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,7 @@ else if (c == null) {
public Object scanUnsafe(Scannable.Attr key) {
if (key == Attr.PREFETCH) return getPrefetch();
if (key == Attr.PARENT) return source;
if (key == Attr.RUN_ON) return scheduler;

return null;
}
Expand Down Expand Up @@ -1368,6 +1369,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return size();
if (key == Attr.CANCELLED) return isCancelled();
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return Math.max(0L, requested);
if (key == Attr.RUN_ON) return parent.parent.scheduler;

return ReplaySubscription.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;

Expand All @@ -34,7 +35,7 @@
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">https://github.com/reactor/reactive-streams-commons</a>
*/
final class FluxSubscribeOnCallable<T> extends Flux<T> implements Fuseable {
final class FluxSubscribeOnCallable<T> extends Flux<T> implements Fuseable, Scannable {

final Callable<? extends T> callable;

Expand Down Expand Up @@ -62,6 +63,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return null;
}

static final class CallableSubscribeOnSubscription<T>
implements QueueSubscription<T>, InnerProducer<T>, Runnable {

Expand Down Expand Up @@ -123,6 +131,7 @@ public CoreSubscriber<? super T> actual() {
public Object scanUnsafe(Attr key) {
if (key == Attr.CANCELLED) return state == HAS_CANCELLED;
if (key == Attr.BUFFERED) return value != null ? 1 : 0;
if (key == Attr.RUN_ON) return scheduler;

return InnerProducer.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">https://github.com/reactor/reactive-streams-commons</a>
*/
final class FluxSubscribeOnValue<T> extends Flux<T> implements Fuseable {
final class FluxSubscribeOnValue<T> extends Flux<T> implements Fuseable, Scannable {

final T value;

Expand Down Expand Up @@ -69,6 +69,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return null;
}

static final class ScheduledScalar<T>
implements QueueSubscription<T>, InnerProducer<T>, Runnable {

Expand Down Expand Up @@ -121,6 +128,7 @@ public Object scanUnsafe(Scannable.Attr key) {
if (key == Attr.BUFFERED) {
return 1;
}
if (key == Attr.RUN_ON) return scheduler;

return InnerProducer.super.scanUnsafe(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public void subscribe(CoreSubscriber<? super Flux<T>> actual) {
timer));
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return timer;

return super.scanUnsafe(key);
}

static final class WindowTimeoutSubscriber<T> implements InnerOperator<T, Flux<T>> {

final CoreSubscriber<? super Flux<T>> actual;
Expand Down Expand Up @@ -133,6 +140,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested;
if (key == Attr.CAPACITY) return maxSize;
if (key == Attr.BUFFERED) return queue.size();
if (key == Attr.RUN_ON) return worker;
return InnerOperator.super.scanUnsafe(key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ final class MonoCancelOn<T> extends MonoOperator<T, T> {
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new FluxCancelOn.CancelSubscriber<T>(actual, scheduler));
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return super.scanUnsafe(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;

Expand All @@ -33,7 +34,7 @@
* wraps other form of async-delayed execution of tasks.
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoDelay extends Mono<Long> {
final class MonoDelay extends Mono<Long> implements Scannable {

final Scheduler timedScheduler;

Expand Down Expand Up @@ -64,6 +65,13 @@ public void subscribe(CoreSubscriber<? super Long> actual) {
}
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return timedScheduler;

return null;
}

static final class MonoDelayRunnable implements Runnable, InnerProducer<Long> {
final CoreSubscriber<? super Long> actual;

Expand Down
Loading

0 comments on commit 60d7219

Please sign in to comment.