Skip to content

Commit

Permalink
fix #879 Add limitRequest operator + limitRate lowTide (#894)
Browse files Browse the repository at this point in the history
`limitRate` is good at splitting large batches (like flatMap prefetches) into
smaller batches of request. However, it still is ultimately pass-through in the
total amount requested, and that can be wasteful for sources of data that is
costly to create/emit.

This commit introduces `limitRequest`, a slightly different backpressure control
operator that caps the total requested amount to a hard ceiling N.

If smaller requests come in from downstream, they will be passed as is until the
total requested amount reaches N, at which point no more request will be sent to
the upstream (which will also be cancelled).

In addition, a variant of `limitRate` is also introduced, which takes a lowTide
parameter setting the amount requested by replenishing prefetch optimization.

These two operators work well in tandem when one wants to ensure no more data
than absolutely necessary is generated in the source, but that prefetching
requests are still made (to avoid waiting for the emitted data to be fully
processed before requesting more).

`limitRequest` also serves as a `take(n)` alternative that is stricter with
request handling rather than relying on a fast cancel.
  • Loading branch information
simonbasle authored Oct 11, 2017
1 parent d9cbcac commit 98be36f
Show file tree
Hide file tree
Showing 9 changed files with 642 additions and 14 deletions.
78 changes: 73 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -4604,25 +4604,81 @@ public final Mono<T> last(T defaultValue) {
}

/**
* Ensure that backpressure signals from downstream subscribers are capped at the
* provided {@code prefetchRate} when propagated upstream, effectively rate limiting
* the upstream {@link Publisher}.
* Ensure that backpressure signals from downstream subscribers are split into batches
* capped at the provided {@code prefetchRate} when propagated upstream, effectively
* rate limiting the upstream {@link Publisher}.
* <p>
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed.
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* amount.
* <p>
* Equivalent to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() }
*
* @param prefetchRate the limit to apply to downstream's backpressure
*
* @return a {@link Flux} limiting downstream's backpressure
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
*/
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
}

/**
* Ensure that backpressure signals from downstream subscribers are split into batches
* capped at the provided {@code highTide} first, then replenishing at the provided
* {@code lowTide}, effectively rate limiting the upstream {@link Publisher}.
* <p>
* Typically used for scenarios where consumer(s) request a large amount of data
* (eg. {@code Long.MAX_VALUE}) but the data source behaves better or can be optimized
* with smaller requests (eg. database paging, etc...). All data is still processed,
* unlike with {@link #limitRequest(long)} which will cap the grand total request
* amount.
* <p>
* Similar to {@code flux.publishOn(Schedulers.immediate(), prefetchRate).subscribe() },
* except with a customized "low tide" instead of the default 75%.
* Note that the smaller the lowTide is, the higher the potential for concurrency
* between request and data production. And thus the more extraneous replenishment
* requests this operator could make. For example, for a global downstream
* request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform
* 7 low tide requests, whereas with the default lowTide of 8 it would only perform one.
*
* @param highTide the initial request amount
* @param lowTide the subsequent (or replenishing) request amount
*
* @return a {@link Flux} limiting downstream's backpressure and customizing the
* replenishment request amount
* @see #publishOn(Scheduler, int)
* @see #limitRequest(long)
*/
public final Flux<T> limitRate(int highTide, int lowTide) {
return onAssembly(this.publishOn(Schedulers.immediate(), true, highTide, lowTide));
}

/**
* Ensure that the total amount requested upstream is capped at {@code cap}.
* Backpressure signals from downstream subscribers are smaller than the cap are
* propagated as is, but if they would cause the total requested amount to go over the
* cap, they are reduced to the minimum value that doesn't go over.
* <p>
* As a result, this operator never let the upstream produce more elements than the
* cap, and it can be used as a stricter form of {@link #take(long)}. Typically useful
* for cases where a race between request and cancellation can lead the upstream to
* producing a lot of extraneous data, and such a production is undesirable (e.g.
* a source that would send the extraneous data over the network).
*
* @param requestCap the global backpressure limit to apply to the sum of downstream's requests
*
* @return a {@link Flux} that requests AT MOST {@code cap} from upstream in total.
* @see #limitRate(int)
* @see #take(long)
*/
public final Flux<T> limitRequest(long requestCap) {
return onAssembly(new FluxLimitRequest<>(this, requestCap));
}

/**
* Observe all Reactive Streams signals and trace them using {@link Logger} support.
* Default will use {@link Level#INFO} and {@code java.util.logging}.
Expand Down Expand Up @@ -5426,6 +5482,10 @@ public final Flux<T> publishOn(Scheduler scheduler, int prefetch) {
* @return a {@link Flux} producing asynchronously
*/
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch) {
return publishOn(scheduler, delayError, prefetch, prefetch);
}

final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
if (this instanceof Callable) {
if (this instanceof Fuseable.ScalarCallable) {
@SuppressWarnings("unchecked")
Expand All @@ -5442,7 +5502,7 @@ public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int pref
return onAssembly(new FluxSubscribeOnCallable<>(c, scheduler));
}

return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, Queues.get(prefetch)));
return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
}

