diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java index 0ea6b702d2..005088fe0b 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java @@ -22,6 +22,8 @@ import org.reactivestreams.Subscriber; import reactor.core.CoreSubscriber; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import static java.util.Objects.requireNonNull; /** @@ -33,15 +35,27 @@ class CircuitBreakerSubscriber extends ResilienceBaseSubscriber { private final CircuitBreaker circuitBreaker; private StopWatch stopWatch; + private final boolean singleProducer; + + @SuppressWarnings("PMD") + private volatile int successSignaled = 0; + private static final AtomicIntegerFieldUpdater SUCCESS_SIGNALED = + AtomicIntegerFieldUpdater.newUpdater(CircuitBreakerSubscriber.class, "successSignaled"); public CircuitBreakerSubscriber(CircuitBreaker circuitBreaker, - CoreSubscriber actual) { + CoreSubscriber actual, + boolean singleProducer) { super(actual); this.circuitBreaker = requireNonNull(circuitBreaker); + this.singleProducer = singleProducer; } @Override protected void hookOnNext(T value) { + if (singleProducer && SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) { + markSuccess(); + } + if (notCancelled() && wasCallPermitted()) { actual.onNext(value); } @@ -49,7 +63,10 @@ protected void hookOnNext(T value) { @Override protected void hookOnComplete() { - markSuccess(); + if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) { + markSuccess(); + } + if (wasCallPermitted()) { actual.onComplete(); } diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreaker.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreaker.java index 3d9a5fef55..bbf43faf9b 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreaker.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreaker.java @@ -31,7 +31,7 @@ public FluxCircuitBreaker(Flux source, CircuitBreaker circuitBreake @Override public void subscribe(CoreSubscriber actual) { - source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual)); + source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, false)); } } diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreaker.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreaker.java index e4e1507558..7413753f8b 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreaker.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreaker.java @@ -30,6 +30,6 @@ public MonoCircuitBreaker(Mono source, CircuitBreaker circuitBreake @Override public void subscribe(CoreSubscriber actual) { - source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual)); + source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, true)); } } diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java index 27b3a01176..b6f48361d6 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java @@ -31,7 +31,7 @@ public CircuitBreakerSubscriberWhiteboxVerification() { @Override public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { - return new CircuitBreakerSubscriber(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create()) { + return new CircuitBreakerSubscriber(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create(), false) { @Override protected void hookOnSubscribe(Subscription subscription) { diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java index 66f0c2eef3..59e1dc1217 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.time.Duration; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.fail; public class MonoCircuitBreakerTest extends CircuitBreakerAssertions { @@ -36,6 +39,16 @@ public void shouldEmitEvent() { assertSingleSuccessfulCall(); } + @Test + public void shouldEmptyMonoShouldBeSuccessful() { + StepVerifier.create( + Mono.empty() + .transform(CircuitBreakerOperator.of(circuitBreaker))) + .verifyComplete(); + + assertSingleSuccessfulCall(); + } + @Test public void shouldPropagateError() { StepVerifier.create( @@ -82,4 +95,19 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() { assertNoRegisteredCall(); } + + @Test + public void shouldRecordSuccessWhenUsingToFuture() { + try { + Mono.just("Event") + .transform(CircuitBreakerOperator.of(circuitBreaker)) + .toFuture() + .get(); + + assertSingleSuccessfulCall(); + } catch (InterruptedException | ExecutionException e) { + fail(); + } + + } } \ No newline at end of file