Skip to content

Commit

Permalink
Issue ReactiveX#657: Added Future decorator to Bulkhead interface (Re…
Browse files Browse the repository at this point in the history
  • Loading branch information
pulkitmehra authored and RobWin committed Nov 6, 2019
1 parent e0221c7 commit 9f62d71
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead;
import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.core.exception.AcquirePermissionCancelledException;
import io.github.resilience4j.core.functions.OnceConsumer;
import io.vavr.CheckedConsumer;
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
Expand All @@ -33,9 +34,8 @@
import io.vavr.control.Either;
import io.vavr.control.Try;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -118,6 +118,32 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(Bulkhead bulkhea
};
}

/**
* Returns a supplier of type Future which is decorated by a bulkhead. Bulkhead will reserve permission until {@link Future#get()}
* or {@link Future#get(long, TimeUnit)} is evaluated even if the underlying call took less time to return. Any delays in evaluating
* future will result in holding of permission in the underlying Semaphore.
*
* @param bulkhead the bulkhead
* @param supplier the original supplier
* @param <T> the type of the returned Future result
* @return a supplier which is decorated by a Bulkhead.
*/
static <T> Supplier<Future<T>> decorateFuture(Bulkhead bulkhead, Supplier<Future<T>> supplier) {
return () -> {
if (!bulkhead.tryAcquirePermission()) {
final CompletableFuture<T> promise = new CompletableFuture<>();
promise.completeExceptionally(BulkheadFullException.createBulkheadFullException(bulkhead));
return promise;
}
try {
return new BulkheadFuture<T>(bulkhead, supplier.get());
} catch (Throwable e) {
bulkhead.onComplete();
throw e;
}
};
}

/**
* Returns a runnable which is decorated by a bulkhead.
*
Expand Down Expand Up @@ -564,4 +590,54 @@ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<Bulk

EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> eventConsumer);
}

/**
* This class decorates future with Bulkhead functionality around invocation.
*
* @param <T> of return type
*/
final class BulkheadFuture<T> implements Future<T> {
final private Future<T> future;
final private OnceConsumer<Bulkhead> onceToBulkhead;

BulkheadFuture(Bulkhead bulkhead, Future<T> future) {
Objects.requireNonNull(future, "Non null Future is required to decorate");
this.onceToBulkhead = OnceConsumer.of(bulkhead);
this.future = future;

}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return future.isCancelled();
}

@Override
public boolean isDone() {
return future.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
try {
return future.get();
} finally {
onceToBulkhead.applyOnce(bh -> bh.onComplete());
}
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
return future.get(timeout, unit);
} finally {
onceToBulkhead.applyOnce(bh -> bh.onComplete());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package io.github.resilience4j.bulkhead;

import io.github.resilience4j.test.HelloWorldService;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.*;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

public class BulkheadFutureTest {

private HelloWorldService helloWorldService;
private Future future;
private BulkheadConfig config;

@Before
public void setUp() {
helloWorldService = mock(HelloWorldService.class);
future = mock(Future.class);
config = BulkheadConfig.custom()
.maxConcurrentCalls(1)
.build();
}

@Test
public void shouldDecorateSupplierAndReturnWithSuccess() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get()).willReturn("Hello world");
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

String result = supplier.get().get();

assertThat(result).isEqualTo("Hello world");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(1)).get();
}

@Test
public void shouldDecorateSupplierAndReturnWithSuccessAndTimeout() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get(anyLong(), any(TimeUnit.class))).willReturn("Hello world");
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

String result = supplier.get().get(5, TimeUnit.SECONDS);

assertThat(result).isEqualTo("Hello world");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(1)).get(anyLong(), any(TimeUnit.class));
}

@Test
public void shouldDecorateFutureAndBulkheadApplyOnceOnMultipleFutureEval() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get(anyLong(), any(TimeUnit.class))).willReturn("Hello world");
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Future<String> decoratedFuture = supplier.get();

decoratedFuture.get(5, TimeUnit.SECONDS);
decoratedFuture.get(5, TimeUnit.SECONDS);

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(2)).get(anyLong(), any(TimeUnit.class));
}

@Test
public void shouldDecorateFutureAndBulkheadApplyOnceOnMultipleFutureEvalFailure() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get()).willThrow(new ExecutionException(new RuntimeException("Hello world")));
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Future<String> decoratedFuture = supplier.get();

catchThrowable(() -> decoratedFuture.get());
catchThrowable(() -> decoratedFuture.get());

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(2)).get();
}

@Test
public void shouldDecorateSupplierAndReturnWithExceptionAtAsyncStage() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get()).willThrow(new ExecutionException(new RuntimeException("BAM!")));
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Throwable thrown = catchThrowable(() -> supplier.get().get());

assertThat(thrown).isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(RuntimeException.class);

assertThat(thrown.getCause().getMessage()).isEqualTo("BAM!");

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(1)).get();
}

@Test
public void shouldDecorateSupplierAndReturnWithExceptionAtSyncStage() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(helloWorldService.returnHelloWorldFuture()).willThrow(new RuntimeException("BAM!"));

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Throwable thrown = catchThrowable(() -> supplier.get().get());

assertThat(thrown).isInstanceOf(RuntimeException.class)
.hasMessage("BAM!");

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).shouldHaveZeroInteractions();
}

@Test
public void shouldReturnFailureWithBulkheadFullException() throws Exception {
// tag::bulkheadFullException[]
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
Bulkhead bulkhead = Bulkhead.of("test", config);
bulkhead.tryAcquirePermission();
bulkhead.tryAcquirePermission();
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);

given(future.get()).willReturn("Hello world");
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Throwable thrown = catchThrowable(() -> supplier.get().get());

assertThat(thrown).isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(BulkheadFullException.class);

then(helloWorldService).shouldHaveZeroInteractions();
then(future).shouldHaveZeroInteractions();
// end::bulkheadFullException[]
}

@Test
public void shouldReturnFailureWithFutureCancellationException() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get()).willThrow(new CancellationException());
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Throwable thrown = catchThrowable(() -> supplier.get().get());

assertThat(thrown).isInstanceOf(CancellationException.class);

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(1)).get();
}

@Test
public void shouldReturnFailureWithFutureTimeoutException() throws Exception {
Bulkhead bulkhead = Bulkhead.of("test", config);

given(future.get(anyLong(), any(TimeUnit.class))).willThrow(new TimeoutException());
given(helloWorldService.returnHelloWorldFuture()).willReturn(future);

Supplier<Future<String>> supplier = Bulkhead
.decorateFuture(bulkhead, helloWorldService::returnHelloWorldFuture);

Throwable thrown = catchThrowable(() -> supplier.get().get(5, TimeUnit.SECONDS));

assertThat(thrown).isInstanceOf(TimeoutException.class);

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorldFuture();
then(future).should(times(1)).get(anyLong(), any(TimeUnit.class));
}
}

0 comments on commit 9f62d71

Please sign in to comment.