/**
Expand Down Expand Up @@ -6688,12 +6748,20 @@ public final Flux<T> tag(String key, String value) {
* <p>
* If N is zero, the resulting {@link Flux} completes as soon as this {@link Flux}
* signals its first value (which is not not relayed, though).
* <p>
* Note that this operator doesn't manipulate the backpressure requested amount.
* Rather, it merely lets requests from downstream propagate as is and cancels once
* N elements have been emitted. As a result, the source could produce a lot of
* extraneous elements in the meantime. If that behavior is undesirable and you do
* not own the request from downstream (e.g. prefetching operators), consider
* using {@link #limitRequest(long)} instead.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/take0.png" alt="">
* @param n the number of items to emit from this {@link Flux}
*
* @return a {@link Flux} limited to size N
* @see #limitRequest(long)
*/
public final Flux<T> take(long n) {
if (this instanceof Fuseable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

/**
* @author Simon Baslé
* @author David Karnok
*/
final class FluxLimitRequest<T> extends FluxOperator<T, T> {

final long cap;

FluxLimitRequest(Flux<T> flux, long cap) {
super(flux);
this.cap = cap;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new FluxLimitRequestSubscriber<>(actual, this.cap));
}

@Override
public int getPrefetch() {
return 0;
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return cap;

//FluxOperator defines PREFETCH and PARENT
return super.scanUnsafe(key);
}

static class FluxLimitRequestSubscriber<T> implements InnerOperator<T, T> {

final CoreSubscriber<? super T> actual;

Subscription parent;
long toProduce;

volatile long requestRemaining;
static final AtomicLongFieldUpdater<FluxLimitRequestSubscriber> REQUEST_REMAINING =
AtomicLongFieldUpdater.newUpdater(FluxLimitRequestSubscriber.class, "requestRemaining");


FluxLimitRequestSubscriber(CoreSubscriber<? super T> actual, long cap) {
this.actual = actual;
this.toProduce = cap;
this.requestRemaining = cap;
}

@Override
public CoreSubscriber<? super T> actual() {
return this.actual;
}

@Override
public void onNext(T t) {
long r = toProduce;
if (r > 0L) {
toProduce = --r;
actual.onNext(t);

if (r == 0) {
parent.cancel();
actual.onComplete();
}
}
}

@Override
public void onError(Throwable throwable) {
if (toProduce != 0L) {
toProduce = 0L;
actual.onError(throwable);
}
}

@Override
public void onComplete() {
if (toProduce != 0L) {
toProduce = 0L;
actual.onComplete();
}
}

@Override
public void onSubscribe(Subscription s) {
parent = s;
actual.onSubscribe(this);
}

@Override
public void request(long l) {
for (;;) {
long r = requestRemaining;
long newRequest;
if (r <= l) {
newRequest = r;
} else {
newRequest = l;
}
long u = r - newRequest;
if (REQUEST_REMAINING.compareAndSet(this, r, u)) {
if (newRequest != 0) {
parent.request(newRequest);
}
break;
}
}
}

@Override
public void cancel() {
parent.cancel();
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return parent;
if (key == Attr.TERMINATED) return toProduce == 0L;

//InnerOperator defines ACTUAL
return InnerOperator.super.scanUnsafe(key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ final class FluxPublishOn<T> extends FluxOperator<T, T> implements Fuseable {

final int prefetch;

final int lowTide;

FluxPublishOn(Flux<? extends T> source,
Scheduler scheduler,
boolean delayError,
int prefetch,
int lowTide,
Supplier<? extends Queue<T>> queueSupplier) {
super(source);
if (prefetch <= 0) {
Expand All @@ -61,6 +64,7 @@ final class FluxPublishOn<T> extends FluxOperator<T, T> implements Fuseable {
this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
this.delayError = delayError;
this.prefetch = prefetch;
this.lowTide = lowTide;
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
}

Expand Down Expand Up @@ -90,6 +94,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
worker,
delayError,
prefetch,
lowTide,
queueSupplier));
return;
}
Expand All @@ -98,6 +103,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
worker,
delayError,
prefetch,
lowTide,
queueSupplier));
}

Expand Down Expand Up @@ -149,14 +155,15 @@ static final class PublishOnSubscriber<T>
Worker worker,
boolean delayError,
int prefetch,
int lowTide,
Supplier<? extends Queue<T>> queueSupplier) {
this.actual = actual;
this.worker = worker;
this.scheduler = scheduler;
this.delayError = delayError;
this.prefetch = prefetch;
this.queueSupplier = queueSupplier;
this.limit = Operators.unboundedOrLimit(prefetch);
this.limit = Operators.unboundedOrLimit(prefetch, lowTide);
}

@Override
Expand Down Expand Up @@ -616,14 +623,15 @@ static final class PublishOnConditionalSubscriber<T>
Worker worker,
boolean delayError,
int prefetch,
int lowTide,
Supplier<? extends Queue<T>> queueSupplier) {
this.actual = actual;
this.worker = worker;
this.scheduler = scheduler;
this.delayError = delayError;
this.prefetch = prefetch;
this.queueSupplier = queueSupplier;
this.limit = Operators.unboundedOrLimit(prefetch);
this.limit = Operators.unboundedOrLimit(prefetch, lowTide);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,13 @@ static int unboundedOrLimit(int prefetch) {
return prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : (prefetch - (prefetch >> 2));
}

static int unboundedOrLimit(int prefetch, int lowTide) {
if (lowTide >= prefetch) {
return unboundedOrLimit(prefetch);
}
return prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : lowTide;
}

Operators() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ public void subscribe(CoreSubscriber<? super T>[] subscribers) {
parents[i] = new FluxPublishOn.PublishOnConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<T>)subscribers[i],
scheduler, w, true,
prefetch, queueSupplier);
prefetch, prefetch, queueSupplier);
}
else {
parents[i] = new FluxPublishOn.PublishOnSubscriber<>(subscribers[i],
scheduler, w, true,
prefetch, queueSupplier);
prefetch, prefetch, queueSupplier);
}
}

Expand Down
Loading

0 comments on commit 98be36f

Please sign in to comment.