From 94ba045d7f5e204d33afa96dccac940e72847977 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Fri, 7 Aug 2020 15:32:45 +0200 Subject: [PATCH] fix #2310 Add and polish Retry-from-Function adapters --- .../main/java/reactor/util/retry/Retry.java | 18 ++++++++++++++++- .../core/publisher/FluxRetryWhenTest.java | 20 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index dcda1b3974..9496b47b33 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -193,7 +193,7 @@ public static RetrySpec indefinitely() { * @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda * @return the {@link Retry} strategy adapted from the {@link Function} */ - public static final Retry from(Function, Publisher> function) { + public static final Retry from(Function, ? extends Publisher> function) { return new Retry() { @Override public Publisher generateCompanion(Flux retrySignalCompanion) { @@ -202,4 +202,20 @@ public Publisher generateCompanion(Flux retrySignalCompanion) { }; } + /** + * An adapter for {@link Flux} of {@link Throwable}-based {@link Function} to provide {@link Retry} + * from a legacy retryWhen {@link Function}. + * + * @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda + * @return the {@link Retry} strategy adapted from the {@link Function} + */ + public static final Retry withThrowable(Function, ? extends Publisher> function) { + return new Retry() { + @Override + public Publisher generateCompanion(Flux retrySignals) { + return function.apply(retrySignals.map(RetrySignal::failure)); + } + }; + } + } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index a6c423d33c..aec4d0f068 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -893,4 +893,24 @@ public void retryWhenThrowableCompanionIsComparableToRetryWhenRetryFromFunction( .verifyComplete(); } + @Test + public void retryWhenWithThrowableFunction() { + AtomicInteger sourceHelper = new AtomicInteger(); + Flux source = Flux.create(sink -> { + if (sourceHelper.getAndIncrement() == 3) { + sink.next(1).next(2).next(3).complete(); + } + else { + sink.error(new IllegalStateException("boom")); + } + }); + + Function, Flux> throwableBased = throwableFlux -> throwableFlux.index().map(Tuple2::getT1); + + StepVerifier.create(source.retryWhen(Retry.withThrowable(throwableBased))) + .expectSubscription() + .expectNext(1, 2, 3) + .verifyComplete(); + } + }