Skip to content

Commit

Permalink
Merge pull request #1261 from cescoffier/empty-empty-list-on-no-item-…
Browse files Browse the repository at this point in the history
…in-multi-buffer-with-timeout

Emit empty list when grouping item by time windows and no item in the window
  • Loading branch information
jponge authored Apr 26, 2023
2 parents 1f8aa84 + e255dfd commit 69afc8d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 10 deletions.
10 changes: 9 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,15 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.operators.multi.MultiBufferWithTimeoutOp<T>::<init>(io.smallrye.mutiny.Multi<T>, int, java.time.Duration, java.util.concurrent.ScheduledExecutorService)",
"new": "method void io.smallrye.mutiny.operators.multi.MultiBufferWithTimeoutOp<T>::<init>(io.smallrye.mutiny.Multi<T>, int, java.time.Duration, java.util.concurrent.ScheduledExecutorService, boolean)",
"justification": "New method adding the possibility to emit empty lists"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public MultiGroupIntoLists(Multi<T> upstream) {
* <p>
* The resulting {@link Multi} emits connected, non-overlapping lists, each of a fixed duration specified by the
* {@code duration} parameter. If, during the configured time window, no items are emitted by the upstream
* {@link Multi}, an empty list is emitted by the returned {@link Multi}.
* {@link Multi}, nothing is emitted downstream.
* <p>
* When the upstream {@link Multi} sends the completion event, the resulting {@link Multi} emits the current list
* and propagates the completion event.
Expand All @@ -40,7 +40,36 @@ public MultiGroupIntoLists(Multi<T> upstream) {
public Multi<List<T>> every(Duration duration) {
return Infrastructure.onMultiCreation(new MultiBufferWithTimeoutOp<>(upstream, Integer.MAX_VALUE,
validate(duration, "duration"),
Infrastructure.getDefaultWorkerPool()));
Infrastructure.getDefaultWorkerPool(),
false));
}

/**
* Creates a {@link Multi} that emits lists of items collected from the observed {@link Multi}.
* <p>
* The resulting {@link Multi} emits connected, non-overlapping lists, each of a fixed duration specified by the
* {@code duration} parameter. If, during the configured time window, no items are emitted by the upstream
* {@link Multi}, an empty list is emitted by the returned {@link Multi} if {@code emitEmptyListIfNoItems} is set to
* {@code true}.
* <p>
* When the upstream {@link Multi} sends the completion event, the resulting {@link Multi} emits the current list
* and propagates the completion event.
* <p>
* If the upstream {@link Multi} sends a failure, the failure is propagated immediately.
*
* @param duration the period of time each list collects items before it is emitted and replaced with a new
* list. Must be non {@code null} and positive.
* @param emitEmptyListIfNoItems emits an empty list if no items from the upstream have been received during the
* time window
* @return a Multi that emits every {@code duration} with the items emitted by the upstream multi during the time
* window.
*/
@CheckReturnValue
public Multi<List<T>> every(Duration duration, boolean emitEmptyListIfNoItems) {
return Infrastructure.onMultiCreation(new MultiBufferWithTimeoutOp<>(upstream, Integer.MAX_VALUE,
validate(duration, "duration"),
Infrastructure.getDefaultWorkerPool(),
emitEmptyListIfNoItems));
}

