Skip to content

Commit

Permalink
add FaultTolerance.cast() and castAsync()
Browse files Browse the repository at this point in the history
  • Loading branch information
Ladicek committed Sep 30, 2024
1 parent 3c40a65 commit bf5f183
Show file tree
Hide file tree
Showing 8 changed files with 527 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,34 @@ default Runnable adaptRunnable(Runnable action) {
return () -> run(action);
}

/**
* Casts this {@link FaultTolerance} object so that it guards actions of a different type.
* Since the type of the action is only used in fallback, this is usually safe; if this
* {@code FaultTolerance} object contains a fallback, this method throws an exception.
* <p>
* Note that this method may only be used to cast <em>synchronous</em> {@code FaultTolerance}.
* If this {@code FaultTolerance} object guards asynchronous actions, this method throws
* an exception.
*
* @param <U> type of value of the guarded action
*/
<U> FaultTolerance<U> cast();

/**
* Casts this {@link FaultTolerance} object so that it guards actions of a different type.
* Since the type of the action is only used in fallback, this is usually safe; if this
* {@code FaultTolerance} object contains a fallback, this method throws an exception.
* <p>
* Note that this method may only be used to cast <em>asynchronous</em> {@code FaultTolerance}
* of given type (such as {@code CompletionStage} or {@code Uni}). If this {@code FaultTolerance}
* object guards synchronous actions or asynchronous actions of different type, this method
* throws an exception.
*
* @param asyncType the asynchronous type, such as {@code CompletionStage} or {@code Uni}
* @param <U> type of value of the guarded action
*/
<U> FaultTolerance<U> castAsync(Class<?> asyncType);

