diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java index 558b6b7e..dc6d7d26 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java @@ -67,20 +67,32 @@ public Future apply(FaultToleranceContext ctx) { } private Future applySync(FaultToleranceContext ctx) { - if (workSemaphore.tryAcquire()) { - LOG.trace("Semaphore acquired, accepting task into bulkhead"); - ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED); - ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE); - try { - return delegate.apply(ctx); - } finally { - workSemaphore.release(); - LOG.trace("Semaphore released, task leaving bulkhead"); - ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE); + if (capacitySemaphore.tryAcquire()) { + LOG.trace("Capacity semaphore acquired, accepting task into bulkhead"); + if (workSemaphore.tryAcquire()) { + LOG.trace("Work semaphore acquired, running task"); + ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED); + ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE); + try { + return delegate.apply(ctx); + } finally { + workSemaphore.release(); + LOG.trace("Work semaphore released, task finished"); + capacitySemaphore.release(); + LOG.trace("Capacity semaphore released, task leaving bulkhead"); + ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE); + } + } else { + capacitySemaphore.release(); + + LOG.debugOrTrace(description + " invocation prevented by bulkhead", + "Work semaphore not acquired, rejecting task from bulkhead"); + ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED); + return Future.ofError(new BulkheadException(description + " rejected from bulkhead")); } } else { LOG.debugOrTrace(description + " invocation prevented by bulkhead", - "Semaphore not acquired, rejecting task from bulkhead"); + "Capacity semaphore not acquired, rejecting task from bulkhead"); ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED); return Future.ofError(new BulkheadException(description + " rejected from bulkhead")); } diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java new file mode 100644 index 00000000..2bbec0af --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java @@ -0,0 +1,53 @@ +package io.smallrye.faulttolerance.reuse.mixed.bulkhead; + +import static io.smallrye.faulttolerance.core.util.SneakyThrow.sneakyThrow; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; + +import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; +import org.jboss.weld.junit5.auto.AddBeanClasses; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.faulttolerance.util.FaultToleranceBasicTest; + +@FaultToleranceBasicTest +@AddBeanClasses(MyFaultTolerance.class) +public class MixedReuseBulkheadTest { + @Test + public void test(MyService service) throws ExecutionException, InterruptedException { + Barrier barrier1 = Barrier.interruptible(); + Barrier barrier2 = Barrier.interruptible(); + Barrier barrier3 = Barrier.interruptible(); + + // accepted + new Thread(() -> { + try { + service.hello(barrier1); + } catch (InterruptedException e) { + throw sneakyThrow(e); + } + }).start(); + service.theAnswer(barrier2); + service.badNumber(barrier3).subscribeAsCompletionStage(); + + // queued + service.theAnswer(Barrier.interruptible()); + service.badNumber(Barrier.interruptible()).subscribeAsCompletionStage(); + + // rejected + assertThatThrownBy(() -> service.hello(Barrier.interruptible())).isExactlyInstanceOf(BulkheadException.class); + assertThatThrownBy(() -> service.theAnswer(Barrier.interruptible()).toCompletableFuture().join()) + .isExactlyInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(BulkheadException.class); + assertThatThrownBy(() -> service.badNumber(Barrier.interruptible()).subscribeAsCompletionStage().join()) + .isExactlyInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(BulkheadException.class); + + barrier1.open(); + barrier2.open(); + barrier3.open(); + } +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java new file mode 100644 index 00000000..b218bfb8 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java @@ -0,0 +1,16 @@ +package io.smallrye.faulttolerance.reuse.mixed.bulkhead; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.faulttolerance.api.Guard; + +@ApplicationScoped +public class MyFaultTolerance { + @Produces + @Identifier("my-fault-tolerance") + public static final Guard GUARD = Guard.create() + .withBulkhead().limit(3).queueSize(2).done() + .build(); +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java new file mode 100644 index 00000000..26e9aaf0 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java @@ -0,0 +1,36 @@ +package io.smallrye.faulttolerance.reuse.mixed.bulkhead; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletionStage; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.faulttolerance.Asynchronous; + +import io.smallrye.faulttolerance.api.ApplyGuard; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.mutiny.Uni; + +@ApplicationScoped +public class MyService { + @ApplyGuard("my-fault-tolerance") + public String hello(Barrier barrier) throws InterruptedException { + barrier.await(); + return "hello"; + } + + @ApplyGuard("my-fault-tolerance") + @Asynchronous + public CompletionStage theAnswer(Barrier barrier) throws InterruptedException { + barrier.await(); + return completedFuture(42); + } + + @ApplyGuard("my-fault-tolerance") + @Asynchronous + public Uni badNumber(Barrier barrier) throws InterruptedException { + barrier.await(); + return Uni.createFrom().item(13L); + } +}