Skip to content

Commit

Permalink
Merge pull request ReactiveX#260 from madgnome/issue/ReactiveX#245-re…
Browse files Browse the repository at this point in the history
…actor-mono-tofuture-success

ReactiveX#245 Fix CircuitBreakerSubscriber for Reactor doesn't count successes…
  • Loading branch information
storozhukBM authored Sep 12, 2018
2 parents a9abce7 + a3c39d0 commit bc45b2f
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -33,23 +35,38 @@ class CircuitBreakerSubscriber<T> extends ResilienceBaseSubscriber<T> {

private final CircuitBreaker circuitBreaker;
private StopWatch stopWatch;
private final boolean singleProducer;

@SuppressWarnings("PMD")
private volatile int successSignaled = 0;
private static final AtomicIntegerFieldUpdater<CircuitBreakerSubscriber> SUCCESS_SIGNALED =
AtomicIntegerFieldUpdater.newUpdater(CircuitBreakerSubscriber.class, "successSignaled");

public CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
CoreSubscriber<? super T> actual) {
CoreSubscriber<? super T> actual,
boolean singleProducer) {
super(actual);
this.circuitBreaker = requireNonNull(circuitBreaker);
this.singleProducer = singleProducer;
}

@Override
protected void hookOnNext(T value) {
if (singleProducer && SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
markSuccess();
}

if (notCancelled() && wasCallPermitted()) {
actual.onNext(value);
}
}

@Override
protected void hookOnComplete() {
markSuccess();
if (SUCCESS_SIGNALED.compareAndSet(this, 0, 1)) {
markSuccess();
}

if (wasCallPermitted()) {
actual.onComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public FluxCircuitBreaker(Flux<? extends T> source, CircuitBreaker circuitBreake

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual));
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, false));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public MonoCircuitBreaker(Mono<? extends T> source, CircuitBreaker circuitBreake

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual));
source.subscribe(new CircuitBreakerSubscriber<>(circuitBreaker, actual, true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public CircuitBreakerSubscriberWhiteboxVerification() {

@Override
public Subscriber<Integer> createSubscriber(WhiteboxSubscriberProbe<Integer> probe) {
return new CircuitBreakerSubscriber<Integer>(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create()) {
return new CircuitBreakerSubscriber<Integer>(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create(), false) {

@Override
protected void hookOnSubscribe(Subscription subscription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.fail;

public class MonoCircuitBreakerTest extends CircuitBreakerAssertions {

Expand All @@ -36,6 +39,16 @@ public void shouldEmitEvent() {
assertSingleSuccessfulCall();
}

@Test
public void shouldEmptyMonoShouldBeSuccessful() {
StepVerifier.create(
Mono.empty()
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.verifyComplete();

assertSingleSuccessfulCall();
}

@Test
public void shouldPropagateError() {
StepVerifier.create(
Expand Down Expand Up @@ -82,4 +95,19 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() {

assertNoRegisteredCall();
}

@Test
public void shouldRecordSuccessWhenUsingToFuture() {
try {
Mono.just("Event")
.transform(CircuitBreakerOperator.of(circuitBreaker))
.toFuture()
.get();

assertSingleSuccessfulCall();
} catch (InterruptedException | ExecutionException e) {
fail();
}

}
}

0 comments on commit bc45b2f

Please sign in to comment.