Skip to content

Commit

Permalink
fix #2310 Add and polish Retry-from-Function adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Aug 7, 2020
1 parent e1e4667 commit 94ba045
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
18 changes: 17 additions & 1 deletion reactor-core/src/main/java/reactor/util/retry/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public static RetrySpec indefinitely() {
* @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda
* @return the {@link Retry} strategy adapted from the {@link Function}
*/
public static final Retry from(Function<Flux<RetrySignal>, Publisher<?>> function) {
public static final Retry from(Function<Flux<RetrySignal>, ? extends Publisher<?>> function) {
return new Retry() {
@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignalCompanion) {
Expand All @@ -202,4 +202,20 @@ public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignalCompanion) {
};
}

/**
* An adapter for {@link Flux} of {@link Throwable}-based {@link Function} to provide {@link Retry}
* from a legacy retryWhen {@link Function}.
*
* @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda
* @return the {@link Retry} strategy adapted from the {@link Function}
*/
public static final Retry withThrowable(Function<Flux<Throwable>, ? extends Publisher<?>> function) {
return new Retry() {
@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
return function.apply(retrySignals.map(RetrySignal::failure));
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -893,4 +893,24 @@ public void retryWhenThrowableCompanionIsComparableToRetryWhenRetryFromFunction(
.verifyComplete();
}

@Test
public void retryWhenWithThrowableFunction() {
AtomicInteger sourceHelper = new AtomicInteger();
Flux<Integer> source = Flux.create(sink -> {
if (sourceHelper.getAndIncrement() == 3) {
sink.next(1).next(2).next(3).complete();
}
else {
sink.error(new IllegalStateException("boom"));
}
});

Function<Flux<Throwable>, Flux<Long>> throwableBased = throwableFlux -> throwableFlux.index().map(Tuple2::getT1);

StepVerifier.create(source.retryWhen(Retry.withThrowable(throwableBased)))
.expectSubscription()
.expectNext(1, 2, 3)
.verifyComplete();
}

}

0 comments on commit 94ba045

Please sign in to comment.