diff --git a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java index 540501b5cf..72baa73abf 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java @@ -574,7 +574,7 @@ public Flux generateCompanion(Flux t) { //short-circuit delay == 0 case if (nextBackoff.isZero()) { return RetrySpec.applyHooks(copy, Mono.just(iteration), - syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry); + syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv); } ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -602,9 +602,8 @@ public Flux generateCompanion(Flux t) { jitter = random.nextLong(lowBound, highBound); } Duration effectiveBackoff = nextBackoff.plusMillis(jitter); - return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, - backoffSchedulerSupplier.get()), - syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry); + return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, backoffSchedulerSupplier.get()), + syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv); }) .contextWrite(c -> Context.empty()) ); diff --git a/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java b/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java index 710ff4de86..dc783537aa 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java @@ -373,7 +373,7 @@ else if (iteration >= maxAttempts) { return Mono.error(retryExhaustedGenerator.apply(this, copy)); } else { - return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry); + return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry, cv); } }) .contextWrite(c -> Context.empty()) @@ -389,7 +389,8 @@ static Mono applyHooks(RetrySignal copyOfSignal, final Consumer doPreRetry, final Consumer doPostRetry, final BiFunction, Mono> asyncPreRetry, - final BiFunction, Mono> asyncPostRetry) { + final BiFunction, Mono> asyncPostRetry, + final ContextView cv) { if (doPreRetry != NO_OP_CONSUMER) { try { doPreRetry.accept(copyOfSignal); @@ -410,6 +411,6 @@ static Mono applyHooks(RetrySignal copyOfSignal, Mono preRetryMono = asyncPreRetry == NO_OP_BIFUNCTION ? Mono.empty() : asyncPreRetry.apply(copyOfSignal, Mono.empty()); Mono postRetryMono = asyncPostRetry != NO_OP_BIFUNCTION ? asyncPostRetry.apply(copyOfSignal, postRetrySyncMono) : postRetrySyncMono; - return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn); + return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn).contextWrite(cv); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index 1eb0dbad55..87b54cb114 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -24,7 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; @@ -39,6 +41,7 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.test.publisher.PublisherProbe; import reactor.test.scheduler.VirtualTimeScheduler; import reactor.test.subscriber.AssertSubscriber; import reactor.util.context.Context; @@ -46,9 +49,11 @@ import reactor.util.function.Tuple2; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; +import reactor.util.retry.RetrySpec; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.BDDMockito.given; public class FluxRetryWhenTest { @@ -58,6 +63,45 @@ public class FluxRetryWhenTest { Flux rangeError = Flux.concat(Flux.range(1, 2), Flux.error(new RuntimeException("forced failure 0"))); + @Test + // https://github.com/reactor/reactor-core/issues/3314 + void ensuresContextIsRestoredInRetryFunctions() { + PublisherProbe doBeforeRetryProbe = PublisherProbe.empty(); + AtomicReference capturedContext = new AtomicReference<>(); + + RetrySpec spec = Retry.max(1) + .doBeforeRetryAsync( + retrySignal -> + Mono.deferContextual(cv -> { + capturedContext.set(cv); + return doBeforeRetryProbe.mono(); + }) + ); + + Context context = Context.of("test", "test"); + + Mono.defer(new Supplier>() { + int index = 0; + + @Override + public Mono get() { + if (index++ == 0) { + return Mono.error(new RuntimeException()); + } else { + return Mono.just("someValue"); + } + } + }) + .retryWhen(spec) + .contextWrite(context) + .as(StepVerifier::create) + .expectNext("someValue") + .verifyComplete(); + + doBeforeRetryProbe.assertWasSubscribed(); + assertThat(capturedContext).hasValueMatching(c -> c.hasKey("test")); + } + @Test //https://github.com/reactor/reactor-core/issues/3253 public void shouldFailWhenOnErrorContinueEnabled() {