Skip to content

Commit

Permalink
Issue ReactiveX#518: Added decorators to CircuitBreaker and Retry for…
Browse files Browse the repository at this point in the history
… Supplier… (ReactiveX#519)
  • Loading branch information
RobWin authored Jun 28, 2019
1 parent fa85386 commit 322ee78
Show file tree
Hide file tree
Showing 10 changed files with 771 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
import io.vavr.CheckedRunnable;
import io.vavr.control.Either;
import io.vavr.control.Try;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -127,6 +129,28 @@ default <T> T executeSupplier(Supplier<T> supplier){
return decorateSupplier(this, supplier).get();
}

/**
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> Try<T> executeTrySupplier(Supplier<Try<T>> supplier){
return decorateTrySupplier(this, supplier).get();
}

/**
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> Either<Exception, T> executeEitherSupplier(Supplier<Either<? extends Exception, T>> supplier){
return decorateEitherSupplier(this, supplier).get();
}

/**
* Decorates and executes the decorated Callable.
*
Expand Down Expand Up @@ -294,6 +318,55 @@ static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier)
};
}

/**
* Returns a supplier which is decorated by a bulkhead.
*
* @param bulkhead the bulkhead
* @param supplier the original supplier
* @param <T> the type of results supplied by this supplier
*
* @return a supplier which is decorated by a Bulkhead.
*/
static <T> Supplier<Try<T>> decorateTrySupplier(Bulkhead bulkhead, Supplier<Try<T>> supplier){
return () -> {
if(bulkhead.tryAcquirePermission()){
try {
return supplier.get();
}
finally {
bulkhead.onComplete();
}
}else{
return Try.failure(new BulkheadFullException(bulkhead));
}
};
}

/**
* Returns a supplier which is decorated by a bulkhead.
*
* @param bulkhead the bulkhead
* @param supplier the original supplier
* @param <T> the type of results supplied by this supplier
*
* @return a supplier which is decorated by a Bulkhead.
*/
static <T> Supplier<Either<Exception, T>> decorateEitherSupplier(Bulkhead bulkhead, Supplier<Either<? extends Exception, T>> supplier){
return () -> {
if(bulkhead.tryAcquirePermission()){
try {
Either<? extends Exception, T> result = supplier.get();
return Either.narrow(result);
}
finally {
bulkhead.onComplete();
}
}else{
return Either.left(new BulkheadFullException(bulkhead));
}
};
}

/**
* Returns a consumer which is decorated by a bulkhead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
import io.vavr.CheckedRunnable;
import io.vavr.control.Either;
import io.vavr.control.Try;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -40,6 +41,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

public class BulkheadTest {
Expand Down Expand Up @@ -595,4 +597,98 @@ public void shouldInvokeMap() {
// end::shouldInvokeMap[]
}

@Test
public void shouldDecorateTrySupplierAndReturnWithSuccess() {
// Given
Bulkhead bulkhead = Bulkhead.of("test", config);
BDDMockito.given(helloWorldService.returnTry()).willReturn(Try.success("Hello world"));

// When
Try<String> result = bulkhead.executeTrySupplier(helloWorldService::returnTry);

// Then
assertThat(result.get()).isEqualTo("Hello world");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
BDDMockito.then(helloWorldService).should(times(1)).returnTry();
}

@Test
public void shouldDecorateTrySupplierAndReturnWithException() {
// Given
Bulkhead bulkhead = Bulkhead.of("test", config);
BDDMockito.given(helloWorldService.returnTry()).willReturn(Try.failure(new RuntimeException("BAM!")));

// When
Try<String> result = bulkhead.executeTrySupplier(helloWorldService::returnTry);

//Then
assertThat(result.isFailure()).isTrue();
assertThat(result.failed().get()).isInstanceOf(RuntimeException.class);
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
BDDMockito.then(helloWorldService).should(times(1)).returnTry();
}

@Test
public void shouldDecorateEitherSupplierAndReturnWithSuccess() {

// Given
Bulkhead bulkhead = Bulkhead.of("test", config);
BDDMockito.given(helloWorldService.returnEither()).willReturn(Either.right("Hello world"));

// When
Either<Exception, String> result = bulkhead.executeEitherSupplier(helloWorldService::returnEither);

// Then
assertThat(result.get()).isEqualTo("Hello world");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
BDDMockito.then(helloWorldService).should(times(1)).returnEither();
}

@Test
public void shouldDecorateEitherSupplierAndReturnWithException() {
// Given
Bulkhead bulkhead = Bulkhead.of("test", config);
BDDMockito.given(helloWorldService.returnEither()).willReturn(Either.left(new WebServiceException("BAM!")));

// When
Either<Exception, String> result = bulkhead.executeEitherSupplier(helloWorldService::returnEither);

//Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isInstanceOf(RuntimeException.class);
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
BDDMockito.then(helloWorldService).should(times(1)).returnEither();
}

@Test
public void shouldDecorateTrySupplierAndReturnWithBulkheadFullException() {
// Given
Bulkhead bulkhead = Mockito.mock(Bulkhead.class);
BDDMockito.given(bulkhead.tryAcquirePermission()).willReturn(false);

// When
Try<String> result = Bulkhead.decorateTrySupplier(bulkhead, helloWorldService::returnTry).get();

//Then
assertThat(result.isFailure()).isTrue();
assertThat(result.getCause()).isInstanceOf(BulkheadFullException.class);
BDDMockito.then(helloWorldService).should(never()).returnTry();
}

@Test
public void shouldDecorateEitherSupplierAndReturnWithBulkheadFullException() {
// Given
Bulkhead bulkhead = Mockito.mock(Bulkhead.class);
BDDMockito.given(bulkhead.tryAcquirePermission()).willReturn(false);

// When
Either<Exception, String> result = Bulkhead.decorateEitherSupplier(bulkhead, helloWorldService::returnEither).get();

//Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isInstanceOf(BulkheadFullException.class);
BDDMockito.then(helloWorldService).should(never()).returnEither();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine;
import io.github.resilience4j.core.EventConsumer;
import io.vavr.*;
import io.vavr.control.Either;
import io.vavr.control.Try;

import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -210,6 +212,52 @@ default <T> Supplier<T> decorateSupplier(Supplier<T> supplier){
return decorateSupplier(this, supplier);
}

/**
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> Either<Exception, T> executeEitherSupplier(Supplier<Either<? extends Exception, T>> supplier){
return decorateEitherSupplier(this, supplier).get();
}

/**
* Returns a supplier which is decorated by a CircuitBreaker.
*
* @param supplier the original supplier
* @param <T> the type of results supplied by this supplier
*
* @return a supplier which is decorated by a CircuitBreaker.
*/
default <T> Supplier<Try<T>> decorateTrySupplier(Supplier<Try<T>> supplier){
return decorateTrySupplier(this, supplier);
}

/**
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> Try<T> executeTrySupplier(Supplier<Try<T>> supplier){
return decorateTrySupplier(this, supplier).get();
}

/**
* Returns a supplier which is decorated by a CircuitBreaker.
*
* @param supplier the original supplier
* @param <T> the type of results supplied by this supplier
*
* @return a supplier which is decorated by a CircuitBreaker.
*/
default <T> Supplier<Either<Exception, T>> decorateEitherSupplier(Supplier<Either<? extends Exception, T>> supplier){
return decorateEitherSupplier(this, supplier);
}

/**
* Decorates and executes the decorated Callable.
*
Expand Down Expand Up @@ -666,6 +714,62 @@ static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<
};
}

/**
* Returns a supplier which is decorated by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param supplier the original supplier
* @param <T> the type of results supplied by this supplier
*
* @return a supplier which is decorated by a CircuitBreaker.
*/
static <T> Supplier<Either<Exception, T>> decorateEitherSupplier(CircuitBreaker circuitBreaker, Supplier<Either<? extends Exception, T>> supplier) {
return () -> {
if(circuitBreaker.tryAcquirePermission()) {
circuitBreaker.acquirePermission();
long start = System.nanoTime();
Either<? extends Exception, T> result = supplier.get();
long durationInNanos = System.nanoTime() - start;
if (result.isRight()) {
circuitBreaker.onSuccess(durationInNanos);
} else {
Exception exception = result.getLeft();
circuitBreaker.onError(durationInNanos, exception);
}
return Either.narrow(result);
}else{
return Either.left(new CallNotPermittedException(circuitBreaker));
}
};
}

/**
* Returns a supplier which is decorated by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param supplier the original function
* @param <T> the type of results supplied by this supplier
* @return a retryable function
*/
static <T> Supplier<Try<T>> decorateTrySupplier(CircuitBreaker circuitBreaker, Supplier<Try<T>> supplier) {
return () -> {
if(circuitBreaker.tryAcquirePermission()){
long start = System.nanoTime();
Try<T> result = supplier.get();
long durationInNanos = System.nanoTime() - start;
if(result.isSuccess()){
circuitBreaker.onSuccess(durationInNanos);
return result;
}else{
circuitBreaker.onError(durationInNanos, result.getCause());
return result;
}
}else{
return Try.failure(new CallNotPermittedException(circuitBreaker));
}
};
}

/**
* Returns a consumer which is decorated by a CircuitBreaker.
Expand Down
Loading

0 comments on commit 322ee78

Please sign in to comment.