From 9d316f4b69be86494dfca8018237c5c6629167d5 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 10 Dec 2024 14:55:41 +0100 Subject: [PATCH] fix bulkhead in case multiple kinds are used at the same time The bulkhead implementation for the synchronous case didn't acquire the capacity semaphore, it only used the work semaphore. This is wrong in case multiple kinds of the bulkhead are used at the same time, where the bulkhead might already be full due to synchronous invocations, but an asynchronous invocation could still be accepted into the bulkhead because the capacity semaphore is not in the right state. This commit fixes that by changing the synchronous implementation to also use the capacity semaphore, in addition to the usage of the work semaphore. --- .../core/bulkhead/Bulkhead.java | 34 ++++++++---- .../bulkhead/MixedReuseBulkheadTest.java | 53 +++++++++++++++++++ .../mixed/bulkhead/MyFaultTolerance.java | 16 ++++++ .../reuse/mixed/bulkhead/MyService.java | 36 +++++++++++++ 4 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java create mode 100644 testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java create mode 100644 testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java index 558b6b7e..dc6d7d26 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/Bulkhead.java @@ -67,20 +67,32 @@ public Future apply(FaultToleranceContext ctx) { } private Future applySync(FaultToleranceContext ctx) { - if (workSemaphore.tryAcquire()) { - LOG.trace("Semaphore acquired, accepting task into bulkhead"); - ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED); - ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE); - try { - return delegate.apply(ctx); - } finally { - workSemaphore.release(); - LOG.trace("Semaphore released, task leaving bulkhead"); - ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE); + if (capacitySemaphore.tryAcquire()) { + LOG.trace("Capacity semaphore acquired, accepting task into bulkhead"); + if (workSemaphore.tryAcquire()) { + LOG.trace("Work semaphore acquired, running task"); + ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED); + ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE); + try { + return delegate.apply(ctx); + } finally { + workSemaphore.release(); + LOG.trace("Work semaphore released, task finished"); + capacitySemaphore.release(); + LOG.trace("Capacity semaphore released, task leaving bulkhead"); + ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE); + } + } else { + capacitySemaphore.release(); + + LOG.debugOrTrace(description + " invocation prevented by bulkhead", + "Work semaphore not acquired, rejecting task from bulkhead"); + ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED); + return Future.ofError(new BulkheadException(description + " rejected from bulkhead")); } } else { LOG.debugOrTrace(description + " invocation prevented by bulkhead", - "Semaphore not acquired, rejecting task from bulkhead"); + "Capacity semaphore not acquired, rejecting task from bulkhead"); ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED); return Future.ofError(new BulkheadException(description + " rejected from bulkhead")); } diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java new file mode 100644 index 00000000..2bbec0af --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MixedReuseBulkheadTest.java @@ -0,0 +1,53 @@ +package io.smallrye.faulttolerance.reuse.mixed.bulkhead; + +import static io.smallrye.faulttolerance.core.util.SneakyThrow.sneakyThrow; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; + +import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; +import org.jboss.weld.junit5.auto.AddBeanClasses; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.faulttolerance.util.FaultToleranceBasicTest; + +@FaultToleranceBasicTest +@AddBeanClasses(MyFaultTolerance.class) +public class MixedReuseBulkheadTest { + @Test + public void test(MyService service) throws ExecutionException, InterruptedException { + Barrier barrier1 = Barrier.interruptible(); + Barrier barrier2 = Barrier.interruptible(); + Barrier barrier3 = Barrier.interruptible(); + + // accepted + new Thread(() -> { + try { + service.hello(barrier1); + } catch (InterruptedException e) { + throw sneakyThrow(e); + } + }).start(); + service.theAnswer(barrier2); + service.badNumber(barrier3).subscribeAsCompletionStage(); + + // queued + service.theAnswer(Barrier.interruptible()); + service.badNumber(Barrier.interruptible()).subscribeAsCompletionStage(); + + // rejected + assertThatThrownBy(() -> service.hello(Barrier.interruptible())).isExactlyInstanceOf(BulkheadException.class); + assertThatThrownBy(() -> service.theAnswer(Barrier.interruptible()).toCompletableFuture().join()) + .isExactlyInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(BulkheadException.class); + assertThatThrownBy(() -> service.badNumber(Barrier.interruptible()).subscribeAsCompletionStage().join()) + .isExactlyInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(BulkheadException.class); + + barrier1.open(); + barrier2.open(); + barrier3.open(); + } +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java new file mode 100644 index 00000000..b218bfb8 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyFaultTolerance.java @@ -0,0 +1,16 @@ +package io.smallrye.faulttolerance.reuse.mixed.bulkhead; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.faulttolerance.api.Guard; + +@ApplicationScoped +public class MyFaultTolerance { + @Produces + @Identifier("my-fault-tolerance") + public static final Guard GUARD = Guard.create() + .withBulkhead().limit(3).queueSize(2).done() + .build(); +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java new file mode 100644 index 00000000..26e9aaf0 --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/reuse/mixed/bulkhead/MyService.java @@ -0,0 +1,36 @@ +package io.smallrye.faulttolerance.reuse.mixed.bulkhead; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletionStage; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.faulttolerance.Asynchronous; + +import io.smallrye.faulttolerance.api.ApplyGuard; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.mutiny.Uni; + +@ApplicationScoped +public class MyService { + @ApplyGuard("my-fault-tolerance") + public String hello(Barrier barrier) throws InterruptedException { + barrier.await(); + return "hello"; + } + + @ApplyGuard("my-fault-tolerance") + @Asynchronous + public CompletionStage theAnswer(Barrier barrier) throws InterruptedException { + barrier.await(); + return completedFuture(42); + } + + @ApplyGuard("my-fault-tolerance") + @Asynchronous + public Uni badNumber(Barrier barrier) throws InterruptedException { + barrier.await(); + return Uni.createFrom().item(13L); + } +}