/**
* A builder for configuring fault tolerance strategies. A fault tolerance strategy is included in the resulting
* set if the corresponding {@code with[Strategy]} method is called. Each strategy has its own builder to configure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public final class FaultToleranceImpl<V, S, T> implements FaultTolerance<T> {
private final FaultToleranceStrategy<S> strategy;
private final AsyncSupport<V, T> asyncSupport;
private final EventHandlers eventHandlers;
private final boolean hasFallback;

// Circuit breakers created using the programmatic API are registered with `CircuitBreakerMaintenance`
// in two phases:
Expand All @@ -94,10 +95,12 @@ public final class FaultToleranceImpl<V, S, T> implements FaultTolerance<T> {
// but that instance is never used. The useful `FaultTolerance` instance is held by the actual bean instance,
// which is created lazily, on the first method invocation on the client proxy.

FaultToleranceImpl(FaultToleranceStrategy<S> strategy, AsyncSupport<V, T> asyncSupport, EventHandlers eventHandlers) {
FaultToleranceImpl(FaultToleranceStrategy<S> strategy, AsyncSupport<V, T> asyncSupport,
EventHandlers eventHandlers, boolean hasFallback) {
this.strategy = strategy;
this.asyncSupport = asyncSupport;
this.eventHandlers = eventHandlers;
this.hasFallback = hasFallback;
}

@Override
Expand Down Expand Up @@ -130,6 +133,33 @@ public void run(Runnable action) {
}
}

@Override
public <U> FaultTolerance<U> cast() {
if (asyncSupport != null) {
throw new IllegalStateException("This FaultTolerance object guards synchronous actions, use `castAsync()`");
}
if (hasFallback) {
throw new IllegalStateException("This FaultTolerance object contains fallback, cannot cast");
}
return (FaultTolerance<U>) this;
}

@Override
public <U> FaultTolerance<U> castAsync(Class<?> asyncType) {
if (asyncSupport == null) {
throw new IllegalStateException("This FaultTolerance object guards synchronous actions, use `cast()`");
}
AsyncSupport<V, T> asyncSupport = AsyncSupportRegistry.get(new Class[0], asyncType);
if (this.asyncSupport != asyncSupport) {
throw new IllegalStateException("This FaultTolerance object guards actions that "
+ this.asyncSupport.doesDescription() + ", cannot cast to " + asyncType);
}
if (hasFallback) {
throw new IllegalStateException("This FaultTolerance object contains fallback, cannot cast");
}
return (FaultTolerance<U>) this;
}

public static final class BuilderImpl<T, R> implements Builder<T, R> {
private final BuilderEagerDependencies eagerDependencies;
private final Supplier<BuilderLazyDependencies> lazyDependencies;
Expand Down Expand Up @@ -257,13 +287,13 @@ private FaultTolerance<T> build(BuilderLazyDependencies lazyDependencies) {

private FaultTolerance<T> buildSync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
FaultToleranceStrategy<T> strategy = buildSyncStrategy(lazyDependencies);
return new FaultToleranceImpl<>(strategy, (AsyncSupport<T, T>) null, eventHandlers);
return new FaultToleranceImpl<>(strategy, (AsyncSupport<T, T>) null, eventHandlers, fallbackBuilder != null);
}

private <V> FaultTolerance<T> buildAsync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
FaultToleranceStrategy<CompletionStage<V>> strategy = buildAsyncStrategy(lazyDependencies);
AsyncSupport<V, T> asyncSupport = AsyncSupportRegistry.get(new Class[0], asyncType);
return new FaultToleranceImpl<>(strategy, asyncSupport, eventHandlers);
return new FaultToleranceImpl<>(strategy, asyncSupport, eventHandlers, fallbackBuilder != null);
}

private FaultToleranceStrategy<T> buildSyncStrategy(BuilderLazyDependencies lazyDependencies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public void run(Runnable action) {
instance().run(action);
}

@Override
public <U> FaultTolerance<U> cast() {
// TODO breaks laziness
return instance().cast();
}

@Override
public <U> FaultTolerance<U> castAsync(Class<?> asyncType) {
// TODO breaks laziness
return instance().castAsync(asyncType);
}

private FaultTolerance<T> instance() {
FaultTolerance<T> instance = this.instance;
if (instance == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package io.smallrye.faulttolerance.mutiny.test;

import static org.assertj.core.api.Assertions.assertThatCode;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException;
import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.api.RateLimitException;
import io.smallrye.faulttolerance.core.util.TestException;
import io.smallrye.faulttolerance.core.util.party.Party;
import io.smallrye.faulttolerance.mutiny.api.MutinyFaultTolerance;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;

public class MutinyFaultToleranceCastTest {
private ExecutorService executor;

@BeforeEach
public void setUp() {
executor = Executors.newFixedThreadPool(6);
}

@AfterEach
public void tearDown() throws InterruptedException {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void castingCircuitBreaker() throws Exception {
FaultTolerance<Uni<String>> guardedString = MutinyFaultTolerance.<String> create()
.withCircuitBreaker().requestVolumeThreshold(6).done()
.build();
FaultTolerance<Uni<Integer>> guardedInteger = guardedString.castAsync(Uni.class);

for (int i = 0; i < 3; i++) {
guardedString.call(this::stringAction)
.subscribe().withSubscriber(new UniAssertSubscriber<>())
.awaitFailure(Duration.ofSeconds(10))
.assertFailedWith(TestException.class);

guardedInteger.call(this::integerAction)
.subscribe().withSubscriber(new UniAssertSubscriber<>())
.awaitFailure(Duration.ofSeconds(10))
.assertFailedWith(TestException.class);
}

guardedString.call(this::stringAction)
.subscribe().withSubscriber(new UniAssertSubscriber<>())
.awaitFailure(Duration.ofSeconds(10))
.assertFailedWith(CircuitBreakerOpenException.class);
}

@Test
public void castingBulkhead() throws Exception {
FaultTolerance<Uni<String>> guardedString = MutinyFaultTolerance.<String> create()
.withBulkhead().limit(6).queueSize(2).done()
.withThreadOffload(true)
.build();
FaultTolerance<Uni<Integer>> guardedInteger = guardedString.castAsync(Uni.class);

Party party = Party.create(6);

for (int i = 0; i < 4; i++) {
guardedString.call(() -> {
party.participant().attend();
return Uni.createFrom().item("ignored");
}).subscribe().asCompletionStage();
guardedInteger.call(() -> {
party.participant().attend();
return Uni.createFrom().item(42);
}).subscribe().asCompletionStage();
}

party.organizer().waitForAll();

guardedString.call(() -> Uni.createFrom().item("value"))
.subscribe().withSubscriber(new UniAssertSubscriber<>())
.awaitFailure(Duration.ofSeconds(10))
.assertFailedWith(BulkheadException.class);

party.organizer().disband();
}

@Test
public void castingRateLimit() throws Exception {
FaultTolerance<Uni<String>> guardedString = MutinyFaultTolerance.<String> create()
.withRateLimit().limit(6).window(1, ChronoUnit.MINUTES).done()
.withThreadOffload(true)
.build();
FaultTolerance<Uni<Integer>> guardedInteger = guardedString.castAsync(Uni.class);

List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 3; i++) {
futures.add(executor.submit(() -> {
return guardedString.call(() -> Uni.createFrom().item("ignored")).await().indefinitely();
}));
futures.add(executor.submit(() -> {
return guardedInteger.call(() -> Uni.createFrom().item(42)).await().indefinitely();
}));
}

for (Future<?> future : futures) {
future.get();
}

guardedString.call(() -> Uni.createFrom().item("value"))
.subscribe().withSubscriber(new UniAssertSubscriber<>())
.awaitFailure(Duration.ofSeconds(10))
.assertFailedWith(RateLimitException.class);
}

@Test
public void castingFallback() {
FaultTolerance<Uni<String>> guarded = MutinyFaultTolerance.<String> create()
.withFallback().handler(() -> Uni.createFrom().item("fallback")).done()
.build();

assertThatCode(() -> guarded.castAsync(Uni.class)).isExactlyInstanceOf(IllegalStateException.class);
}

@Test
public void castingToSync() {
FaultTolerance<Uni<String>> guarded = MutinyFaultTolerance.<String> create().build();

assertThatCode(guarded::cast).isExactlyInstanceOf(IllegalStateException.class);
}

@Test
public void castingToDifferentAsync() {
FaultTolerance<Uni<String>> guarded = MutinyFaultTolerance.<String> create().build();

assertThatCode(() -> guarded.castAsync(CompletionStage.class)).isExactlyInstanceOf(IllegalStateException.class);
}

public Uni<String> stringAction() {
return Uni.createFrom().failure(new TestException());
}

public Uni<Integer> integerAction() {
return Uni.createFrom().failure(new TestException());
}
}
Loading

0 comments on commit bf5f183

Please sign in to comment.