/**
Expand Down Expand Up @@ -108,6 +137,6 @@ public Multi<List<T>> of(int size, int skip) {
@CheckReturnValue
public Multi<List<T>> of(int size, Duration maximumDelay) {
return Infrastructure.onMultiCreation(new MultiBufferWithTimeoutOp<>(upstream, positive(size, "size"),
validate(maximumDelay, "maximumDelay"), Infrastructure.getDefaultWorkerPool()));
validate(maximumDelay, "maximumDelay"), Infrastructure.getDefaultWorkerPool(), false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@ public final class MultiBufferWithTimeoutOp<T> extends AbstractMultiOperator<T,
private final Supplier<List<T>> supplier;
private final ScheduledExecutorService scheduler;
private final Duration timeout;
private final boolean emitEmptyListIfNoItem;

public MultiBufferWithTimeoutOp(Multi<T> upstream,
int size,
Duration timeout,
ScheduledExecutorService scheduler) {
ScheduledExecutorService scheduler,
boolean emitEmptyListIfNoItem) {
super(upstream);
this.timeout = ParameterValidation.validate(timeout, "timeout");
this.size = ParameterValidation.positive(size, "size");
this.scheduler = ParameterValidation.nonNull(scheduler, "scheduler");
this.emitEmptyListIfNoItem = emitEmptyListIfNoItem;
this.supplier = () -> {
if (size < Integer.MAX_VALUE) {
// Not used yet, on the roadmap.
Expand All @@ -57,7 +60,7 @@ public MultiBufferWithTimeoutOp(Multi<T> upstream,
@Override
public void subscribe(MultiSubscriber<? super List<T>> downstream) {
MultiBufferWithTimeoutProcessor<T> subscriber = new MultiBufferWithTimeoutProcessor<>(
new SerializedSubscriber<>(downstream), size, timeout, scheduler, supplier);
new SerializedSubscriber<>(downstream), size, timeout, scheduler, supplier, emitEmptyListIfNoItem);
upstream.subscribe().withSubscriber(subscriber);
}

Expand All @@ -77,23 +80,25 @@ static class MultiBufferWithTimeoutProcessor<T> extends MultiOperatorProcessor<T
private final AtomicInteger terminated = new AtomicInteger(RUNNING);
private final AtomicLong requested = new AtomicLong();
private final AtomicInteger index = new AtomicInteger();
private final boolean emitEmptyListIfNoItem;
private List<T> current;
private ScheduledFuture<?> task;

MultiBufferWithTimeoutProcessor(MultiSubscriber<? super List<T>> downstream, int size, Duration timeout,
ScheduledExecutorService executor, Supplier<List<T>> supplier) {
ScheduledExecutorService executor, Supplier<List<T>> supplier, boolean emitEmptyListIfNoItem) {
super(downstream);
this.duration = timeout;
this.executor = executor;
this.supplier = supplier;
this.size = size;
this.emitEmptyListIfNoItem = emitEmptyListIfNoItem;

this.flush = () -> {
if (terminated.get() == RUNNING) {
int index;
for (;;) {
index = this.index.get();
if (index == 0) {
if (index == 0 && !emitEmptyListIfNoItem) {
return;
}
if (this.index.compareAndSet(index, 0)) {
Expand All @@ -107,6 +112,13 @@ static class MultiBufferWithTimeoutProcessor<T> extends MultiOperatorProcessor<T

private void doOnSubscribe() {
current = supplier.get();
if (emitEmptyListIfNoItem) {
try {
task = executor.schedule(flush, duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException rejected) {
onFailure(rejected);
}
}
}

void nextCallback(T value) {
Expand All @@ -127,7 +139,7 @@ private void flushCallback() {
} else {
cur = Collections.emptyList();
}
if (!cur.isEmpty()) {
if (!cur.isEmpty() || emitEmptyListIfNoItem) {
current = supplier.get();
flush = true;
}
Expand All @@ -136,7 +148,11 @@ private void flushCallback() {
if (flush) {
long req = requested.get();
MultiSubscriber<? super List<T>> subscriber = downstream;
if (emitEmptyListIfNoItem && terminated.get() == RUNNING) {
task = executor.schedule(this.flush, duration.toMillis(), TimeUnit.MILLISECONDS);
}
if (req != 0L) {

if (req != Long.MAX_VALUE) {
long next;
for (;;) {
Expand Down Expand Up @@ -172,7 +188,7 @@ public void onItem(final T value) {
}
}

if (index == 1) {
if (index == 1 && !emitEmptyListIfNoItem) { // If emitEmptyListIfNoItem, the task has been started in subscribe
try {
task = executor.schedule(flush, duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException rejected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.assertj.core.api.Condition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
Expand All @@ -35,6 +36,7 @@
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.test.Mocks;
import io.smallrye.mutiny.unchecked.Unchecked;
import junit5.support.InfrastructureResource;

@ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ_WRITE)
Expand Down Expand Up @@ -203,6 +205,39 @@ public void testAsListsWithDuration() {
publisher.assertCancelled();
}

@Test
public void testAsListsWithDurationWithNoItems() {
MultiOnCancellationSpy<Long> spy = Spy.onCancellation(Multi.createFrom().<Long> nothing());
AssertSubscriber<List<Long>> subscriber = spy
.group().intoLists().every(Duration.ofMillis(100), true)
.subscribe().withSubscriber(AssertSubscriber.create(100));

subscriber.awaitItems(3).cancel();
spy.assertCancelled();
assertThat(subscriber.getItems()).allSatisfy(l -> assertThat(l).isEmpty());
}

@Test
public void testAsListsWithDurationWithItemsAndNoItems() {
AssertSubscriber<List<Long>> subscriber = Multi.createFrom().<Long> emitter(Unchecked.consumer(e -> {
e.emit(1L);
e.emit(2L);
e.emit(3L);
Thread.sleep(200);
e.emit(4L);
e.emit(5L);
Thread.sleep(400);
e.complete();
}))
.group().intoLists().every(Duration.ofMillis(100), true)
.log()
.subscribe().withSubscriber(AssertSubscriber.create(100));

subscriber.awaitCompletion();
assertThat(subscriber.getItems()).hasSizeGreaterThanOrEqualTo(5);
assertThat(subscriber.getItems()).areAtLeastOne(new Condition<>(List::isEmpty, "empty"));
}

@Test
public void testAsListsWithDurationWithCompletion() {
Multi<Long> publisher = Multi.createFrom().publisher(Multi.createFrom().ticks().every(Duration.ofMillis(2)))
Expand Down

0 comments on commit 69afc8d

Please sign in to comment.