diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java index b057bc3666..d049fc264e 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java @@ -20,6 +20,8 @@ import org.reactivestreams.Subscriber; import reactor.core.CoreSubscriber; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static java.util.Objects.requireNonNull; @@ -34,10 +36,8 @@ class BulkheadSubscriber extends AbstractSubscriber { private final Bulkhead bulkhead; private final boolean singleProducer; - @SuppressWarnings("PMD") - private volatile int successSignaled = 0; - private static final AtomicIntegerFieldUpdater SUCCESS_SIGNALED = - AtomicIntegerFieldUpdater.newUpdater(BulkheadSubscriber.class, "successSignaled"); + private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false); + private final AtomicBoolean successSignaled = new AtomicBoolean(false); BulkheadSubscriber(Bulkhead bulkhead, CoreSubscriber downstreamSubscriber, @@ -50,17 +50,22 @@ class BulkheadSubscriber extends AbstractSubscriber { @Override public void hookOnNext(T t) { if (!isDisposed()) { - if (singleProducer && SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) { + if (singleProducer && successSignaled.compareAndSet( false, true)) { bulkhead.onComplete(); } + eventWasEmitted.set(true); downstreamSubscriber.onNext(t); } } @Override public void hookOnCancel() { - if(successSignaled == 0){ - bulkhead.releasePermission(); + if(!successSignaled.get()){ + if(eventWasEmitted.get()){ + bulkhead.onComplete(); + }else{ + bulkhead.releasePermission(); + } } } @@ -73,7 +78,7 @@ public void hookOnError(Throwable t) { @Override public void hookOnComplete() { - if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) { + if (successSignaled.compareAndSet( false, true)) { bulkhead.onComplete(); } downstreamSubscriber.onComplete(); diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java index 15c99965bf..f4abc40dc5 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java @@ -21,6 +21,7 @@ import reactor.core.CoreSubscriber; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static java.util.Objects.requireNonNull; @@ -37,10 +38,8 @@ class CircuitBreakerSubscriber extends AbstractSubscriber { private final long start; private final boolean singleProducer; - @SuppressWarnings("PMD") - private volatile int successSignaled = 0; - private static final AtomicIntegerFieldUpdater SUCCESS_SIGNALED = - AtomicIntegerFieldUpdater.newUpdater(CircuitBreakerSubscriber.class, "successSignaled"); + private final AtomicBoolean successSignaled = new AtomicBoolean(false); + private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false); protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker, CoreSubscriber downstreamSubscriber, @@ -54,9 +53,10 @@ protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker, @Override protected void hookOnNext(T value) { if (!isDisposed()) { - if (singleProducer && SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) { + if (singleProducer && successSignaled.compareAndSet( false, true)) { circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS); } + eventWasEmitted.set(true); downstreamSubscriber.onNext(value); } @@ -64,7 +64,7 @@ protected void hookOnNext(T value) { @Override protected void hookOnComplete() { - if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) { + if (successSignaled.compareAndSet( false, true)) { circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS); } @@ -73,8 +73,12 @@ protected void hookOnComplete() { @Override public void hookOnCancel() { - if (successSignaled == 0) { - circuitBreaker.releasePermission(); + if (!successSignaled.get()) { + if(eventWasEmitted.get()){ + circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS); + }else{ + circuitBreaker.releasePermission(); + } } } diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java index d555bf3527..b233a7b170 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java @@ -17,6 +17,7 @@ import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadFullException; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -25,6 +26,7 @@ import java.io.IOException; import java.time.Duration; +import java.util.concurrent.TimeUnit; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.*; @@ -125,4 +127,20 @@ public void shouldReleaseBulkheadSemaphoreOnCancel() { verify(bulkhead, times(1)).releasePermission(); } + + @Test + public void shouldInvokeOnCompleteOnCancelWhenEventWasEmitted() { + given(bulkhead.tryAcquirePermission()).willReturn(true); + + StepVerifier.create( + Flux.just("Event1", "Event2", "Event3") + .compose(BulkheadOperator.of(bulkhead))) + .expectSubscription() + .thenRequest(1) + .thenCancel() + .verify(); + + verify(bulkhead, never()).releasePermission(); + verify(bulkhead, times(1)).onComplete(); + } } \ No newline at end of file diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java index bbf6999690..d2aacdb28b 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java @@ -155,4 +155,21 @@ public void shouldReleasePermissionOnCancel() { verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class)); verify(circuitBreaker, never()).onSuccess(anyLong(), any(TimeUnit.class)); } + + @Test + public void shouldInvokeOnSuccessOnCancelWhenEventWasEmitted() { + given(circuitBreaker.tryAcquirePermission()).willReturn(true); + + StepVerifier.create( + Flux.just("Event1", "Event2", "Event3") + .compose(CircuitBreakerOperator.of(circuitBreaker))) + .expectSubscription() + .thenRequest(1) + .thenCancel() + .verify(); + + verify(circuitBreaker, never()).releasePermission(); + verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class)); + verify(circuitBreaker, times(1)).onSuccess(anyLong(), any(TimeUnit.class)); + } } \ No newline at end of file diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractObserver.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractObserver.java index b08776e9d4..b609bd4aa4 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractObserver.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractObserver.java @@ -2,11 +2,14 @@ import io.reactivex.Observer; +import java.util.concurrent.atomic.AtomicBoolean; + import static java.util.Objects.requireNonNull; public abstract class AbstractObserver extends AbstractDisposable implements Observer { private final Observer downstreamObserver; + protected final AtomicBoolean eventWasEmitted = new AtomicBoolean(false); public AbstractObserver(Observer downstreamObserver) { this.downstreamObserver = requireNonNull(downstreamObserver); @@ -19,7 +22,10 @@ protected void hookOnSubscribe() { @Override public void onNext(T item) { - whenNotDisposed(() -> downstreamObserver.onNext(item)); + whenNotDisposed(() -> { + eventWasEmitted.set(true); + downstreamObserver.onNext(item); + }); } @Override diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractSubscriber.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractSubscriber.java index 753874f973..c647226930 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractSubscriber.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/AbstractSubscriber.java @@ -20,6 +20,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static io.reactivex.internal.subscriptions.SubscriptionHelper.CANCELLED; @@ -29,6 +30,7 @@ public abstract class AbstractSubscriber implements Subscriber, Subscripti protected final Subscriber downstreamSubscriber; private final AtomicReference subscription = new AtomicReference<>(); + protected final AtomicBoolean eventWasEmitted = new AtomicBoolean(false); protected AbstractSubscriber(Subscriber downstreamSubscriber) { this.downstreamSubscriber = requireNonNull(downstreamSubscriber); @@ -44,6 +46,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T value) { if(!isDisposed()){ + eventWasEmitted.set(true); downstreamSubscriber.onNext(value); } diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreaker.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreaker.java index f465a722c7..c2f13eb627 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreaker.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreaker.java @@ -69,7 +69,11 @@ public void hookOnComplete() { @Override public void hookOnCancel() { - circuitBreaker.releasePermission(); + if(eventWasEmitted.get()){ + circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS); + }else{ + circuitBreaker.releasePermission(); + } } } diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/ObserverCircuitBreaker.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/ObserverCircuitBreaker.java index db31fdac23..8aa65f3df3 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/ObserverCircuitBreaker.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/ObserverCircuitBreaker.java @@ -64,7 +64,11 @@ protected void hookOnComplete() { @Override protected void hookOnCancel() { - circuitBreaker.releasePermission(); + if(eventWasEmitted.get()){ + circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS); + }else{ + circuitBreaker.releasePermission(); + } } } diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreakerTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreakerTest.java index b1e7a2328c..8ee62507d5 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreakerTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/FlowableCircuitBreakerTest.java @@ -18,7 +18,7 @@ public class FlowableCircuitBreakerTest extends BaseCircuitBreakerTest { @Test - public void shouldSubscribeToFlowableJust() { + public void shouldInvokeOnSuccess() { given(circuitBreaker.tryAcquirePermission()).willReturn(true); Flowable.just("Event 1", "Event 2") @@ -31,7 +31,7 @@ public void shouldSubscribeToFlowableJust() { } @Test - public void shouldPropagateError() { + public void shouldInvokeOnError() { given(circuitBreaker.tryAcquirePermission()).willReturn(true); Flowable.error(new IOException("BAM!")) @@ -61,7 +61,7 @@ public void shouldEmitErrorWithCallNotPermittedException() { } @Test - public void shouldReleasePermissionOnCancel() { + public void shouldInvokeReleasePermissionReleaseOnCancel() { given(circuitBreaker.tryAcquirePermission()).willReturn(true); Flowable.just(1) @@ -74,4 +74,18 @@ public void shouldReleasePermissionOnCancel() { verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class)); verify(circuitBreaker, never()).onSuccess(anyLong(), any(TimeUnit.class)); } + + @Test + public void shouldInvokeOnSuccessOnCancelWhenOneEventWasEmitted() { + given(circuitBreaker.tryAcquirePermission()).willReturn(true); + + Flowable.just(1,2,3) + .compose(CircuitBreakerOperator.of(circuitBreaker)) + .test(1) + .cancel(); + + verify(circuitBreaker, never()).releasePermission(); + verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class)); + verify(circuitBreaker, times(1)).onSuccess(anyLong(), any(TimeUnit.class)); + } }