From f4dd2770f5d58b149e3ce1eccbd69d64f3c8e36e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Mon, 8 Jan 2018 20:28:18 +0100 Subject: [PATCH] fix #969 Reimplement FluxBufferWhen and avoid buffer leaks --- .../core/publisher/FluxBufferWhen.java | 610 +++++++----------- .../core/publisher/FluxBufferWhenTest.java | 174 +++-- 2 files changed, 356 insertions(+), 428 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java index f0a6d822e5..4e21e623fb 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java @@ -16,15 +16,14 @@ package reactor.core.publisher; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; +import java.util.LinkedList; +import java.util.List; import java.util.Objects; import java.util.Queue; -import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import java.util.function.Supplier; @@ -33,37 +32,40 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.Exceptions; +import reactor.core.Scannable; import reactor.util.annotation.Nullable; -import reactor.util.context.Context; +import reactor.util.concurrent.MpscLinkedQueue; /** * buffers elements into possibly overlapping buffers whose boundaries are determined * by a start Publisher's element and a signal of a derived Publisher * * @param the source value type - * @param the value type of the publisher opening the buffers - * @param the value type of the publisher closing the individual buffers - * @param the collection type that holds the buffered values + * @param the value type of the publisher opening the buffers + * @param the value type of the publisher closing the individual buffers + * @param the collection type that holds the buffered values * * @see Reactive-Streams-Commons */ -final class FluxBufferWhen> - extends FluxOperator { +final class FluxBufferWhen> + extends FluxOperator { - final Publisher start; + final Publisher start; - final Function> end; + final Function> end; - final Supplier bufferSupplier; + final Supplier bufferSupplier; - final Supplier> queueSupplier; + final Supplier> queueSupplier; FluxBufferWhen(Flux source, - Publisher start, - Function> end, - Supplier bufferSupplier, - Supplier> queueSupplier) { + Publisher start, + Function> end, + Supplier bufferSupplier, + Supplier> queueSupplier) { super(source); this.start = Objects.requireNonNull(start, "start"); this.end = Objects.requireNonNull(end, "end"); @@ -77,525 +79,361 @@ public int getPrefetch() { } @Override - public void subscribe(CoreSubscriber actual) { + public void subscribe(CoreSubscriber actual) { + BufferWhenMainSubscriber main = + new BufferWhenMainSubscriber<>(actual, bufferSupplier, start, end); - Queue q = queueSupplier.get(); - - BufferStartEndMainSubscriber parent = - new BufferStartEndMainSubscriber<>(actual, bufferSupplier, q, end); - - actual.onSubscribe(parent); - - start.subscribe(parent.starter); - - source.subscribe(parent); + source.subscribe(main); } - static final class BufferStartEndMainSubscriber> - implements InnerOperator { - final Supplier bufferSupplier; - - final Queue queue; - - final Function> end; - final CoreSubscriber actual; - - Set endSubscriptions; - - final BufferStartEndStarter starter; - - Map buffers; - - volatile Subscription s; + static final class BufferWhenMainSubscriber> + extends QueueDrainSubscriber + implements Disposable { + final Publisher bufferOpen; + final Function> bufferClose; + final Supplier bufferSupplier; + final Composite resources; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater - S = - AtomicReferenceFieldUpdater.newUpdater(BufferStartEndMainSubscriber.class, - Subscription.class, - "s"); + Subscription s; - volatile long requested; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(BufferStartEndMainSubscriber.class, - "requested"); + final List buffers; - long index; + volatile int windows; + static final AtomicIntegerFieldUpdater WINDOWS = + AtomicIntegerFieldUpdater.newUpdater(BufferWhenMainSubscriber.class, "windows"); - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(BufferStartEndMainSubscriber.class, - "wip"); + BufferWhenOpenSubscriber bos; - volatile Throwable error; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater - ERROR = AtomicReferenceFieldUpdater.newUpdater( - BufferStartEndMainSubscriber.class, - Throwable.class, - "error"); - - volatile boolean done; - - volatile boolean cancelled; - - volatile int open; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater OPEN = - AtomicIntegerFieldUpdater.newUpdater(BufferStartEndMainSubscriber.class, - "open"); - - BufferStartEndMainSubscriber(CoreSubscriber actual, - Supplier bufferSupplier, - Queue queue, - Function> end) { - this.actual = actual; + BufferWhenMainSubscriber(CoreSubscriber actual, + Supplier bufferSupplier, + Publisher bufferOpen, + Function> bufferClose) { + super(actual, new MpscLinkedQueue<>()); + this.bufferOpen = bufferOpen; + this.bufferClose = bufferClose; this.bufferSupplier = bufferSupplier; - this.buffers = new HashMap<>(); - this.endSubscriptions = new HashSet<>(); - this.queue = queue; - this.end = end; - this.open = 1; - this.starter = new BufferStartEndStarter<>(this); + this.buffers = new LinkedList<>(); + this.resources = Disposables.composite(); } - @Override public void onSubscribe(Subscription s) { - if (Operators.setOnce(S, this, s)) { + if (Operators.validate(this.s, s)) { + this.s = s; + + BufferWhenOpenSubscriber bos = + new BufferWhenOpenSubscriber<>(this); + resources.add(bos); + this.bos = bos; //keep reference to remove and dispose it if source early onCompletes + + actual.onSubscribe(this); + + WINDOWS.lazySet(this, 1); + bufferOpen.subscribe(bos); + s.request(Long.MAX_VALUE); } } - @Override - public final CoreSubscriber actual() { - return actual; - } - @Override public void onNext(T t) { synchronized (this) { - Map set = buffers; - if (set != null) { - for (C b : set.values()) { - b.add(t); - } - return; + for (BUFFER b : buffers) { + b.add(t); } } - - Operators.onNextDropped(t, actual.currentContext()); } @Override public void onError(Throwable t) { - boolean report; + error = t; + done = true; + cancel(); + cancelled = true; synchronized (this) { - Map set = buffers; - if (set != null) { - buffers = null; - report = true; - } - else { - report = false; - } - } - - if (report) { - anyError(t); - } - else { - Operators.onErrorDropped(t, actual.currentContext()); + buffers.clear(); } + actual.onError(t); } @Override public void onComplete() { - Map set; + done = true; + resources.remove(bos); + bos.dispose(); + if (WINDOWS.decrementAndGet(this) == 0) { + complete(); + } + } + void complete() { + List list; synchronized (this) { - set = buffers; - if (set == null) { - return; - } + list = new ArrayList<>(buffers); + buffers.clear(); } - cancelStart(); - cancelEnds(); - - for (C b : set.values()) { - queue.offer(b); + Queue q = queue; + for (BUFFER u : list) { + q.offer(u); } done = true; - drain(); + if (enter()) { + drainMaxLoop(q, actual, false, this, this); + } } @Override public void request(long n) { - if (Operators.validate(n)) { - Operators.addCap(REQUESTED, this, n); - } - } - - void cancelMain() { - Operators.terminate(S, this); + requested(n); } - void cancelStart() { - starter.cancel(); - } - - void cancelEnds() { - Set set; - synchronized (starter) { - set = endSubscriptions; - - if (set == null) { - return; - } - endSubscriptions = null; - } - - for (Subscription s : set) { - s.cancel(); - } + @Override + public void dispose() { + resources.dispose(); } - boolean addEndSubscription(Subscription s) { - synchronized (starter) { - Set set = endSubscriptions; - - if (set != null) { - set.add(s); - return true; - } - } - s.cancel(); - return false; + @Override + public boolean isDisposed() { + return resources.isDisposed(); } @Override public void cancel() { if (!cancelled) { cancelled = true; - - cancelMain(); - - cancelStart(); - - cancelEnds(); + dispose(); } } - boolean emit(C b) { - long r = requested; - if (r != 0L) { - actual.onNext(b); - if (r != Long.MAX_VALUE) { - REQUESTED.decrementAndGet(this); - } - return true; - } - else { - - actual.onError(Exceptions.failWithOverflow( - "Could not emit buffer due to lack of requests")); - - return false; - } + @Override + public boolean accept(Subscriber a, BUFFER v) { + a.onNext(v); + return true; } - void anyError(Throwable t) { - if (Exceptions.addThrowable(ERROR, this, t)) { - done = true; - drain(); - } - else { - Operators.onErrorDropped(t, actual.currentContext()); + void open(OPEN window) { + if (cancelled) { + return; } - } - - void startNext(U u) { - - long idx = index; - index = idx + 1; - C b; + BUFFER b; try { - b = Objects.requireNonNull(bufferSupplier.get(), - "The bufferSupplier returned a null buffer"); - } - catch (Throwable e) { - anyError(Operators.onOperatorError(starter, e, u, actual.currentContext())); + b = Objects.requireNonNull(bufferSupplier.get(), "The buffer supplied is null"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + onError(e); return; } - synchronized (this) { - Map set = buffers; - if (set == null) { - return; - } - - set.put(idx, b); - } - - Publisher p; + Publisher p; try { - p = Objects.requireNonNull(end.apply(u), - "The end returned a null publisher"); - } - catch (Throwable e) { - anyError(Operators.onOperatorError(starter, e, u, actual.currentContext())); + p = Objects.requireNonNull(bufferClose.apply(window), "The buffer closing publisher is null"); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + onError(e); return; } - BufferStartEndEnder end = new BufferStartEndEnder<>(this, b, idx); - - if (addEndSubscription(end)) { - OPEN.getAndIncrement(this); - - p.subscribe(end); - } - } - - void startError(Throwable e) { - anyError(e); - } - - void startComplete() { - if (OPEN.decrementAndGet(this) == 0) { - cancelAll(); - done = true; - drain(); + if (cancelled) { + return; } - } - - void cancelAll() { - cancelMain(); - - cancelStart(); - - cancelEnds(); - } - void endSignal(BufferStartEndEnder ender) { synchronized (this) { - Map set = buffers; - - if (set == null) { - return; - } - - if (set.remove(ender.index) == null) { + if (cancelled) { return; } - - queue.offer(ender.buffer); - } - if (OPEN.decrementAndGet(this) == 0) { - cancelAll(); - done = true; - } - drain(); - } - - void endError(Throwable e) { - anyError(e); - } - - void drain() { - if (WIP.getAndIncrement(this) != 0) { - return; + buffers.add(b); } - final Subscriber a = actual; - final Queue q = queue; - - int missed = 1; - - for (; ; ) { + BufferWhenCloseSubscriber bcs = + new BufferWhenCloseSubscriber<>(b, this); + resources.add(bcs); - for (; ; ) { - boolean d = done; + WINDOWS.getAndIncrement(this); - C b = q.poll(); - - boolean empty = b == null; - - if (checkTerminated(d, empty, a, q)) { - return; - } - - if (empty) { - break; - } - - long r = requested; - if (r != 0L) { - actual.onNext(b); - if (r != Long.MAX_VALUE) { - REQUESTED.decrementAndGet(this); - } - } - else { - anyError(Exceptions.failWithOverflow( - "Could not emit buffer due to lack of requests")); - } - } + p.subscribe(bcs); + } - missed = WIP.addAndGet(this, -missed); - if (missed == 0) { - break; + void openFinished(Disposable d) { + if (resources.remove(d)) { + if (WINDOWS.decrementAndGet(this) == 0) { + complete(); } } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a, Queue q) { - if (cancelled) { - queue.clear(); - return true; + void close(BUFFER b, Disposable d) { + boolean e; + synchronized (this) { + e = buffers.remove(b); } - if (d) { - Throwable e = Exceptions.terminate(ERROR, this); - if (e != null && e != Exceptions.TERMINATED) { - cancel(); - queue.clear(); - a.onError(e); - return true; - } - else if (empty) { - a.onComplete(); - return true; + if (e) { + fastPathOrderedEmitMax(b, false, this); + } + + if (resources.remove(d)) { + if (WINDOWS.decrementAndGet(this) == 0) { + complete(); } } - return false; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return s; - if (key == Attr.TERMINATED) return done; - if (key == Attr.CANCELLED) return cancelled; if (key == Attr.PREFETCH) return Integer.MAX_VALUE; - if (key == Attr.BUFFERED) return buffers.values() - .stream() - .mapToInt(Collection::size) - .sum(); - if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; + if (key == Attr.BUFFERED) return buffers.stream() + .mapToInt(Collection::size) + .sum(); - return InnerOperator.super.scanUnsafe(key); + return super.scanUnsafe(key); } } - static final class BufferStartEndStarter extends Operators.DeferredSubscription - implements InnerConsumer { + static final class BufferWhenOpenSubscriber> + implements Disposable, InnerConsumer { + + volatile Subscription subscription; + static final AtomicReferenceFieldUpdater SUBSCRIPTION = + AtomicReferenceFieldUpdater.newUpdater(BufferWhenOpenSubscriber.class, Subscription.class, "subscription"); - final BufferStartEndMainSubscriber main; + final BufferWhenMainSubscriber parent; + boolean done; - BufferStartEndStarter(BufferStartEndMainSubscriber main) { - this.main = main; + BufferWhenOpenSubscriber(BufferWhenMainSubscriber parent) { + this.parent = parent; } @Override public void onSubscribe(Subscription s) { - if (set(s)) { - s.request(Long.MAX_VALUE); + if (Operators.setOnce(SUBSCRIPTION, this, s)) { + subscription.request(Long.MAX_VALUE); } } @Override - public void onNext(U t) { - main.startNext(t); + public void dispose() { + Operators.terminate(SUBSCRIPTION, this); } @Override - public void onError(Throwable t) { - main.startError(t); + public boolean isDisposed() { + return subscription == Operators.cancelledSubscription(); } @Override - public void onComplete() { - main.startComplete(); + public void onNext(OPEN t) { + if (done) { + return; + } + parent.open(t); + } + + @Override + public void onError(Throwable t) { + if (done) { + Operators.onErrorDropped(t, parent.actual.currentContext()); + return; + } + done = true; + parent.onError(t); } @Override - public Context currentContext() { - return main.currentContext(); + public void onComplete() { + if (done) { + return; + } + done = true; + parent.openFinished(this); } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.ACTUAL) { - return main; + return parent; } - return super.scanUnsafe(key); + if (key == Attr.PARENT) return subscription; + if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return Long.MAX_VALUE; + if (key == Attr.CANCELLED) return isDisposed(); + + return null; } } - static final class BufferStartEndEnder> - extends Operators.DeferredSubscription implements InnerConsumer { + static final class BufferWhenCloseSubscriber> + implements Disposable, InnerConsumer { - final BufferStartEndMainSubscriber main; + volatile Subscription subscription; + static final AtomicReferenceFieldUpdater SUBSCRIPTION = + AtomicReferenceFieldUpdater.newUpdater(BufferWhenCloseSubscriber.class, Subscription.class, "subscription"); - final C buffer; + final BufferWhenMainSubscriber parent; + final BUFFER value; + boolean done; - final long index; - BufferStartEndEnder(BufferStartEndMainSubscriber main, - C buffer, - long index) { - this.main = main; - this.buffer = buffer; - this.index = index; + BufferWhenCloseSubscriber(BUFFER value, BufferWhenMainSubscriber parent) { + this.parent = parent; + this.value = value; } @Override - public Context currentContext() { - return main.currentContext(); + public void onSubscribe(Subscription s) { + if (Operators.setOnce(SUBSCRIPTION, this, s)) { + subscription.request(Long.MAX_VALUE); + } } @Override - @Nullable - public Object scanUnsafe(Attr key) { - if (key == Attr.ACTUAL) { - return main; - } - return super.scanUnsafe(key); + public void dispose() { + Operators.terminate(SUBSCRIPTION, this); } @Override - public void onSubscribe(Subscription s) { - if (set(s)) { - s.request(Long.MAX_VALUE); - } + public boolean isDisposed() { + return subscription == Operators.cancelledSubscription(); } @Override - public void onNext(V t) { - if (!isCancelled()) { - cancel(); - - main.endSignal(this); - } + public void onNext(CLOSE t) { + onComplete(); } @Override public void onError(Throwable t) { - main.endError(t); + if (done) { + Operators.onErrorDropped(t, parent.actual.currentContext()); + return; + } + parent.onError(t); } @Override public void onComplete() { - if (!isCancelled()) { - main.endSignal(this); + if (done) { + return; } + done = true; + parent.close(value, this); } + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.ACTUAL) { + return parent; + } + if (key == Attr.PARENT) return subscription; + if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return Long.MAX_VALUE; + if (key == Attr.CANCELLED) return isDisposed(); + + return null; + } } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferWhenTest.java index 9f51d0ee5c..f61dc9b64b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferWhenTest.java @@ -21,7 +21,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import org.assertj.core.api.Condition; import org.junit.Assert; import org.junit.Test; import org.reactivestreams.Subscription; @@ -29,12 +33,84 @@ import reactor.core.Scannable; import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; +import reactor.util.Logger; +import reactor.util.Loggers; import reactor.util.concurrent.Queues; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; import static org.assertj.core.api.Assertions.assertThat; public class FluxBufferWhenTest { + private static final Logger LOGGER = Loggers.getLogger(FluxBufferWhenTest.class); + + @Test + public void gh969_timedOutBuffersDontLeak() throws InterruptedException { + LongAdder created = new LongAdder(); + LongAdder finalized = new LongAdder(); + class Wrapper { + + final int i; + + Wrapper(int i) { + created.increment(); + this.i = i; + } + + @Override + public String toString() { + return "{i=" + i + '}'; + } + + @Override + protected void finalize() { + finalized.increment(); + } + } + + final CountDownLatch latch = new CountDownLatch(1); + final UnicastProcessor processor = UnicastProcessor.create(); + + Flux emitter = Flux.range(1, 400) + .delayElements(Duration.ofMillis(10)) + .doOnNext(i -> processor.onNext(new Wrapper(i))) + .doOnError(processor::onError) + .doOnComplete(processor::onComplete); + + Mono>> buffers = + processor.buffer(Duration.ofMillis(1000), Duration.ofMillis(500)) + .filter(b -> b.size() > 0) + .index() + .doOnNext(it -> System.gc()) + //index, bounds of buffer, finalized + .map(t2 -> Tuples.of(t2.getT1(), + String.format("from %s to %s", t2.getT2().get(0), + t2.getT2().get(t2.getT2().size() - 1)), + finalized.longValue())) + .doOnNext(v -> LOGGER.info(v.toString())) + .doOnComplete(latch::countDown) + .collectList(); + + emitter.subscribe(); + List> finalizeStats = buffers.block(Duration.ofSeconds(10)); + + Condition> hasFinalized = new Condition<>( + t3 -> t3.getT3() > 0, "has finalized"); + + //at least 5 intermediate finalize + assertThat(finalizeStats).areAtLeast(5, hasFinalized); + + latch.await(10, TimeUnit.SECONDS); + LOGGER.debug("final GC"); + System.gc(); + Thread.sleep(500); + + assertThat(finalized.longValue()) + .as("final GC collects all") + .isEqualTo(created.longValue()); + } + @Test public void normal() { AssertSubscriber> ts = AssertSubscriber.create(); @@ -96,41 +172,41 @@ public void normal() { public void startCompletes() { AssertSubscriber> ts = AssertSubscriber.create(); - DirectProcessor sp1 = DirectProcessor.create(); - DirectProcessor sp2 = DirectProcessor.create(); - DirectProcessor sp3 = DirectProcessor.create(); + DirectProcessor source = DirectProcessor.create(); + DirectProcessor open = DirectProcessor.create(); + DirectProcessor close = DirectProcessor.create(); - sp1.bufferWhen(sp2, v -> sp3) + source.bufferWhen(open, v -> close) .subscribe(ts); ts.assertNoValues() .assertNoError() .assertNotComplete(); - sp1.onNext(1); + source.onNext(1); ts.assertNoValues() .assertNoError() .assertNotComplete(); - sp2.onNext(1); - sp2.onComplete(); + open.onNext(1); + open.onComplete(); - Assert.assertTrue("sp3 has no subscribers?", sp3.hasDownstreams()); + Assert.assertTrue("close has no subscribers?", close.hasDownstreams()); - sp1.onNext(2); - sp1.onNext(3); - sp1.onNext(4); + source.onNext(2); + source.onNext(3); + source.onNext(4); - sp3.onComplete(); + close.onComplete(); ts.assertValues(Arrays.asList(2, 3, 4)) .assertNoError() .assertComplete(); - Assert.assertFalse("sp1 has subscribers?", sp1.hasDownstreams()); - Assert.assertFalse("sp2 has subscribers?", sp2.hasDownstreams()); - Assert.assertFalse("sp3 has subscribers?", sp3.hasDownstreams()); +// Assert.assertFalse("source has subscribers?", source.hasDownstreams()); //FIXME + Assert.assertFalse("open has subscribers?", open.hasDownstreams()); + Assert.assertFalse("close has subscribers?", close.hasDownstreams()); } @@ -236,8 +312,9 @@ public void bufferWillSubdivideAnInputFluxGapTime() { public void scanStartEndMain() { CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); - FluxBufferWhen.BufferStartEndMainSubscriber> test = new FluxBufferWhen.BufferStartEndMainSubscriber<>( - actual, ArrayList::new, Queues.>one().get(), u -> Mono.just(1L)); + FluxBufferWhen.BufferWhenMainSubscriber> test = + new FluxBufferWhen.BufferWhenMainSubscriber>( + actual, ArrayList::new, Flux.never(), u -> Mono.just(1L)); //use never to never open and early finish Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); test.request(100L); @@ -247,7 +324,7 @@ public void scanStartEndMain() { assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(0); //TODO - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(100L); + assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(Long.MAX_VALUE); assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); test.onError(new IllegalStateException("boom")); @@ -258,25 +335,43 @@ public void scanStartEndMain() { public void scanStartEndMainCancelled() { CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); - FluxBufferWhen.BufferStartEndMainSubscriber> test = new FluxBufferWhen.BufferStartEndMainSubscriber<>( - actual, ArrayList::new, Queues.>one().get(), u -> Mono.just(1L)); + FluxBufferWhen.BufferWhenMainSubscriber> test = + new FluxBufferWhen.BufferWhenMainSubscriber>( + actual, ArrayList::new, Flux.never(), u -> Mono.just(1L)); Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); test.cancel(); assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); } + @Test + public void scanStartEndMainCompleted() { + CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); + + FluxBufferWhen.BufferWhenMainSubscriber> test = + new FluxBufferWhen.BufferWhenMainSubscriber>( + actual, ArrayList::new, Flux.never(), u -> Mono.just(1L)); + Subscription parent = Operators.emptySubscription(); + test.onSubscribe(parent); + assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); + + test.complete(); + assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); + } + @Test - public void scanStartEndEnder() { + public void scanWhenCloseSubscriber() { //noinspection ConstantConditions - FluxBufferWhen.BufferStartEndMainSubscriber> main = new FluxBufferWhen.BufferStartEndMainSubscriber<>( - null, ArrayList::new, Queues.>one().get(), u -> Mono.just(1L)); + FluxBufferWhen.BufferWhenMainSubscriber> main = + new FluxBufferWhen.BufferWhenMainSubscriber<>(null, + ArrayList::new, + Mono.just(1), + u -> Mono.just(1L)); - FluxBufferWhen.BufferStartEndEnder test = new FluxBufferWhen.BufferStartEndEnder<>(main, Arrays.asList("foo", "bar"), 1); + FluxBufferWhen.BufferWhenCloseSubscriber test = new FluxBufferWhen.BufferWhenCloseSubscriber<>(Arrays.asList("foo", "bar"), main); - test.request(4); //request is forwarded directly to parent, no parent = we track it - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(4L); + assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(Long.MAX_VALUE); Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); @@ -284,38 +379,33 @@ public void scanStartEndEnder() { assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(main); assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(0L); - test.request(2); //request is forwarded directly to parent - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(0L); - - test.cancel(); + test.dispose(); + assertThat(test.isDisposed()).isTrue(); assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); } @Test - public void scanStartEndStarter() { + public void scanWhenOpenSubscriber() { //noinspection ConstantConditions - FluxBufferWhen.BufferStartEndMainSubscriber> main = new FluxBufferWhen.BufferStartEndMainSubscriber<>( - null, ArrayList::new, Queues.>one().get(), u -> Mono.just(1L)); + FluxBufferWhen.BufferWhenMainSubscriber> main = new FluxBufferWhen.BufferWhenMainSubscriber<>( + null, ArrayList::new, Mono.just(1), u -> Mono.just(1L)); - FluxBufferWhen.BufferStartEndStarter test = new FluxBufferWhen.BufferStartEndStarter<>(main); + FluxBufferWhen.BufferWhenOpenSubscriber test = new FluxBufferWhen.BufferWhenOpenSubscriber<>(main); - test.request(4); //request is forwarded directly to parent, no parent = we track it - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(4L); Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); + assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(Long.MAX_VALUE); assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(main); assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(0L); - test.request(2); //request is forwarded directly to parent - assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(0L); - - test.cancel(); + test.dispose(); + assertThat(test.isDisposed()).isTrue(); assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); } + + //TODO test gh994 }