Skip to content

Commit

Permalink
feat: allow failure retries with exponential backoffs and 'until' pre…
Browse files Browse the repository at this point in the history
…dicates

Refs: #1510
  • Loading branch information
jponge committed Feb 28, 2024
1 parent cc800eb commit 4e979e0
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.mutiny.groups;

import static io.smallrye.mutiny.helpers.ExponentialBackoff.backoffWithPredicateFactory;
import static io.smallrye.mutiny.helpers.ExponentialBackoff.noBackoffPredicateFactory;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.validate;

Expand All @@ -11,7 +13,6 @@

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ExponentialBackoff;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -97,7 +98,6 @@ public Multi<T> atMost(long numberOfAttempts) {
* @return a new {@link Multi} retrying to subscribe to the current
* {@link Multi} until it gets an item or until expiration {@code expireAt}. When the expiration is reached,
* the last failure is propagated.
*
* @throws IllegalArgumentException if back off not configured,
*/
@CheckReturnValue
Expand Down Expand Up @@ -126,7 +126,6 @@ public Multi<T> expireAt(long expireAt) {
* @return a new {@link Multi} retrying to subscribe to the current
* {@link Multi} until it gets an item or until expiration {@code expireIn}. When the expiration is reached,
* the last failure is propagated.
*
* @throws IllegalArgumentException if back off not configured,
*/
@CheckReturnValue
Expand All @@ -146,25 +145,14 @@ public Multi<T> expireIn(long expireIn) {
@CheckReturnValue
public Multi<T> until(Predicate<? super Throwable> predicate) {
Predicate<? super Throwable> actual = Infrastructure.decorate(nonNull(predicate, "predicate"));
Function<Multi<Throwable>, Publisher<Long>> whenStreamFactory;
if (backOffConfigured) {
throw new IllegalArgumentException(
"Invalid retry configuration, `until` cannot be used with a back-off configuration");
ScheduledExecutorService pool = (this.executor == null) ? Infrastructure.getDefaultWorkerPool() : this.executor;
whenStreamFactory = backoffWithPredicateFactory(initialBackOff, jitter, maxBackoff, predicate, pool);
} else {
whenStreamFactory = noBackoffPredicateFactory(predicate);
}
Function<Multi<Throwable>, Publisher<Long>> whenStreamFactory = stream -> stream.onItem()
.transformToUni(failure -> Uni.createFrom().<Long> emitter(emitter -> {
try {
if (actual.test(failure)) {
emitter.complete(1L);
} else {
emitter.fail(failure);
}
} catch (Throwable ex) {
emitter.fail(ex);
}
}))
.concatenate();
return Infrastructure
.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.mutiny.groups;

import static io.smallrye.mutiny.helpers.ExponentialBackoff.backoffWithPredicateFactory;
import static io.smallrye.mutiny.helpers.ExponentialBackoff.noBackoffPredicateFactory;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.validate;

Expand Down Expand Up @@ -134,24 +136,21 @@ public Uni<T> expireIn(long expireIn) {
* must not be {@code null}. If the predicate returns {@code true} for the given failure, a
* re-subscription is attempted.
* @return the new {@code Uni} instance
* @throws IllegalArgumentException if back off configured
*/
@CheckReturnValue
public Uni<T> until(Predicate<? super Throwable> predicate) {
ParameterValidation.nonNull(predicate, "predicate");
Function<Multi<Throwable>, Flow.Publisher<Long>> whenStreamFactory = stream -> stream.onItem()
.transformToUniAndConcatenate(failure -> {
try {
if (predicate.test(failure)) {
return Uni.createFrom().item(1L);
} else {
return Uni.createFrom().failure(failure);
}
} catch (Throwable err) {
return Uni.createFrom().failure(err);
}
});
return when(whenStreamFactory);
Function<Multi<Throwable>, Flow.Publisher<Long>> whenStreamFactory;
if (backOffConfigured) {
ScheduledExecutorService pool = (this.executor == null) ? Infrastructure.getDefaultWorkerPool() : this.executor;
whenStreamFactory = backoffWithPredicateFactory(initialBackOffDuration, jitter, maxBackoffDuration, predicate,
pool);
} else {
whenStreamFactory = noBackoffPredicateFactory(predicate);
}
Function<Multi<Throwable>, ? extends Flow.Publisher<?>> actual = Infrastructure
.decorate(nonNull(whenStreamFactory, "whenStreamFactory"));
return upstream.toMulti().onFailure(this.onFailurePredicate).retry().when(actual).toUni();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -144,4 +145,54 @@ private static Duration getNextAttemptDelay(Duration firstBackoff, Duration maxB
}
return nextBackoff;
}

public static Function<Multi<Throwable>, Publisher<Long>> backoffWithPredicateFactory(final Duration initialBackOff,
final double jitter, final Duration maxBackoff, Predicate<? super Throwable> predicate,
ScheduledExecutorService pool) {
return new Function<>() {
int index = 0;

@Override
public Publisher<Long> apply(Multi<Throwable> stream) {
return stream.onItem()
.transformToUniAndConcatenate(failure -> {
int iteration = index++;
try {
if (predicate.test(failure)) {
Duration delay = getNextDelay(initialBackOff, maxBackoff, jitter,
iteration);
return Uni.createFrom().item((long) iteration)
.onItem().delayIt().onExecutor(pool).by(delay);
} else {
return Uni.createFrom().failure(failure);
}
} catch (Throwable err) {
failure.addSuppressed(err);
return Uni.createFrom().failure(failure);
}
});
}
};
}

public static Function<Multi<Throwable>, Publisher<Long>> noBackoffPredicateFactory(
Predicate<? super Throwable> predicate) {
return new Function<>() {
@Override
public Publisher<Long> apply(Multi<Throwable> stream) {
return stream.onItem()
.transformToUniAndConcatenate(failure -> {
try {
if (predicate.test(failure)) {
return Uni.createFrom().item(1L);
} else {
return Uni.createFrom().failure(failure);
}
} catch (Throwable err) {
return Uni.createFrom().failure(err);
}
});
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -594,4 +597,54 @@ void avoidSpuriousInterruptsWithBackoff() {
assertThat(result).isEqualTo("yolo");
assertThat(interrupted).isFalse();
}

@Test
void backoffWithUntilPredicateAlwaysTrue() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();
String result = Uni.createFrom().<String> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
if (counter.getAndIncrement() < 5) {
emitter.fail(new IOException("boom"));
} else {
emitter.complete("ok");
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0).until(err -> true)
.await().atMost(Duration.ofSeconds(5));

assertThat(result).isEqualTo("ok");
assertThat(timestamps).hasSize(6);

ArrayList<Long> diffs = new ArrayList<>();
int index = timestamps.size() - 1;
while (index > 0) {
diffs.add(timestamps.get(index) - timestamps.get(index - 1));
index = index - 1;
}
assertThat(diffs)
.doesNotHaveDuplicates()
.isSortedAccordingTo(Comparator.reverseOrder());
}

@Test
void backoffWithUntilPredicateEventuallyFailing() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();

assertThrows(CompletionException.class, () -> Uni.createFrom().<String> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
if (counter.getAndIncrement() < 5) {
emitter.fail(new IOException("boom"));
} else {
emitter.complete("ok");
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0)
.until(err -> counter.get() < 2)
.await().atMost(Duration.ofSeconds(5)));

assertThat(timestamps).hasSize(2);
assertThat(timestamps.get(1) - timestamps.get(0)).isGreaterThan(0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,6 @@ public void testThatYouCannotUseWhenIfBackoffIsConfigured() {
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).when(t -> Multi.createFrom().item(t)));
}

@Test
public void testThatYouCannotUseUntilIfBackoffIsConfigured() {
assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item("hello")
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).until(t -> true));
}

@Test
public void testJitterValidation() {
assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item("hello")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -190,4 +193,70 @@ public void testWithPredicateReturningFalse() {
subscriber.assertItems(0, 1).assertFailedWith(Exception.class, "boom");
}

@Test
public void testWithBackoffAndUntilAndAlwaysTruePredicate() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();
AssertSubscriber<Integer> sub = Multi.createFrom().<Integer> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
if (counter.incrementAndGet() == 5) {
emitter.complete();
} else {
emitter.fail(new IOException("boom"));
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0).until(err -> true)
.subscribe().withSubscriber(AssertSubscriber.create());

sub.request(256);
sub.awaitCompletion();
List<Integer> items = sub.getItems();
assertThat(items)
.hasSize(15)
.startsWith(1, 2, 3)
.endsWith(1, 2, 3);

assertThat(timestamps)
.hasSize(5);
assertThat(timestamps.get(4) - timestamps.get(3))
.isGreaterThan(timestamps.get(3) - timestamps.get(2));
assertThat(timestamps.get(3) - timestamps.get(2))
.isGreaterThan(timestamps.get(2) - timestamps.get(1));
}

@Test
public void testWithBackoffAndUntilAndEventualFailure() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();
AssertSubscriber<Integer> sub = Multi.createFrom().<Integer> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
if (counter.incrementAndGet() == 5) {
emitter.complete();
} else {
emitter.fail(new IOException("boom"));
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0)
.until(err -> counter.get() < 3)
.subscribe().withSubscriber(AssertSubscriber.create());

sub.request(256);
sub.awaitFailure().assertFailedWith(IOException.class, "boom");

assertThat(sub.getItems())
.hasSize(9)
.startsWith(1, 2, 3)
.endsWith(1, 2, 3);

assertThat(timestamps)
.hasSize(3);
assertThat(timestamps.get(2) - timestamps.get(1))
.isGreaterThan(timestamps.get(1) - timestamps.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
Expand Down Expand Up @@ -170,12 +169,6 @@ public void testJitterValidation() {
.onFailure().retry().withJitter(2));
}

@Test
public void testThatYouCannotUseUntilIfBackoffIsConfigured() {
assertThrows(IllegalArgumentException.class, () -> Uni.createFrom().item("hello")
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).until(t -> true));
}

/**
* Reproducer for https://github.com/smallrye/smallrye-mutiny/discussions/814.
*/
Expand Down

0 comments on commit 4e979e0

Please sign in to comment.