diff --git a/reactor-core/src/main/java/reactor/core/publisher/BlockingIterable.java b/reactor-core/src/main/java/reactor/core/publisher/BlockingIterable.java index 8d27831024..a5146e0fd8 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/BlockingIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/BlockingIterable.java @@ -49,12 +49,12 @@ final class BlockingIterable implements Iterable, Scannable { final Publisher source; - final long batchSize; + final int batchSize; final Supplier> queueSupplier; BlockingIterable(Publisher source, - long batchSize, + int batchSize, Supplier> queueSupplier) { if (batchSize <= 0) { throw new IllegalArgumentException("batchSize > 0 required but it was " + batchSize); @@ -67,7 +67,7 @@ final class BlockingIterable implements Iterable, Scannable { @Override @Nullable public Object scanUnsafe(Attr key) { - if (key == Attr.PREFETCH) return (int) Math.min(Integer.MAX_VALUE, batchSize); //FIXME should batchSize be forced to int? + if (key == Attr.PREFETCH) return Math.min(Integer.MAX_VALUE, batchSize); //FIXME should batchSize be forced to int? if (key == Attr.PARENT) return source; return null; @@ -120,9 +120,9 @@ static final class SubscriberIterator final Queue queue; - final long batchSize; + final int batchSize; - final long limit; + final int limit; final Lock lock; @@ -140,10 +140,10 @@ static final class SubscriberIterator volatile boolean done; Throwable error; - SubscriberIterator(Queue queue, long batchSize) { + SubscriberIterator(Queue queue, int batchSize) { this.queue = queue; this.batchSize = batchSize; - this.limit = batchSize - (batchSize >> 2); + this.limit = Operators.unboundedOrLimit(batchSize); this.lock = new ReentrantLock(); this.condition = lock.newCondition(); } @@ -211,7 +211,7 @@ public T next() { @Override public void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { - s.request(batchSize); + s.request(Operators.unboundedOrPrefetch(batchSize)); } } @@ -264,7 +264,7 @@ public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED) return done; if (key == Attr.PARENT) return s; if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription(); - if (key == Attr.PREFETCH) return (int) Math.min(Integer.MAX_VALUE, batchSize); //FIXME should batchSize be typed int? + if (key == Attr.PREFETCH) return batchSize; if (key == Attr.ERROR) return error; return null; diff --git a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java index 6378044240..56c45d5e50 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java @@ -213,14 +213,14 @@ public void onSubscribe(final Subscription s) { else if (m == Fuseable.ASYNC) { sourceMode = m; queue = f; - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } } queue = Queues.get(prefetch).get(); - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 6c1a9609f1..540c088249 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7091,7 +7091,7 @@ public final Iterable toIterable() { * * @return a blocking {@link Iterable} */ - public final Iterable toIterable(long batchSize) { + public final Iterable toIterable(int batchSize) { return toIterable(batchSize, null); } @@ -7109,10 +7109,11 @@ public final Iterable toIterable(long batchSize) { * * @return a blocking {@link Iterable} */ - public final Iterable toIterable(long batchSize, @Nullable Supplier> queueProvider) { + public final Iterable toIterable(int batchSize, @Nullable Supplier> + queueProvider) { final Supplier> provider; if(queueProvider == null){ - provider = Queues.get((int)Math.min(Integer.MAX_VALUE, batchSize)); + provider = Queues.get(batchSize); } else{ provider = queueProvider; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxCombineLatest.java b/reactor-core/src/main/java/reactor/core/publisher/FluxCombineLatest.java index 984aeaeab6..84fa7658c2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxCombineLatest.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxCombineLatest.java @@ -54,41 +54,39 @@ final class FluxCombineLatest extends Flux implements Fuseable { final Supplier> queueSupplier; - final int bufferSize; + final int prefetch; FluxCombineLatest(Publisher[] array, Function combiner, - Supplier> queueSupplier, - int bufferSize) { - if (bufferSize <= 0) { - throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); + Supplier> queueSupplier, int prefetch) { + if (prefetch <= 0) { + throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch); } this.array = Objects.requireNonNull(array, "array"); this.iterable = null; this.combiner = Objects.requireNonNull(combiner, "combiner"); this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); - this.bufferSize = bufferSize; + this.prefetch = prefetch; } FluxCombineLatest(Iterable> iterable, Function combiner, - Supplier> queueSupplier, - int bufferSize) { - if (bufferSize <= 0) { - throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); + Supplier> queueSupplier, int prefetch) { + if (prefetch < 0) { + throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch); } this.array = null; this.iterable = Objects.requireNonNull(iterable, "iterable"); this.combiner = Objects.requireNonNull(combiner, "combiner"); this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); - this.bufferSize = bufferSize; + this.prefetch = prefetch; } @Override public int getPrefetch() { - return bufferSize; + return prefetch; } @SuppressWarnings("unchecked") @@ -171,7 +169,7 @@ public void subscribe(CoreSubscriber actual) { Queue queue = queueSupplier.get(); CombineLatestCoordinator coordinator = - new CombineLatestCoordinator<>(actual, combiner, n, queue, bufferSize); + new CombineLatestCoordinator<>(actual, combiner, n, queue, prefetch); actual.onSubscribe(coordinator); @@ -222,14 +220,13 @@ static final class CombineLatestCoordinator CombineLatestCoordinator(CoreSubscriber actual, Function combiner, int n, - Queue queue, - int bufferSize) { + Queue queue, int prefetch) { this.actual = actual; this.combiner = combiner; @SuppressWarnings("unchecked") CombineLatestInner[] a = new CombineLatestInner[n]; for (int i = 0; i < n; i++) { - a[i] = new CombineLatestInner<>(this, i, bufferSize); + a[i] = new CombineLatestInner<>(this, i, prefetch); } this.subscribers = a; this.latest = new Object[n]; @@ -578,7 +575,7 @@ static final class CombineLatestInner this.parent = parent; this.index = index; this.prefetch = prefetch; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -589,7 +586,7 @@ public Context currentContext() { @Override public void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } @@ -613,7 +610,6 @@ public void cancel() { } void requestOne() { - int p = produced + 1; if (p == limit) { produced = 0; @@ -622,7 +618,6 @@ void requestOne() { else { produced = p; } - } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java index 4a7261711e..5a8d739bdf 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java @@ -174,7 +174,7 @@ static final class ConcatMapImmediate this.mapper = mapper; this.queueSupplier = queueSupplier; this.prefetch = prefetch; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); this.inner = new ConcatMapInner<>(this); } @@ -224,7 +224,7 @@ else if (m == Fuseable.ASYNC) { actual.onSubscribe(this); - s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } @@ -505,7 +505,7 @@ static final class ConcatMapDelayed this.mapper = mapper; this.queueSupplier = queueSupplier; this.prefetch = prefetch; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); this.veryEnd = veryEnd; this.inner = new ConcatMapInner<>(this); } @@ -564,7 +564,7 @@ else if (m == Fuseable.ASYNC) { actual.onSubscribe(this); - s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java index 54ae614ec7..d826e514db 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java @@ -198,7 +198,7 @@ void drain() { } int missed = 1; - int limit = bufferSize - (bufferSize >> 2); + int limit = Operators.unboundedOrLimit(bufferSize); long e = emitted; long ci = consumerIndex; int f = consumed; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java index 25e999beb6..473319b96e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java @@ -241,7 +241,7 @@ static final class FlatMapMain extends FlatMapTracker, R> this.mainQueueSupplier = mainQueueSupplier; this.prefetch = prefetch; this.innerQueueSupplier = innerQueueSupplier; - this.limit = maxConcurrency - (maxConcurrency >> 2); + this.limit = Operators.unboundedOrLimit(maxConcurrency); } @Override @@ -329,13 +329,7 @@ public void onSubscribe(Subscription s) { this.s = s; actual.onSubscribe(this); - - if (maxConcurrency == Integer.MAX_VALUE) { - s.request(Long.MAX_VALUE); - } - else { - s.request(maxConcurrency); - } + s.request(Operators.unboundedOrPrefetch(maxConcurrency)); } } @@ -883,7 +877,8 @@ static final class FlatMapInner FlatMapInner(FlatMapMain parent, int prefetch) { this.parent = parent; this.prefetch = prefetch; - this.limit = prefetch - (prefetch >> 2); +// this.limit = prefetch >> 2; + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -906,7 +901,7 @@ else if (m == Fuseable.ASYNC) { } // NONE is just fall-through as the queue will be created on demand } - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java index 4fa3f47896..e98b1c1fdf 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java @@ -165,7 +165,7 @@ static final class FlattenIterableSubscriber this.mapper = mapper; this.prefetch = prefetch; this.queueSupplier = queueSupplier; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -213,7 +213,7 @@ else if (m == Fuseable.ASYNC) { actual.onSubscribe(this); - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } } @@ -222,7 +222,7 @@ else if (m == Fuseable.ASYNC) { actual.onSubscribe(this); - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java b/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java index e5a40af589..af378674ce 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java @@ -162,12 +162,7 @@ public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; actual.onSubscribe(this); - if (prefetch == Integer.MAX_VALUE) { - s.request(Long.MAX_VALUE); - } - else { - s.request(prefetch); - } + s.request(Operators.unboundedOrPrefetch(prefetch)); } } @@ -523,7 +518,7 @@ public K key() { this.queue = queue; this.context = parent.currentContext(); this.parent = parent; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); } void doTerminate() { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java b/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java index f83808995d..b4ab67a334 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java @@ -504,7 +504,7 @@ static final class MergeSequentialInner implements InnerConsumer{ MergeSequentialInner(MergeSequentialMain parent, int prefetch) { this.parent = parent; this.prefetch = prefetch; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -543,14 +543,13 @@ public void onSubscribe(Subscription s) { if (m == Fuseable.ASYNC) { fusionMode = m; queue = qs; - //FIXME could be mutualized in DrainUtils or Operators (+ review other prefetch based operators) - s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } } queue = Queues.get(prefetch).get(); - s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java index 3c107d2578..b766c10429 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java @@ -224,17 +224,17 @@ public void onSubscribe(Subscription s) { drain(); return; } - else if (m == Fuseable.ASYNC) { + if (m == Fuseable.ASYNC) { sourceMode = m; queue = f; - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } } queue = parent.queueSupplier.get(); - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java index b233190189..21c3352de3 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java @@ -153,7 +153,7 @@ static final class FluxPublishMulticaster extends Flux FluxPublishMulticaster(int prefetch, Supplier> queueSupplier, Context ctx) { this.prefetch = prefetch; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); this.queueSupplier = queueSupplier; this.subscribers = EMPTY; this.context = ctx; @@ -226,13 +226,13 @@ public void onSubscribe(Subscription s) { return; } - else if (m == Fuseable.ASYNC) { + if (m == Fuseable.ASYNC) { sourceMode = m; queue = qs; connected = true; - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } @@ -241,7 +241,7 @@ else if (m == Fuseable.ASYNC) { queue = queueSupplier.get(); connected = true; - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java index 3459287ebe..24b492d4f8 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java @@ -156,12 +156,7 @@ static final class PublishOnSubscriber this.delayError = delayError; this.prefetch = prefetch; this.queueSupplier = queueSupplier; - if (prefetch != Integer.MAX_VALUE) { - this.limit = prefetch - (prefetch >> 2); - } - else { - this.limit = Integer.MAX_VALUE; - } + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -183,13 +178,13 @@ public void onSubscribe(Subscription s) { actual.onSubscribe(this); return; } - else if (m == Fuseable.ASYNC) { + if (m == Fuseable.ASYNC) { sourceMode = Fuseable.ASYNC; queue = f; actual.onSubscribe(this); - initialRequest(); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } @@ -199,16 +194,7 @@ else if (m == Fuseable.ASYNC) { actual.onSubscribe(this); - initialRequest(); - } - } - - void initialRequest() { - if (prefetch == Integer.MAX_VALUE) { - s.request(Long.MAX_VALUE); - } - else { - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } @@ -637,12 +623,7 @@ static final class PublishOnConditionalSubscriber this.delayError = delayError; this.prefetch = prefetch; this.queueSupplier = queueSupplier; - if (prefetch != Integer.MAX_VALUE) { - this.limit = prefetch - (prefetch >> 2); - } - else { - this.limit = Integer.MAX_VALUE; - } + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -664,13 +645,13 @@ public void onSubscribe(Subscription s) { actual.onSubscribe(this); return; } - else if (m == Fuseable.ASYNC) { + if (m == Fuseable.ASYNC) { sourceMode = Fuseable.ASYNC; queue = f; actual.onSubscribe(this); - initialRequest(); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } @@ -680,16 +661,7 @@ else if (m == Fuseable.ASYNC) { actual.onSubscribe(this); - initialRequest(); - } - } - - void initialRequest() { - if (prefetch == Integer.MAX_VALUE) { - s.request(Long.MAX_VALUE); - } - else { - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java index 58a6fd9f58..ebcc7bef76 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java @@ -50,7 +50,7 @@ final class FluxSwitchMap extends FluxOperator { final Supplier> queueSupplier; - final int bufferSize; + final int prefetch; @SuppressWarnings("ConstantConditions") static final SwitchMapInner CANCELLED_INNER = @@ -59,14 +59,15 @@ final class FluxSwitchMap extends FluxOperator { FluxSwitchMap(Flux source, Function> mapper, Supplier> queueSupplier, - int bufferSize) { + int prefetch) { super(source); - if (bufferSize <= 0) { - throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); + if (prefetch <= 0) { + throw new IllegalArgumentException("prefetch > 0 required but it was " + + prefetch); } this.mapper = Objects.requireNonNull(mapper, "mapper"); this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); - this.bufferSize = bufferSize; + this.prefetch = prefetch; } @Override @@ -82,7 +83,7 @@ public void subscribe(CoreSubscriber actual) { source.subscribe(new SwitchMapMain(actual, mapper, - queueSupplier.get(), bufferSize)); + queueSupplier.get(), prefetch)); } static final class SwitchMapMain implements InnerOperator { @@ -91,7 +92,7 @@ static final class SwitchMapMain implements InnerOperator { final Queue queue; final BiPredicate queueBiAtomic; - final int bufferSize; + final int prefetch; final CoreSubscriber actual; Subscription s; @@ -144,11 +145,11 @@ static final class SwitchMapMain implements InnerOperator { SwitchMapMain(CoreSubscriber actual, Function> mapper, Queue queue, - int bufferSize) { + int prefetch) { this.actual = actual; this.mapper = mapper; this.queue = queue; - this.bufferSize = bufferSize; + this.prefetch = prefetch; this.active = 1; if(queue instanceof BiPredicate){ this.queueBiAtomic = (BiPredicate) queue; @@ -170,7 +171,7 @@ public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return s; if (key == Attr.TERMINATED) return done; if (key == Attr.ERROR) return error; - if (key == Attr.PREFETCH) return bufferSize; + if (key == Attr.PREFETCH) return prefetch; if (key == Attr.BUFFERED) return queue.size(); if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; @@ -220,7 +221,7 @@ public void onNext(T t) { } SwitchMapInner innerSubscriber = - new SwitchMapInner<>(this, bufferSize, idx); + new SwitchMapInner<>(this, prefetch, idx); if (INNER.compareAndSet(this, si, innerSubscriber)) { ACTIVE.getAndIncrement(this); @@ -429,7 +430,7 @@ static final class SwitchMapInner implements InnerConsumer, Subscription { final SwitchMapMain parent; - final int bufferSize; + final int prefetch; final int limit; @@ -449,10 +450,10 @@ static final class SwitchMapInner implements InnerConsumer, Subscription { int produced; - SwitchMapInner(SwitchMapMain parent, int bufferSize, long index) { + SwitchMapInner(SwitchMapMain parent, int prefetch, long index) { this.parent = parent; - this.bufferSize = bufferSize; - this.limit = bufferSize - (bufferSize >> 2); + this.prefetch = prefetch; + this.limit = Operators.unboundedOrLimit(prefetch); this.index = index; } @@ -467,7 +468,7 @@ public Object scanUnsafe(Attr key) { if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription(); if (key == Attr.PARENT) return s; if (key == Attr.ACTUAL) return parent; - if (key == Attr.PREFETCH) return bufferSize; + if (key == Attr.PREFETCH) return prefetch; return null; } @@ -486,7 +487,7 @@ public void onSubscribe(Subscription s) { } if (S.compareAndSet(this, null, s)) { - s.request(bufferSize); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } a = this.s; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java index 4397657c08..5f8aa56e61 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java @@ -171,12 +171,7 @@ public void onSubscribe(Subscription s) { this.s = s; actual.onSubscribe(this); if (cancelled == 0) { - if (prefetch == Integer.MAX_VALUE) { - s.request(Long.MAX_VALUE); - } - else { - s.request(prefetch); - } + s.request(Operators.unboundedOrPrefetch(prefetch)); } } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java b/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java index 37b28752e3..52dcaf7008 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java @@ -842,7 +842,7 @@ static final class ZipInner this.prefetch = prefetch; this.index = index; this.queueSupplier = queueSupplier; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); } @SuppressWarnings("unchecked") @@ -872,7 +872,7 @@ else if (m == ASYNC) { else { queue = queueSupplier.get(); } - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index e06b82b355..3e947154ce 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -547,7 +547,7 @@ public static Mono sequenceEqual(Publisher source1, Pu * the second Publisher to compare * @param isEqual * a function used to compare items emitted by each Publisher - * @param bufferSize + * @param prefetch * the number of items to prefetch from the first and second source Publisher * @param * the type of items emitted by each Publisher @@ -556,8 +556,8 @@ public static Mono sequenceEqual(Publisher source1, Pu */ public static Mono sequenceEqual(Publisher source1, Publisher source2, - BiPredicate isEqual, int bufferSize) { - return onAssembly(new MonoSequenceEqual<>(source1, source2, isEqual, bufferSize)); + BiPredicate isEqual, int prefetch) { + return onAssembly(new MonoSequenceEqual<>(source1, source2, isEqual, prefetch)); } /** diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSequenceEqual.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSequenceEqual.java index bb35ad7875..e594bc6aea 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSequenceEqual.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSequenceEqual.java @@ -37,23 +37,24 @@ final class MonoSequenceEqual extends Mono { final Publisher first; final Publisher second; final BiPredicate comparer; - final int bufferSize; + final int prefetch; MonoSequenceEqual(Publisher first, Publisher second, - BiPredicate comparer, int bufferSize) { + BiPredicate comparer, int prefetch) { this.first = Objects.requireNonNull(first, "first"); this.second = Objects.requireNonNull(second, "second"); this.comparer = Objects.requireNonNull(comparer, "comparer"); - if(bufferSize < 1){ + if(prefetch < 1){ throw new IllegalArgumentException("Buffer size must be strictly positive: " + - ""+bufferSize); + ""+ prefetch); } - this.bufferSize = bufferSize; + this.prefetch = prefetch; } @Override public void subscribe(CoreSubscriber actual) { - EqualCoordinator ec = new EqualCoordinator<>(actual, bufferSize, first, second, comparer); + EqualCoordinator ec = new EqualCoordinator<>(actual, + prefetch, first, second, comparer); actual.onSubscribe(ec); ec.subscribe(); } @@ -82,15 +83,15 @@ static final class EqualCoordinator implements InnerProducer { static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(EqualCoordinator.class, "wip"); - EqualCoordinator(CoreSubscriber actual, int bufferSize, + EqualCoordinator(CoreSubscriber actual, int prefetch, Publisher first, Publisher second, BiPredicate comparer) { this.actual = actual; this.first = first; this.second = second; this.comparer = comparer; - firstSubscriber = new EqualSubscriber<>(this, bufferSize); - secondSubscriber = new EqualSubscriber<>(this, bufferSize); + firstSubscriber = new EqualSubscriber<>(this, prefetch); + secondSubscriber = new EqualSubscriber<>(this, prefetch); } @Override @@ -281,7 +282,7 @@ static final class EqualSubscriber implements InnerConsumer { final EqualCoordinator parent; final Queue queue; - final int bufferSize; + final int prefetch; volatile boolean done; Throwable error; @@ -292,10 +293,10 @@ static final class EqualSubscriber AtomicReferenceFieldUpdater.newUpdater(EqualSubscriber.class, Subscription.class, "subscription"); - EqualSubscriber(EqualCoordinator parent, int bufferSize) { + EqualSubscriber(EqualCoordinator parent, int prefetch) { this.parent = parent; - this.bufferSize = bufferSize; - this.queue = Queues.get(bufferSize).get(); + this.prefetch = prefetch; + this.queue = Queues.get(prefetch).get(); } @Override @@ -311,7 +312,7 @@ public Object scanUnsafe(Attr key) { if (key == Attr.ERROR) return error; if (key == Attr.CANCELLED) return subscription == Operators.cancelledSubscription(); if (key == Attr.PARENT) return subscription; - if (key == Attr.PREFETCH) return bufferSize; + if (key == Attr.PREFETCH) return prefetch; if (key == Attr.BUFFERED) return queue.size(); return null; @@ -321,7 +322,7 @@ public Object scanUnsafe(Attr key) { public void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { this.cachedSubscription = s; - s.request(bufferSize); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 6c170dafde..1e345d75a9 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -785,6 +785,14 @@ static void onNextDroppedMulticast(T t) { onNextDropped(t, Context.empty()); } + static long unboundedOrPrefetch(int prefetch) { + return prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch; + } + + static int unboundedOrLimit(int prefetch) { + return prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : (prefetch - (prefetch >> 2)); + } + Operators() { } @@ -813,8 +821,8 @@ public void onComplete() { log.error("Unexpected call to Operators.emptySubscriber()", e); } }; - // + final static class CancelledSubscription implements Subscription, Scannable { static final CancelledSubscription INSTANCE = new CancelledSubscription(); diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelMergeSequential.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelMergeSequential.java index 1a2c4888f6..c803cadb44 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelMergeSequential.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelMergeSequential.java @@ -179,7 +179,7 @@ void onNext(MergeSequentialInner inner, T value) { if (requested != Long.MAX_VALUE) { REQUESTED.decrementAndGet(this); } - inner.request(1); + inner.requestOne(); } else { Queue q = inner.getQueue(queueSupplier); @@ -364,7 +364,7 @@ static final class MergeSequentialInner implements InnerConsumer { MergeSequentialInner(MergeSequentialMain parent, int prefetch) { this.parent = parent; this.prefetch = prefetch ; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); } @Override @@ -388,7 +388,7 @@ public Context currentContext() { @Override public void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } @@ -417,16 +417,6 @@ void requestOne() { } } - public void request(long n) { - long p = produced + n; - if (p >= limit) { - produced = 0; - s.request(p); - } else { - produced = p; - } - } - public void cancel() { Operators.terminate(S, this); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelSource.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelSource.java index 1c57285e06..7eecd82c79 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelSource.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelSource.java @@ -136,7 +136,7 @@ static final class ParallelSourceMain implements InnerConsumer { this.subscribers = subscribers; this.prefetch = prefetch; this.queueSupplier = queueSupplier; - this.limit = prefetch - (prefetch >> 2); + this.limit = Operators.unboundedOrLimit(prefetch); this.requests = new AtomicLongArray(subscribers.length); this.emissions = new long[subscribers.length]; } @@ -189,7 +189,7 @@ public void onSubscribe(Subscription s) { setupSubscribers(); - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); return; } @@ -199,7 +199,7 @@ public void onSubscribe(Subscription s) { setupSubscribers(); - s.request(prefetch); + s.request(Operators.unboundedOrPrefetch(prefetch)); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/BlockingIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/BlockingIterableTest.java index 11a5c1294a..3456ce5fcd 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/BlockingIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/BlockingIterableTest.java @@ -149,10 +149,10 @@ public void scanOperator() { public void scanOperatorLargePrefetchIsLimitedToIntMax() { Flux source = Flux.range(1, 10); BlockingIterable test = new BlockingIterable<>(source, - Integer.MAX_VALUE + 30L, + Integer.MAX_VALUE, Queues.one()); - assertThat(test.scan(Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); //FIXME + assertThat(test.scan(Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); } @Test @@ -175,7 +175,7 @@ public void scanSubscriber() { public void scanSubscriberLargePrefetchIsLimitedToIntMax() { BlockingIterable.SubscriberIterator subscriberIterator = new BlockingIterable.SubscriberIterator<>(Queues.one().get(), - Integer.MAX_VALUE + 30L); + Integer.MAX_VALUE); assertThat(subscriberIterator.scan(Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); //FIXME } diff --git a/reactor-core/src/test/java/reactor/test/publisher/BaseOperatorTest.java b/reactor-core/src/test/java/reactor/test/publisher/BaseOperatorTest.java index 1be8b3489c..5031133e85 100644 --- a/reactor-core/src/test/java/reactor/test/publisher/BaseOperatorTest.java +++ b/reactor-core/src/test/java/reactor/test/publisher/BaseOperatorTest.java @@ -328,16 +328,6 @@ protected final RuntimeException exception() { return defaultScenario.producerError; } - final int defaultLimit(OperatorScenario scenario) { - if (scenario.prefetch() == -1) { - return Queues.SMALL_BUFFER_SIZE - (Queues.SMALL_BUFFER_SIZE >> 2); - } - if (scenario.prefetch() == Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } - return scenario.prefetch() - (scenario.prefetch() >> 2); - } - protected OperatorScenario defaultScenarioOptions(OperatorScenario defaultOptions) { return defaultOptions; }