Skip to content

Commit

Permalink
ensures context restored in RetrySpec (#3316)
Browse files Browse the repository at this point in the history
closes #3314

This commit restores context for inner Flux inside concatMap, while the concatMap remains without context. It allows Retry methods to have context while onErrorContinue to be disabled for retry concatMap
  • Loading branch information
OlegDokuka authored Dec 12, 2022
1 parent a8d4cbd commit cfe1979
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ public Flux<Long> generateCompanion(Flux<RetrySignal> t) {
//short-circuit delay == 0 case
if (nextBackoff.isZero()) {
return RetrySpec.applyHooks(copy, Mono.just(iteration),
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry);
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv);
}

ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down Expand Up @@ -602,9 +602,8 @@ public Flux<Long> generateCompanion(Flux<RetrySignal> t) {
jitter = random.nextLong(lowBound, highBound);
}
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff,
backoffSchedulerSupplier.get()),
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry);
return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, backoffSchedulerSupplier.get()),
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv);
})
.contextWrite(c -> Context.empty())
);
Expand Down
7 changes: 4 additions & 3 deletions reactor-core/src/main/java/reactor/util/retry/RetrySpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ else if (iteration >= maxAttempts) {
return Mono.error(retryExhaustedGenerator.apply(this, copy));
}
else {
return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry);
return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry, cv);
}
})
.contextWrite(c -> Context.empty())
Expand All @@ -389,7 +389,8 @@ static <T> Mono<T> applyHooks(RetrySignal copyOfSignal,
final Consumer<RetrySignal> doPreRetry,
final Consumer<RetrySignal> doPostRetry,
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPreRetry,
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry) {
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry,
final ContextView cv) {
if (doPreRetry != NO_OP_CONSUMER) {
try {
doPreRetry.accept(copyOfSignal);
Expand All @@ -410,6 +411,6 @@ static <T> Mono<T> applyHooks(RetrySignal copyOfSignal,
Mono<Void> preRetryMono = asyncPreRetry == NO_OP_BIFUNCTION ? Mono.empty() : asyncPreRetry.apply(copyOfSignal, Mono.empty());
Mono<Void> postRetryMono = asyncPostRetry != NO_OP_BIFUNCTION ? asyncPostRetry.apply(copyOfSignal, postRetrySyncMono) : postRetrySyncMono;

return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn);
return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn).contextWrite(cv);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.assertj.core.api.Assertions;
Expand All @@ -39,16 +41,19 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;
import reactor.test.scheduler.VirtualTimeScheduler;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.function.Tuple2;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
import reactor.util.retry.RetrySpec;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.BDDMockito.given;

public class FluxRetryWhenTest {

Expand All @@ -58,6 +63,45 @@ public class FluxRetryWhenTest {
Flux<Integer> rangeError = Flux.concat(Flux.range(1, 2),
Flux.error(new RuntimeException("forced failure 0")));

@Test
// https://github.com/reactor/reactor-core/issues/3314
void ensuresContextIsRestoredInRetryFunctions() {
PublisherProbe<Void> doBeforeRetryProbe = PublisherProbe.empty();
AtomicReference<ContextView> capturedContext = new AtomicReference<>();

RetrySpec spec = Retry.max(1)
.doBeforeRetryAsync(
retrySignal ->
Mono.deferContextual(cv -> {
capturedContext.set(cv);
return doBeforeRetryProbe.mono();
})
);

Context context = Context.of("test", "test");

Mono.defer(new Supplier<Mono<?>>() {
int index = 0;

@Override
public Mono<?> get() {
if (index++ == 0) {
return Mono.error(new RuntimeException());
} else {
return Mono.just("someValue");
}
}
})
.retryWhen(spec)
.contextWrite(context)
.as(StepVerifier::create)
.expectNext("someValue")
.verifyComplete();

doBeforeRetryProbe.assertWasSubscribed();
assertThat(capturedContext).hasValueMatching(c -> c.hasKey("test"));
}

@Test
//https://github.com/reactor/reactor-core/issues/3253
public void shouldFailWhenOnErrorContinueEnabled() {
Expand Down

0 comments on commit cfe1979

Please sign in to comment.