Skip to content

Commit

Permalink
Review int.max prefetch
Browse files Browse the repository at this point in the history
- fix toIterable signature to int
- support long.max unbounded on prefetch=int.max
  • Loading branch information
smaldini authored Sep 21, 2017
1 parent d43627f commit f3e13bf
Show file tree
Hide file tree
Showing 23 changed files with 121 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ final class BlockingIterable<T> implements Iterable<T>, Scannable {

final Publisher<? extends T> source;

final long batchSize;
final int batchSize;

final Supplier<Queue<T>> queueSupplier;

BlockingIterable(Publisher<? extends T> source,
long batchSize,
int batchSize,
Supplier<Queue<T>> queueSupplier) {
if (batchSize <= 0) {
throw new IllegalArgumentException("batchSize > 0 required but it was " + batchSize);
Expand All @@ -67,7 +67,7 @@ final class BlockingIterable<T> implements Iterable<T>, Scannable {
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return (int) Math.min(Integer.MAX_VALUE, batchSize); //FIXME should batchSize be forced to int?
if (key == Attr.PREFETCH) return Math.min(Integer.MAX_VALUE, batchSize); //FIXME should batchSize be forced to int?
if (key == Attr.PARENT) return source;

return null;
Expand Down Expand Up @@ -120,9 +120,9 @@ static final class SubscriberIterator<T>

final Queue<T> queue;

final long batchSize;
final int batchSize;

final long limit;
final int limit;

final Lock lock;

Expand All @@ -140,10 +140,10 @@ static final class SubscriberIterator<T>
volatile boolean done;
Throwable error;

SubscriberIterator(Queue<T> queue, long batchSize) {
SubscriberIterator(Queue<T> queue, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
this.limit = batchSize - (batchSize >> 2);
this.limit = Operators.unboundedOrLimit(batchSize);
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
Expand Down Expand Up @@ -211,7 +211,7 @@ public T next() {
@Override
public void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
s.request(batchSize);
s.request(Operators.unboundedOrPrefetch(batchSize));
}
}

Expand Down Expand Up @@ -264,7 +264,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;
if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
if (key == Attr.PREFETCH) return (int) Math.min(Integer.MAX_VALUE, batchSize); //FIXME should batchSize be typed int?
if (key == Attr.PREFETCH) return batchSize;
if (key == Attr.ERROR) return error;

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ public void onSubscribe(final Subscription s) {
else if (m == Fuseable.ASYNC) {
sourceMode = m;
queue = f;
s.request(prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
return;
}
}

queue = Queues.<T>get(prefetch).get();

s.request(prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down
7 changes: 4 additions & 3 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7091,7 +7091,7 @@ public final Iterable<T> toIterable() {
*
* @return a blocking {@link Iterable}
*/
public final Iterable<T> toIterable(long batchSize) {
public final Iterable<T> toIterable(int batchSize) {
return toIterable(batchSize, null);
}

Expand All @@ -7109,10 +7109,11 @@ public final Iterable<T> toIterable(long batchSize) {
*
* @return a blocking {@link Iterable}
*/
public final Iterable<T> toIterable(long batchSize, @Nullable Supplier<Queue<T>> queueProvider) {
public final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>>
queueProvider) {
final Supplier<Queue<T>> provider;
if(queueProvider == null){
provider = Queues.get((int)Math.min(Integer.MAX_VALUE, batchSize));
provider = Queues.get(batchSize);
}
else{
provider = queueProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,41 +54,39 @@ final class FluxCombineLatest<T, R> extends Flux<R> implements Fuseable {

final Supplier<? extends Queue<SourceAndArray>> queueSupplier;

final int bufferSize;
final int prefetch;

FluxCombineLatest(Publisher<? extends T>[] array,
Function<Object[], R> combiner,
Supplier<? extends Queue<SourceAndArray>> queueSupplier,
int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
Supplier<? extends Queue<SourceAndArray>> queueSupplier, int prefetch) {
if (prefetch <= 0) {
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
}

this.array = Objects.requireNonNull(array, "array");
this.iterable = null;
this.combiner = Objects.requireNonNull(combiner, "combiner");
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
this.bufferSize = bufferSize;
this.prefetch = prefetch;
}

FluxCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
Function<Object[], R> combiner,
Supplier<? extends Queue<SourceAndArray>> queueSupplier,
int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
Supplier<? extends Queue<SourceAndArray>> queueSupplier, int prefetch) {
if (prefetch < 0) {
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
}

this.array = null;
this.iterable = Objects.requireNonNull(iterable, "iterable");
this.combiner = Objects.requireNonNull(combiner, "combiner");
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
this.bufferSize = bufferSize;
this.prefetch = prefetch;
}

@Override
public int getPrefetch() {
return bufferSize;
return prefetch;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -171,7 +169,7 @@ public void subscribe(CoreSubscriber<? super R> actual) {
Queue<SourceAndArray> queue = queueSupplier.get();

CombineLatestCoordinator<T, R> coordinator =
new CombineLatestCoordinator<>(actual, combiner, n, queue, bufferSize);
new CombineLatestCoordinator<>(actual, combiner, n, queue, prefetch);

actual.onSubscribe(coordinator);

Expand Down Expand Up @@ -222,14 +220,13 @@ static final class CombineLatestCoordinator<T, R>
CombineLatestCoordinator(CoreSubscriber<? super R> actual,
Function<Object[], R> combiner,
int n,
Queue<SourceAndArray> queue,
int bufferSize) {
Queue<SourceAndArray> queue, int prefetch) {
this.actual = actual;
this.combiner = combiner;
@SuppressWarnings("unchecked") CombineLatestInner<T>[] a =
new CombineLatestInner[n];
for (int i = 0; i < n; i++) {
a[i] = new CombineLatestInner<>(this, i, bufferSize);
a[i] = new CombineLatestInner<>(this, i, prefetch);
}
this.subscribers = a;
this.latest = new Object[n];
Expand Down Expand Up @@ -578,7 +575,7 @@ static final class CombineLatestInner<T>
this.parent = parent;
this.index = index;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);
this.limit = Operators.unboundedOrLimit(prefetch);
}

@Override
Expand All @@ -589,7 +586,7 @@ public Context currentContext() {
@Override
public void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
s.request(prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand All @@ -613,7 +610,6 @@ public void cancel() {
}

void requestOne() {

int p = produced + 1;
if (p == limit) {
produced = 0;
Expand All @@ -622,7 +618,6 @@ void requestOne() {
else {
produced = p;
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ static final class ConcatMapImmediate<T, R>
this.mapper = mapper;
this.queueSupplier = queueSupplier;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);
this.limit = Operators.unboundedOrLimit(prefetch);
this.inner = new ConcatMapInner<>(this);
}

Expand Down Expand Up @@ -224,7 +224,7 @@ else if (m == Fuseable.ASYNC) {

actual.onSubscribe(this);

s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down Expand Up @@ -505,7 +505,7 @@ static final class ConcatMapDelayed<T, R>
this.mapper = mapper;
this.queueSupplier = queueSupplier;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);
this.limit = Operators.unboundedOrLimit(prefetch);
this.veryEnd = veryEnd;
this.inner = new ConcatMapInner<>(this);
}
Expand Down Expand Up @@ -564,7 +564,7 @@ else if (m == Fuseable.ASYNC) {

actual.onSubscribe(this);

s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void drain() {
}

int missed = 1;
int limit = bufferSize - (bufferSize >> 2);
int limit = Operators.unboundedOrLimit(bufferSize);
long e = emitted;
long ci = consumerIndex;
int f = consumed;
Expand Down
15 changes: 5 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ static final class FlatMapMain<T, R> extends FlatMapTracker<FlatMapInner<R>, R>
this.mainQueueSupplier = mainQueueSupplier;
this.prefetch = prefetch;
this.innerQueueSupplier = innerQueueSupplier;
this.limit = maxConcurrency - (maxConcurrency >> 2);
this.limit = Operators.unboundedOrLimit(maxConcurrency);
}

@Override
Expand Down Expand Up @@ -329,13 +329,7 @@ public void onSubscribe(Subscription s) {
this.s = s;

actual.onSubscribe(this);

if (maxConcurrency == Integer.MAX_VALUE) {
s.request(Long.MAX_VALUE);
}
else {
s.request(maxConcurrency);
}
s.request(Operators.unboundedOrPrefetch(maxConcurrency));
}
}

Expand Down Expand Up @@ -883,7 +877,8 @@ static final class FlatMapInner<R>
FlatMapInner(FlatMapMain<?, R> parent, int prefetch) {
this.parent = parent;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);
// this.limit = prefetch >> 2;
this.limit = Operators.unboundedOrLimit(prefetch);
}

@Override
Expand All @@ -906,7 +901,7 @@ else if (m == Fuseable.ASYNC) {
}
// NONE is just fall-through as the queue will be created on demand
}
s.request(prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static final class FlattenIterableSubscriber<T, R>
this.mapper = mapper;
this.prefetch = prefetch;
this.queueSupplier = queueSupplier;
this.limit = prefetch - (prefetch >> 2);
this.limit = Operators.unboundedOrLimit(prefetch);
}

@Override
Expand Down Expand Up @@ -213,7 +213,7 @@ else if (m == Fuseable.ASYNC) {

actual.onSubscribe(this);

s.request(prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
return;
}
}
Expand All @@ -222,7 +222,7 @@ else if (m == Fuseable.ASYNC) {

actual.onSubscribe(this);

s.request(prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,7 @@ public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
if (prefetch == Integer.MAX_VALUE) {
s.request(Long.MAX_VALUE);
}
else {
s.request(prefetch);
}
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down Expand Up @@ -523,7 +518,7 @@ public K key() {
this.queue = queue;
this.context = parent.currentContext();
this.parent = parent;
this.limit = prefetch - (prefetch >> 2);
this.limit = Operators.unboundedOrLimit(prefetch);
}

void doTerminate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ static final class MergeSequentialInner<R> implements InnerConsumer<R>{
MergeSequentialInner(MergeSequentialMain<?, R> parent, int prefetch) {
this.parent = parent;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);
this.limit = Operators.unboundedOrLimit(prefetch);
}

@Override
Expand Down Expand Up @@ -543,14 +543,13 @@ public void onSubscribe(Subscription s) {
if (m == Fuseable.ASYNC) {
fusionMode = m;
queue = qs;
//FIXME could be mutualized in DrainUtils or Operators (+ review other prefetch based operators)
s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
return;
}
}

queue = Queues.<R>get(prefetch).get();
s.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch);
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}

Expand Down
Loading

0 comments on commit f3e13bf

Please sign in to comment.