Skip to content

Commit

Permalink
Issue ReactiveX#581: Some RxJava2 or Reactor operators like ZipObserv…
Browse files Browse the repository at this point in the history
…er are ca… (ReactiveX#594)

* Issue ReactiveX#581: Some RxJava2 or Reactor operators like ZipObserver are cancelling (disposing) a second Observable when the first observable is complete. This is because an operator like zip must combine two events. It makes no sense to consume further events of the second observable when the first is completed. Unfortunately the cancellation is bad for the CircuitBreakerOperator, since no success is recorded even if an emit was emitted successfully. Solution: The CircuitBreaker operator and BulkHead operator could track if an event has been emitted successfully (onNext). And when dispose/cancel is invoked, the operator either invokes onSuccess/onComplete or releasePermission.
  • Loading branch information
RobWin authored Aug 28, 2019
1 parent 292c6b7 commit 3aeeb04
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static java.util.Objects.requireNonNull;
Expand All @@ -34,10 +36,8 @@ class BulkheadSubscriber<T> extends AbstractSubscriber<T> {
private final Bulkhead bulkhead;
private final boolean singleProducer;

@SuppressWarnings("PMD")
private volatile int successSignaled = 0;
private static final AtomicIntegerFieldUpdater<BulkheadSubscriber> SUCCESS_SIGNALED =
AtomicIntegerFieldUpdater.newUpdater(BulkheadSubscriber.class, "successSignaled");
private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);
private final AtomicBoolean successSignaled = new AtomicBoolean(false);

BulkheadSubscriber(Bulkhead bulkhead,
CoreSubscriber<? super T> downstreamSubscriber,
Expand All @@ -50,17 +50,22 @@ class BulkheadSubscriber<T> extends AbstractSubscriber<T> {
@Override
public void hookOnNext(T t) {
if (!isDisposed()) {
if (singleProducer && SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
if (singleProducer && successSignaled.compareAndSet( false, true)) {
bulkhead.onComplete();
}
eventWasEmitted.set(true);
downstreamSubscriber.onNext(t);
}
}

@Override
public void hookOnCancel() {
if(successSignaled == 0){
bulkhead.releasePermission();
if(!successSignaled.get()){
if(eventWasEmitted.get()){
bulkhead.onComplete();
}else{
bulkhead.releasePermission();
}
}

}
Expand All @@ -73,7 +78,7 @@ public void hookOnError(Throwable t) {

@Override
public void hookOnComplete() {
if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
if (successSignaled.compareAndSet( false, true)) {
bulkhead.onComplete();
}
downstreamSubscriber.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import reactor.core.CoreSubscriber;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static java.util.Objects.requireNonNull;
Expand All @@ -37,10 +38,8 @@ class CircuitBreakerSubscriber<T> extends AbstractSubscriber<T> {
private final long start;
private final boolean singleProducer;

@SuppressWarnings("PMD")
private volatile int successSignaled = 0;
private static final AtomicIntegerFieldUpdater<CircuitBreakerSubscriber> SUCCESS_SIGNALED =
AtomicIntegerFieldUpdater.newUpdater(CircuitBreakerSubscriber.class, "successSignaled");
private final AtomicBoolean successSignaled = new AtomicBoolean(false);
private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);

protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
CoreSubscriber<? super T> downstreamSubscriber,
Expand All @@ -54,17 +53,18 @@ protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
@Override
protected void hookOnNext(T value) {
if (!isDisposed()) {
if (singleProducer && SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
if (singleProducer && successSignaled.compareAndSet( false, true)) {
circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
eventWasEmitted.set(true);

downstreamSubscriber.onNext(value);
}
}

@Override
protected void hookOnComplete() {
if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
if (successSignaled.compareAndSet( false, true)) {
circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}

Expand All @@ -73,8 +73,12 @@ protected void hookOnComplete() {

@Override
public void hookOnCancel() {
if (successSignaled == 0) {
circuitBreaker.releasePermission();
if (!successSignaled.get()) {
if(eventWasEmitted.get()){
circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}else{
circuitBreaker.releasePermission();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -25,6 +26,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -125,4 +127,20 @@ public void shouldReleaseBulkheadSemaphoreOnCancel() {

verify(bulkhead, times(1)).releasePermission();
}

@Test
public void shouldInvokeOnCompleteOnCancelWhenEventWasEmitted() {
given(bulkhead.tryAcquirePermission()).willReturn(true);

StepVerifier.create(
Flux.just("Event1", "Event2", "Event3")
.compose(BulkheadOperator.of(bulkhead)))
.expectSubscription()
.thenRequest(1)
.thenCancel()
.verify();

verify(bulkhead, never()).releasePermission();
verify(bulkhead, times(1)).onComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,21 @@ public void shouldReleasePermissionOnCancel() {
verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
verify(circuitBreaker, never()).onSuccess(anyLong(), any(TimeUnit.class));
}

@Test
public void shouldInvokeOnSuccessOnCancelWhenEventWasEmitted() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);

StepVerifier.create(
Flux.just("Event1", "Event2", "Event3")
.compose(CircuitBreakerOperator.of(circuitBreaker)))
.expectSubscription()
.thenRequest(1)
.thenCancel()
.verify();

verify(circuitBreaker, never()).releasePermission();
verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
verify(circuitBreaker, times(1)).onSuccess(anyLong(), any(TimeUnit.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import io.reactivex.Observer;

import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Objects.requireNonNull;

public abstract class AbstractObserver<T> extends AbstractDisposable implements Observer<T> {

private final Observer<? super T> downstreamObserver;
protected final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);

public AbstractObserver(Observer<? super T> downstreamObserver) {
this.downstreamObserver = requireNonNull(downstreamObserver);
Expand All @@ -19,7 +22,10 @@ protected void hookOnSubscribe() {

@Override
public void onNext(T item) {
whenNotDisposed(() -> downstreamObserver.onNext(item));
whenNotDisposed(() -> {
eventWasEmitted.set(true);
downstreamObserver.onNext(item);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static io.reactivex.internal.subscriptions.SubscriptionHelper.CANCELLED;
Expand All @@ -29,6 +30,7 @@ public abstract class AbstractSubscriber<T> implements Subscriber<T>, Subscripti

protected final Subscriber<? super T> downstreamSubscriber;
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
protected final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);

protected AbstractSubscriber(Subscriber<? super T> downstreamSubscriber) {
this.downstreamSubscriber = requireNonNull(downstreamSubscriber);
Expand All @@ -44,6 +46,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T value) {
if(!isDisposed()){
eventWasEmitted.set(true);
downstreamSubscriber.onNext(value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ public void hookOnComplete() {

@Override
public void hookOnCancel() {
circuitBreaker.releasePermission();
if(eventWasEmitted.get()){
circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}else{
circuitBreaker.releasePermission();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ protected void hookOnComplete() {

@Override
protected void hookOnCancel() {
circuitBreaker.releasePermission();
if(eventWasEmitted.get()){
circuitBreaker.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}else{
circuitBreaker.releasePermission();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class FlowableCircuitBreakerTest extends BaseCircuitBreakerTest {

@Test
public void shouldSubscribeToFlowableJust() {
public void shouldInvokeOnSuccess() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);

Flowable.just("Event 1", "Event 2")
Expand All @@ -31,7 +31,7 @@ public void shouldSubscribeToFlowableJust() {
}

@Test
public void shouldPropagateError() {
public void shouldInvokeOnError() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);

Flowable.error(new IOException("BAM!"))
Expand Down Expand Up @@ -61,7 +61,7 @@ public void shouldEmitErrorWithCallNotPermittedException() {
}

@Test
public void shouldReleasePermissionOnCancel() {
public void shouldInvokeReleasePermissionReleaseOnCancel() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);

Flowable.just(1)
Expand All @@ -74,4 +74,18 @@ public void shouldReleasePermissionOnCancel() {
verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
verify(circuitBreaker, never()).onSuccess(anyLong(), any(TimeUnit.class));
}

@Test
public void shouldInvokeOnSuccessOnCancelWhenOneEventWasEmitted() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);

Flowable.just(1,2,3)
.compose(CircuitBreakerOperator.of(circuitBreaker))
.test(1)
.cancel();

verify(circuitBreaker, never()).releasePermission();
verify(circuitBreaker, never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
verify(circuitBreaker, times(1)).onSuccess(anyLong(), any(TimeUnit.class));
}
}

0 comments on commit 3aeeb04

Please sign in to comment.