diff --git a/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java b/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java index fd145e5f..3ffbc172 100644 --- a/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java +++ b/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java @@ -226,6 +226,34 @@ default Runnable adaptRunnable(Runnable action) { return () -> run(action); } + /** + * Casts this {@link FaultTolerance} object so that it guards actions of a different type. + * Since the type of the action is only used in fallback, this is usually safe; if this + * {@code FaultTolerance} object contains a fallback, this method throws an exception. + *

+ * Note that this method may only be used to cast synchronous {@code FaultTolerance}. + * If this {@code FaultTolerance} object guards asynchronous actions, this method throws + * an exception. + * + * @param type of value of the guarded action + */ + FaultTolerance cast(); + + /** + * Casts this {@link FaultTolerance} object so that it guards actions of a different type. + * Since the type of the action is only used in fallback, this is usually safe; if this + * {@code FaultTolerance} object contains a fallback, this method throws an exception. + *

+ * Note that this method may only be used to cast asynchronous {@code FaultTolerance} + * of given type (such as {@code CompletionStage} or {@code Uni}). If this {@code FaultTolerance} + * object guards synchronous actions or asynchronous actions of different type, this method + * throws an exception. + * + * @param asyncType the asynchronous type, such as {@code CompletionStage} or {@code Uni} + * @param type of value of the guarded action + */ + FaultTolerance castAsync(Class asyncType); + /** * A builder for configuring fault tolerance strategies. A fault tolerance strategy is included in the resulting * set if the corresponding {@code with[Strategy]} method is called. Each strategy has its own builder to configure diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java index 08a68b81..c861b288 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java @@ -75,6 +75,7 @@ public final class FaultToleranceImpl implements FaultTolerance { private final FaultToleranceStrategy strategy; private final AsyncSupport asyncSupport; private final EventHandlers eventHandlers; + private final boolean hasFallback; // Circuit breakers created using the programmatic API are registered with `CircuitBreakerMaintenance` // in two phases: @@ -94,10 +95,12 @@ public final class FaultToleranceImpl implements FaultTolerance { // but that instance is never used. The useful `FaultTolerance` instance is held by the actual bean instance, // which is created lazily, on the first method invocation on the client proxy. - FaultToleranceImpl(FaultToleranceStrategy strategy, AsyncSupport asyncSupport, EventHandlers eventHandlers) { + FaultToleranceImpl(FaultToleranceStrategy strategy, AsyncSupport asyncSupport, + EventHandlers eventHandlers, boolean hasFallback) { this.strategy = strategy; this.asyncSupport = asyncSupport; this.eventHandlers = eventHandlers; + this.hasFallback = hasFallback; } @Override @@ -130,6 +133,33 @@ public void run(Runnable action) { } } + @Override + public FaultTolerance cast() { + if (asyncSupport != null) { + throw new IllegalStateException("This FaultTolerance object guards synchronous actions, use `castAsync()`"); + } + if (hasFallback) { + throw new IllegalStateException("This FaultTolerance object contains fallback, cannot cast"); + } + return (FaultTolerance) this; + } + + @Override + public FaultTolerance castAsync(Class asyncType) { + if (asyncSupport == null) { + throw new IllegalStateException("This FaultTolerance object guards synchronous actions, use `cast()`"); + } + AsyncSupport asyncSupport = AsyncSupportRegistry.get(new Class[0], asyncType); + if (this.asyncSupport != asyncSupport) { + throw new IllegalStateException("This FaultTolerance object guards actions that " + + this.asyncSupport.doesDescription() + ", cannot cast to " + asyncType); + } + if (hasFallback) { + throw new IllegalStateException("This FaultTolerance object contains fallback, cannot cast"); + } + return (FaultTolerance) this; + } + public static final class BuilderImpl implements Builder { private final BuilderEagerDependencies eagerDependencies; private final Supplier lazyDependencies; @@ -257,13 +287,13 @@ private FaultTolerance build(BuilderLazyDependencies lazyDependencies) { private FaultTolerance buildSync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) { FaultToleranceStrategy strategy = buildSyncStrategy(lazyDependencies); - return new FaultToleranceImpl<>(strategy, (AsyncSupport) null, eventHandlers); + return new FaultToleranceImpl<>(strategy, (AsyncSupport) null, eventHandlers, fallbackBuilder != null); } private FaultTolerance buildAsync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) { FaultToleranceStrategy> strategy = buildAsyncStrategy(lazyDependencies); AsyncSupport asyncSupport = AsyncSupportRegistry.get(new Class[0], asyncType); - return new FaultToleranceImpl<>(strategy, asyncSupport, eventHandlers); + return new FaultToleranceImpl<>(strategy, asyncSupport, eventHandlers, fallbackBuilder != null); } private FaultToleranceStrategy buildSyncStrategy(BuilderLazyDependencies lazyDependencies) { diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/LazyFaultTolerance.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/LazyFaultTolerance.java index 6d925e6f..c65ca6f3 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/LazyFaultTolerance.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/LazyFaultTolerance.java @@ -38,6 +38,18 @@ public void run(Runnable action) { instance().run(action); } + @Override + public FaultTolerance cast() { + // TODO breaks laziness + return instance().cast(); + } + + @Override + public FaultTolerance castAsync(Class asyncType) { + // TODO breaks laziness + return instance().castAsync(asyncType); + } + private FaultTolerance instance() { FaultTolerance instance = this.instance; if (instance == null) { diff --git a/implementation/mutiny/src/test/java/io/smallrye/faulttolerance/mutiny/test/MutinyFaultToleranceCastTest.java b/implementation/mutiny/src/test/java/io/smallrye/faulttolerance/mutiny/test/MutinyFaultToleranceCastTest.java new file mode 100644 index 00000000..63640e3a --- /dev/null +++ b/implementation/mutiny/src/test/java/io/smallrye/faulttolerance/mutiny/test/MutinyFaultToleranceCastTest.java @@ -0,0 +1,157 @@ +package io.smallrye.faulttolerance.mutiny.test; + +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; +import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.api.FaultTolerance; +import io.smallrye.faulttolerance.api.RateLimitException; +import io.smallrye.faulttolerance.core.util.TestException; +import io.smallrye.faulttolerance.core.util.party.Party; +import io.smallrye.faulttolerance.mutiny.api.MutinyFaultTolerance; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; + +public class MutinyFaultToleranceCastTest { + private ExecutorService executor; + + @BeforeEach + public void setUp() { + executor = Executors.newFixedThreadPool(6); + } + + @AfterEach + public void tearDown() throws InterruptedException { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + + @Test + public void castingCircuitBreaker() throws Exception { + FaultTolerance> guardedString = MutinyFaultTolerance. create() + .withCircuitBreaker().requestVolumeThreshold(6).done() + .build(); + FaultTolerance> guardedInteger = guardedString.castAsync(Uni.class); + + for (int i = 0; i < 3; i++) { + guardedString.call(this::stringAction) + .subscribe().withSubscriber(new UniAssertSubscriber<>()) + .awaitFailure(Duration.ofSeconds(10)) + .assertFailedWith(TestException.class); + + guardedInteger.call(this::integerAction) + .subscribe().withSubscriber(new UniAssertSubscriber<>()) + .awaitFailure(Duration.ofSeconds(10)) + .assertFailedWith(TestException.class); + } + + guardedString.call(this::stringAction) + .subscribe().withSubscriber(new UniAssertSubscriber<>()) + .awaitFailure(Duration.ofSeconds(10)) + .assertFailedWith(CircuitBreakerOpenException.class); + } + + @Test + public void castingBulkhead() throws Exception { + FaultTolerance> guardedString = MutinyFaultTolerance. create() + .withBulkhead().limit(6).queueSize(2).done() + .withThreadOffload(true) + .build(); + FaultTolerance> guardedInteger = guardedString.castAsync(Uni.class); + + Party party = Party.create(6); + + for (int i = 0; i < 4; i++) { + guardedString.call(() -> { + party.participant().attend(); + return Uni.createFrom().item("ignored"); + }).subscribe().asCompletionStage(); + guardedInteger.call(() -> { + party.participant().attend(); + return Uni.createFrom().item(42); + }).subscribe().asCompletionStage(); + } + + party.organizer().waitForAll(); + + guardedString.call(() -> Uni.createFrom().item("value")) + .subscribe().withSubscriber(new UniAssertSubscriber<>()) + .awaitFailure(Duration.ofSeconds(10)) + .assertFailedWith(BulkheadException.class); + + party.organizer().disband(); + } + + @Test + public void castingRateLimit() throws Exception { + FaultTolerance> guardedString = MutinyFaultTolerance. create() + .withRateLimit().limit(6).window(1, ChronoUnit.MINUTES).done() + .withThreadOffload(true) + .build(); + FaultTolerance> guardedInteger = guardedString.castAsync(Uni.class); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + futures.add(executor.submit(() -> { + return guardedString.call(() -> Uni.createFrom().item("ignored")).await().indefinitely(); + })); + futures.add(executor.submit(() -> { + return guardedInteger.call(() -> Uni.createFrom().item(42)).await().indefinitely(); + })); + } + + for (Future future : futures) { + future.get(); + } + + guardedString.call(() -> Uni.createFrom().item("value")) + .subscribe().withSubscriber(new UniAssertSubscriber<>()) + .awaitFailure(Duration.ofSeconds(10)) + .assertFailedWith(RateLimitException.class); + } + + @Test + public void castingFallback() { + FaultTolerance> guarded = MutinyFaultTolerance. create() + .withFallback().handler(() -> Uni.createFrom().item("fallback")).done() + .build(); + + assertThatCode(() -> guarded.castAsync(Uni.class)).isExactlyInstanceOf(IllegalStateException.class); + } + + @Test + public void castingToSync() { + FaultTolerance> guarded = MutinyFaultTolerance. create().build(); + + assertThatCode(guarded::cast).isExactlyInstanceOf(IllegalStateException.class); + } + + @Test + public void castingToDifferentAsync() { + FaultTolerance> guarded = MutinyFaultTolerance. create().build(); + + assertThatCode(() -> guarded.castAsync(CompletionStage.class)).isExactlyInstanceOf(IllegalStateException.class); + } + + public Uni stringAction() { + return Uni.createFrom().failure(new TestException()); + } + + public Uni integerAction() { + return Uni.createFrom().failure(new TestException()); + } +} diff --git a/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneFaultToleranceCastAsyncTest.java b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneFaultToleranceCastAsyncTest.java new file mode 100644 index 00000000..208651e5 --- /dev/null +++ b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneFaultToleranceCastAsyncTest.java @@ -0,0 +1,150 @@ +package io.smallrye.faulttolerance.standalone.test; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; +import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.api.FaultTolerance; +import io.smallrye.faulttolerance.api.RateLimitException; +import io.smallrye.faulttolerance.core.util.TestException; +import io.smallrye.faulttolerance.core.util.party.Party; + +public class StandaloneFaultToleranceCastAsyncTest { + private ExecutorService executor; + + @BeforeEach + public void setUp() { + executor = Executors.newFixedThreadPool(6); + } + + @AfterEach + public void tearDown() throws InterruptedException { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + + @Test + public void castingAsyncCircuitBreaker() throws Exception { + FaultTolerance> guardedString = FaultTolerance. createAsync() + .withCircuitBreaker().requestVolumeThreshold(6).done() + .build(); + FaultTolerance> guardedInteger = guardedString.castAsync(CompletionStage.class); + + for (int i = 0; i < 3; i++) { + assertThat(guardedString.call(this::stringAction)) + .failsWithin(10, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) // caused by AssertJ calling future.get() + .withCauseExactlyInstanceOf(TestException.class); + + assertThat(guardedInteger.call(this::integerAction)) + .failsWithin(10, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) // caused by AssertJ calling future.get() + .withCauseExactlyInstanceOf(TestException.class); + } + + assertThat(guardedString.call(this::stringAction)) + .failsWithin(10, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) // caused by AssertJ calling future.get() + .withCauseExactlyInstanceOf(CircuitBreakerOpenException.class); + } + + @Test + public void castingAsyncBulkhead() throws Exception { + FaultTolerance> guardedString = FaultTolerance. createAsync() + .withBulkhead().limit(6).queueSize(2).done() + .withThreadOffload(true) + .build(); + FaultTolerance> guardedInteger = guardedString.castAsync(CompletionStage.class); + + Party party = Party.create(6); + + for (int i = 0; i < 4; i++) { + guardedString.call(() -> { + party.participant().attend(); + return completedFuture("ignored"); + }); + guardedInteger.call(() -> { + party.participant().attend(); + return completedFuture(42); + }); + } + + party.organizer().waitForAll(); + + assertThat(guardedString.call(() -> completedFuture("value"))) + .failsWithin(10, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseExactlyInstanceOf(BulkheadException.class); + + party.organizer().disband(); + } + + @Test + public void castingAsyncRateLimit() throws Exception { + FaultTolerance> guardedString = FaultTolerance. createAsync() + .withRateLimit().limit(6).window(1, ChronoUnit.MINUTES).done() + .withThreadOffload(true) + .build(); + FaultTolerance> guardedInteger = guardedString.castAsync(CompletionStage.class); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + futures.add(executor.submit(() -> { + return guardedString.call(() -> completedFuture("ignored")).toCompletableFuture().get(); + })); + futures.add(executor.submit(() -> { + return guardedInteger.call(() -> completedFuture(42)).toCompletableFuture().get(); + })); + } + + for (Future future : futures) { + future.get(); + } + + assertThat(guardedString.call(() -> completedFuture("value"))) + .failsWithin(10, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseExactlyInstanceOf(RateLimitException.class); + } + + @Test + public void castingAsyncFallback() { + FaultTolerance> guarded = FaultTolerance. createAsync() + .withFallback().handler(() -> completedFuture("fallback")).done() + .build(); + + assertThatCode(() -> guarded.castAsync(CompletionStage.class)).isExactlyInstanceOf(IllegalStateException.class); + } + + @Test + public void castingToSync() { + FaultTolerance> guarded = FaultTolerance. createAsync().build(); + + assertThatCode(guarded::cast).isExactlyInstanceOf(IllegalStateException.class); + } + + public CompletionStage stringAction() { + return failedFuture(new TestException()); + } + + public CompletionStage integerAction() { + return failedFuture(new TestException()); + } +} diff --git a/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneFaultToleranceCastTest.java b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneFaultToleranceCastTest.java new file mode 100644 index 00000000..d0ae37b0 --- /dev/null +++ b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneFaultToleranceCastTest.java @@ -0,0 +1,131 @@ +package io.smallrye.faulttolerance.standalone.test; + +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; +import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.api.FaultTolerance; +import io.smallrye.faulttolerance.api.RateLimitException; +import io.smallrye.faulttolerance.core.util.TestException; +import io.smallrye.faulttolerance.core.util.party.Party; + +public class StandaloneFaultToleranceCastTest { + private ExecutorService executor; + + @BeforeEach + public void setUp() { + executor = Executors.newFixedThreadPool(6); + } + + @AfterEach + public void tearDown() throws InterruptedException { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + + @Test + public void castingCircuitBreaker() { + FaultTolerance guardedString = FaultTolerance. create() + .withCircuitBreaker().requestVolumeThreshold(6).done() + .build(); + FaultTolerance guardedInteger = guardedString.cast(); + + for (int i = 0; i < 3; i++) { + assertThatCode(() -> guardedString.call(this::stringAction)).isExactlyInstanceOf(TestException.class); + assertThatCode(() -> guardedInteger.call(this::integerAction)).isExactlyInstanceOf(TestException.class); + } + + assertThatCode(() -> guardedString.call(this::stringAction)).isExactlyInstanceOf(CircuitBreakerOpenException.class); + } + + @Test + public void castingBulkhead() throws Exception { + FaultTolerance guardedString = FaultTolerance. create() + .withBulkhead().limit(6).done() + .build(); + FaultTolerance guardedInteger = guardedString.cast(); + + Party party = Party.create(6); + + for (int i = 0; i < 3; i++) { + executor.submit(() -> { + return guardedString.call(() -> { + party.participant().attend(); + return "ignored"; + }); + }); + executor.submit(() -> { + return guardedInteger.call(() -> { + party.participant().attend(); + return 42; + }); + }); + } + + party.organizer().waitForAll(); + + assertThatCode(() -> guardedString.call(() -> "value")).isExactlyInstanceOf(BulkheadException.class); + + party.organizer().disband(); + } + + @Test + public void castingRateLimit() throws ExecutionException, InterruptedException { + FaultTolerance guardedString = FaultTolerance. create() + .withRateLimit().limit(6).window(1, ChronoUnit.MINUTES).done() + .build(); + FaultTolerance guardedInteger = guardedString.cast(); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + futures.add(executor.submit(() -> { + return guardedString.call(() -> "ignored"); + })); + futures.add(executor.submit(() -> { + return guardedInteger.call(() -> 42); + })); + } + + for (Future future : futures) { + future.get(); + } + + assertThatCode(() -> guardedString.call(() -> "value")).isExactlyInstanceOf(RateLimitException.class); + } + + @Test + public void castingFallback() { + FaultTolerance guarded = FaultTolerance. create() + .withFallback().handler(() -> "fallback").done() + .build(); + assertThatCode(guarded::cast).isExactlyInstanceOf(IllegalStateException.class); + } + + @Test + public void castingToAsync() { + FaultTolerance guarded = FaultTolerance. create().build(); + assertThatCode(() -> guarded.castAsync(CompletionStage.class)).isExactlyInstanceOf(IllegalStateException.class); + } + + public String stringAction() throws TestException { + throw new TestException(); + } + + public Integer integerAction() throws TestException { + throw new TestException(); + } +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiFaultToleranceCopyAsyncTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiFaultToleranceCopyAsyncTest.java new file mode 100644 index 00000000..3367012e --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiFaultToleranceCopyAsyncTest.java @@ -0,0 +1,8 @@ +package io.smallrye.faulttolerance.programmatic; + +import io.smallrye.faulttolerance.standalone.test.StandaloneFaultToleranceCastAsyncTest; +import io.smallrye.faulttolerance.util.FaultToleranceBasicTest; + +@FaultToleranceBasicTest +public class CdiFaultToleranceCopyAsyncTest extends StandaloneFaultToleranceCastAsyncTest { +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiFaultToleranceCopyTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiFaultToleranceCopyTest.java new file mode 100644 index 00000000..00466816 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiFaultToleranceCopyTest.java @@ -0,0 +1,8 @@ +package io.smallrye.faulttolerance.programmatic; + +import io.smallrye.faulttolerance.standalone.test.StandaloneFaultToleranceCastTest; +import io.smallrye.faulttolerance.util.FaultToleranceBasicTest; + +@FaultToleranceBasicTest +public class CdiFaultToleranceCopyTest extends StandaloneFaultToleranceCastTest { +}