Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #730 Use Flux instead of GroupedFlux for FluxWindowPredicate #759

Merged
merged 2 commits into from
Jul 27, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7144,10 +7144,10 @@ public final Flux<Flux<T>> windowTimeout(int maxSize, Duration timespan, Schedul
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M3/src/docs/marble/windowuntil.png" alt="">
*
* @param boundaryTrigger a predicate that triggers the next window when it becomes true.
* @return a {@link Flux} of {@link GroupedFlux} windows, bounded depending
* on the predicate and keyed with the value that triggered the new window.
* @return a {@link Flux} of {@link Flux} windows, bounded depending
* on the predicate.
*/
public final Flux<GroupedFlux<T, T>> windowUntil(Predicate<T> boundaryTrigger) {
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger) {
return windowUntil(boundaryTrigger, false);
}

Expand All @@ -7169,10 +7169,10 @@ public final Flux<GroupedFlux<T, T>> windowUntil(Predicate<T> boundaryTrigger) {
*
* @param boundaryTrigger a predicate that triggers the next window when it becomes true.
* @param cutBefore push to true to include the triggering element in the new window rather than the old.
* @return a {@link Flux} of {@link GroupedFlux} windows, bounded depending
* on the predicate and keyed with the value that triggered the new window.
* @return a {@link Flux} of {@link Flux} windows, bounded depending
* on the predicate.
*/
public final Flux<GroupedFlux<T, T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore) {
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore) {
return windowUntil(boundaryTrigger, cutBefore, Queues.SMALL_BUFFER_SIZE);
}

Expand All @@ -7196,10 +7196,10 @@ public final Flux<GroupedFlux<T, T>> windowUntil(Predicate<T> boundaryTrigger, b
* @param boundaryTrigger a predicate that triggers the next window when it becomes true.
* @param cutBefore push to true to include the triggering element in the new window rather than the old.
* @param prefetch the request size to use for this {@link Flux}.
* @return a {@link Flux} of {@link GroupedFlux} windows, bounded depending
* on the predicate and keyed with the value that triggered the new window.
* @return a {@link Flux} of {@link Flux} windows, bounded depending
* on the predicate.
*/
public final Flux<GroupedFlux<T, T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch) {
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch) {
return onAssembly(new FluxWindowPredicate<>(this,
Queues.unbounded(prefetch),
Queues.unbounded(prefetch),
Expand All @@ -7220,10 +7220,10 @@ public final Flux<GroupedFlux<T, T>> windowUntil(Predicate<T> boundaryTrigger, b
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.M3/src/docs/marble/windowwhile.png" alt="">
*
* @param inclusionPredicate a predicate that triggers the next window when it becomes false.
* @return a {@link Flux} of {@link GroupedFlux} windows, each containing
* subsequent elements that all passed a predicate, and keyed with a separator element.
* @return a {@link Flux} of {@link Flux} windows, each containing
* subsequent elements that all passed a predicate.
*/
public final Flux<GroupedFlux<T, T>> windowWhile(Predicate<T> inclusionPredicate) {
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate) {
return windowWhile(inclusionPredicate, Queues.SMALL_BUFFER_SIZE);
}

Expand All @@ -7240,10 +7240,10 @@ public final Flux<GroupedFlux<T, T>> windowWhile(Predicate<T> inclusionPredicate
*
* @param inclusionPredicate a predicate that triggers the next window when it becomes false.
* @param prefetch the request size to use for this {@link Flux}.
* @return a {@link Flux} of {@link GroupedFlux} windows, each containing
* subsequent elements that all passed a predicate, and keyed with a separator element.
* @return a {@link Flux} of {@link Flux} windows, each containing
* subsequent elements that all passed a predicate.
*/
public final Flux<GroupedFlux<T, T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch) {
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch) {
return onAssembly(new FluxWindowPredicate<>(this,
Queues.unbounded(prefetch),
Queues.unbounded(prefetch),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxWindowPredicate<T> extends FluxOperator<T, GroupedFlux<T, T>>
final class FluxWindowPredicate<T> extends FluxOperator<T, Flux<T>>
implements Fuseable{

final Supplier<? extends Queue<T>> groupQueueSupplier;

final Supplier<? extends Queue<GroupedFlux<T, T>>> mainQueueSupplier;
final Supplier<? extends Queue<Flux<T>>> mainQueueSupplier;

final Mode mode;

Expand All @@ -65,7 +65,7 @@ final class FluxWindowPredicate<T> extends FluxOperator<T, GroupedFlux<T, T>>
final int prefetch;

FluxWindowPredicate(Flux<? extends T> source,
Supplier<? extends Queue<GroupedFlux<T, T>>> mainQueueSupplier,
Supplier<? extends Queue<Flux<T>>> mainQueueSupplier,
Supplier<? extends Queue<T>> groupQueueSupplier,
int prefetch,
Predicate<? super T> predicate,
Expand All @@ -84,7 +84,7 @@ final class FluxWindowPredicate<T> extends FluxOperator<T, GroupedFlux<T, T>>
}

@Override
public void subscribe(CoreSubscriber<? super GroupedFlux<T, T>> s) {
public void subscribe(CoreSubscriber<? super Flux<T>> s) {
source.subscribe(new WindowPredicateMain<>(s,
mainQueueSupplier.get(),
groupQueueSupplier,
Expand All @@ -99,10 +99,10 @@ public int getPrefetch() {
}

static final class WindowPredicateMain<T>
implements Fuseable.QueueSubscription<GroupedFlux<T, T>>,
InnerOperator<T, GroupedFlux<T, T>> {
implements Fuseable.QueueSubscription<Flux<T>>,
InnerOperator<T, Flux<T>> {

final CoreSubscriber<? super GroupedFlux<T, T>> actual;
final CoreSubscriber<? super Flux<T>> actual;

final Supplier<? extends Queue<T>> groupQueueSupplier;

Expand All @@ -112,9 +112,9 @@ static final class WindowPredicateMain<T>

final int prefetch;

final Queue<GroupedFlux<T, T>> queue;
final Queue<Flux<T>> queue;

WindowGroupedFlux<T> window;
WindowFlux<T> window;

volatile int wip;
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -149,8 +149,8 @@ static final class WindowPredicateMain<T>

volatile boolean outputFused;

WindowPredicateMain(CoreSubscriber<? super GroupedFlux<T, T>> actual,
Queue<GroupedFlux<T, T>> queue,
WindowPredicateMain(CoreSubscriber<? super Flux<T>> actual,
Queue<Flux<T>> queue,
Supplier<? extends Queue<T>> groupQueueSupplier,
int prefetch,
Predicate<? super T> predicate,
Expand Down Expand Up @@ -182,19 +182,19 @@ public void onSubscribe(Subscription s) {
}

void initializeWindow() {
WindowGroupedFlux<T> g = new WindowGroupedFlux<>(null,
WindowFlux<T> g = new WindowFlux<>(
groupQueueSupplier.get(),
this);
window = g;
queue.offer(g);
}

void offerNewWindow(T key, @Nullable T emitInNewWindow) {
void offerNewWindow(@Nullable T emitInNewWindow) {
// if the main is cancelled, don't create new groups
if (cancelled == 0) {
WINDOW_COUNT.getAndIncrement(this);

WindowGroupedFlux<T> g = new WindowGroupedFlux<>(key,
WindowFlux<T> g = new WindowFlux<>(
groupQueueSupplier.get(), this);
if (emitInNewWindow != null) {
g.onNext(emitInNewWindow);
Expand All @@ -215,7 +215,7 @@ public void onNext(T t) {
Operators.onNextDropped(t);
return;
}
WindowGroupedFlux<T> g = window;
WindowFlux<T> g = window;

boolean match;
try {
Expand All @@ -229,15 +229,15 @@ public void onNext(T t) {
if (mode == Mode.UNTIL && match) {
g.onNext(t);
g.onComplete();
offerNewWindow(t, null);
offerNewWindow(null);
}
else if (mode == Mode.UNTIL_CUT_BEFORE && match) {
g.onComplete();
offerNewWindow(t, t);
offerNewWindow(t);
}
else if (mode == Mode.WHILE && !match) {
g.onComplete();
offerNewWindow(t, null);
offerNewWindow(null);
//compensate for the dropped delimiter
s.request(1);
}
Expand All @@ -263,7 +263,7 @@ public void onComplete() {
return;
}

WindowGroupedFlux<T> g = window;
WindowFlux<T> g = window;
if (g != null) {
g.onComplete();
}
Expand Down Expand Up @@ -293,14 +293,14 @@ public Stream<? extends Scannable> inners() {
}

@Override
public CoreSubscriber<? super GroupedFlux<T, T>> actual() {
public CoreSubscriber<? super Flux<T>> actual() {
return actual;
}

void signalAsyncError() {
Throwable e = Exceptions.terminate(ERROR, this);
windowCount = 0;
WindowGroupedFlux<T> g = window;
WindowFlux<T> g = window;
if (g != null) {
g.onError(e);
}
Expand All @@ -325,9 +325,9 @@ public void cancel() {
else if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
// remove queued up but unobservable groups from the mapping
GroupedFlux<T, T> g;
Flux<T> g;
while ((g = queue.poll()) != null) {
((WindowGroupedFlux<T>) g).cancel();
((WindowFlux<T>) g).cancel();
}

if (WIP.decrementAndGet(this) == 0) {
Expand Down Expand Up @@ -366,8 +366,8 @@ void drain() {
void drainFused() {
int missed = 1;

final Subscriber<? super GroupedFlux<T, T>> a = actual;
final Queue<GroupedFlux<T, T>> q = queue;
final Subscriber<? super Flux<T>> a = actual;
final Queue<Flux<T>> q = queue;

for (; ; ) {

Expand Down Expand Up @@ -402,8 +402,8 @@ void drainLoop() {

int missed = 1;

Subscriber<? super GroupedFlux<T, T>> a = actual;
Queue<GroupedFlux<T, T>> q = queue;
Subscriber<? super Flux<T>> a = actual;
Queue<Flux<T>> q = queue;

for (; ; ) {

Expand All @@ -412,7 +412,7 @@ void drainLoop() {

while (e != r) {
boolean d = done;
GroupedFlux<T, T> v = q.poll();
Flux<T> v = q.poll();
boolean empty = v == null;

if (checkTerminated(d, empty, a, q)) {
Expand Down Expand Up @@ -453,7 +453,7 @@ void drainLoop() {
boolean checkTerminated(boolean d,
boolean empty,
Subscriber<?> a,
Queue<GroupedFlux<T, T>> q) {
Queue<Flux<T>> q) {

if (cancelled != 0) {
q.clear();
Expand All @@ -477,7 +477,7 @@ else if (empty) {

@Override
@Nullable
public GroupedFlux<T, T> poll() {
public Flux<T> poll() {
return queue.poll();
}

Expand Down Expand Up @@ -506,23 +506,15 @@ public int requestFusion(int requestedMode) {
}
}

static final class WindowGroupedFlux<T> extends GroupedFlux<T, T>
static final class WindowFlux<T> extends Flux<T>
implements Fuseable, Fuseable.QueueSubscription<T>, InnerOperator<T, T> {

final T key;

@Override
@Nullable
public T key() {
return key;
}

final Queue<T> queue;

volatile WindowPredicateMain<T> parent;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<WindowGroupedFlux, WindowPredicateMain>
PARENT = AtomicReferenceFieldUpdater.newUpdater(WindowGroupedFlux.class,
static final AtomicReferenceFieldUpdater<WindowFlux, WindowPredicateMain>
PARENT = AtomicReferenceFieldUpdater.newUpdater(WindowFlux.class,
WindowPredicateMain.class,
"parent");

Expand All @@ -531,37 +523,35 @@ public T key() {

volatile CoreSubscriber<? super T> actual;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<WindowGroupedFlux, CoreSubscriber> ACTUAL =
AtomicReferenceFieldUpdater.newUpdater(WindowGroupedFlux.class,
static final AtomicReferenceFieldUpdater<WindowFlux, CoreSubscriber> ACTUAL =
AtomicReferenceFieldUpdater.newUpdater(WindowFlux.class,
CoreSubscriber.class,
"actual");

volatile boolean cancelled;

volatile int once;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<WindowGroupedFlux> ONCE =
AtomicIntegerFieldUpdater.newUpdater(WindowGroupedFlux.class, "once");
static final AtomicIntegerFieldUpdater<WindowFlux> ONCE =
AtomicIntegerFieldUpdater.newUpdater(WindowFlux.class, "once");

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<WindowGroupedFlux> WIP =
AtomicIntegerFieldUpdater.newUpdater(WindowGroupedFlux.class, "wip");
static final AtomicIntegerFieldUpdater<WindowFlux> WIP =
AtomicIntegerFieldUpdater.newUpdater(WindowFlux.class, "wip");

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<WindowGroupedFlux> REQUESTED =
AtomicLongFieldUpdater.newUpdater(WindowGroupedFlux.class, "requested");
static final AtomicLongFieldUpdater<WindowFlux> REQUESTED =
AtomicLongFieldUpdater.newUpdater(WindowFlux.class, "requested");

volatile boolean enableOperatorFusion;

int produced;

WindowGroupedFlux(
@Nullable T key,
WindowFlux(
Queue<T> queue,
WindowPredicateMain<T> parent) {
this.key = key;
this.queue = queue;
this.parent = parent;
}
Expand Down Expand Up @@ -847,7 +837,7 @@ public Object scanUnsafe(Attr key) {

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can safely remove that toString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smaldini Done, thanks.

return "WindowGroupedFlux[" + key + "]";
return "WindowFlux";
}
}

Expand Down
Loading