From 46ded2bacc0cf76db999065023f3674b5bce9110 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 22 Oct 2024 15:52:18 +0200 Subject: [PATCH] add FaultTolerance.BulkheadBuilder.enableVirtualThreadsQueueing() This method enables bulkhead queueing for synchronous actions executed on virtual threads. The implementation is a simple two-semaphores bulkhead, which has in fact already existed as `FutureThreadPoolBulkhead`. This commit generalizes that class and leaves `FutureThreadPoolBulkhead` as a special case, nearly empty, for guarding asynchronous actions returning `Future`. --- .../faulttolerance/api/FaultTolerance.java | 12 ++ .../core/apiimpl/FaultToleranceImpl.java | 16 ++- .../bulkhead/FutureThreadPoolBulkhead.java | 96 +--------------- .../bulkhead/SemaphoreThreadPoolBulkhead.java | 107 ++++++++++++++++++ 4 files changed, 135 insertions(+), 96 deletions(-) create mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/SemaphoreThreadPoolBulkhead.java diff --git a/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java b/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java index 3ffbc172d..32aa2e955 100644 --- a/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java +++ b/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java @@ -409,6 +409,18 @@ interface BulkheadBuilder { */ BulkheadBuilder queueSize(int value); + /** + * Enables bulkhead queueing for synchronous actions executed on virtual threads. + * This makes it possible to call {@link #queueSize(int)} even if this builder does + * not configure fault tolerance for asynchronous actions. + *

+ * If you use this method, you have to ensure that the guard + * is executed on a virtual thread. + * + * @return this bulkhead builder + */ + BulkheadBuilder enableVirtualThreadsQueueing(); + /** * Sets a callback that will be invoked when this bulkhead accepts an invocation. * In case of asynchronous actions, accepting into bulkhead doesn't mean the action diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java index f28d3a25a..7e059b395 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java @@ -28,6 +28,7 @@ import io.smallrye.faulttolerance.core.async.RememberEventLoop; import io.smallrye.faulttolerance.core.bulkhead.CompletionStageThreadPoolBulkhead; import io.smallrye.faulttolerance.core.bulkhead.SemaphoreBulkhead; +import io.smallrye.faulttolerance.core.bulkhead.SemaphoreThreadPoolBulkhead; import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreakerEvents; import io.smallrye.faulttolerance.core.circuit.breaker.CompletionStageCircuitBreaker; @@ -313,7 +314,10 @@ private FaultToleranceStrategy buildSyncStrategy(BuilderLazyDependencies lazy FaultToleranceStrategy result = invocation(); if (lazyDependencies.ftEnabled() && bulkheadBuilder != null) { - result = new SemaphoreBulkhead<>(result, description, bulkheadBuilder.limit); + result = bulkheadBuilder.queueingEnabled + ? new SemaphoreThreadPoolBulkhead<>(result, description, bulkheadBuilder.limit, + bulkheadBuilder.queueSize) + : new SemaphoreBulkhead<>(result, description, bulkheadBuilder.limit); } if (lazyDependencies.ftEnabled() && timeoutBuilder != null) { @@ -523,6 +527,7 @@ static class BulkheadBuilderImpl implements BulkheadBuilder { private int limit = 10; private int queueSize = 10; + private boolean queueingEnabled; private Runnable onAccepted; private Runnable onRejected; @@ -530,6 +535,7 @@ static class BulkheadBuilderImpl implements BulkheadBuilder { BulkheadBuilderImpl(BuilderImpl parent) { this.parent = parent; + this.queueingEnabled = parent.isAsync; } @Override @@ -540,7 +546,7 @@ public BulkheadBuilder limit(int value) { @Override public BulkheadBuilder queueSize(int value) { - if (!parent.isAsync) { + if (!queueingEnabled) { throw new IllegalStateException("Bulkhead queue size may only be set for asynchronous invocations"); } @@ -548,6 +554,12 @@ public BulkheadBuilder queueSize(int value) { return this; } + @Override + public BulkheadBuilder enableVirtualThreadsQueueing() { + this.queueingEnabled = true; + return this; + } + @Override public BulkheadBuilder onAccepted(Runnable callback) { this.onAccepted = Preconditions.checkNotNull(callback, "Accepted callback must be set"); diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/FutureThreadPoolBulkhead.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/FutureThreadPoolBulkhead.java index 69923ece1..2728e3053 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/FutureThreadPoolBulkhead.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/FutureThreadPoolBulkhead.java @@ -1,16 +1,8 @@ package io.smallrye.faulttolerance.core.bulkhead; -import static io.smallrye.faulttolerance.core.bulkhead.BulkheadLogger.LOG; - -import java.util.concurrent.CancellationException; import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import io.smallrye.faulttolerance.core.FaultToleranceStrategy; -import io.smallrye.faulttolerance.core.InvocationContext; -import io.smallrye.faulttolerance.core.async.FutureCancellationEvent; /** * Thread pool style bulkhead for {@code Future} asynchronous executions. @@ -21,92 +13,8 @@ * but is the easiest way to implement them for {@code Future}. We do it properly * for {@code CompletionStage}, which is much more useful anyway. */ -public class FutureThreadPoolBulkhead extends BulkheadBase> { - private final int queueSize; - private final Semaphore capacitySemaphore; - private final Semaphore workSemaphore; - +public class FutureThreadPoolBulkhead extends SemaphoreThreadPoolBulkhead> { public FutureThreadPoolBulkhead(FaultToleranceStrategy> delegate, String description, int size, int queueSize) { - super(description, delegate); - this.queueSize = queueSize; - this.capacitySemaphore = new Semaphore(size + queueSize, true); - this.workSemaphore = new Semaphore(size, true); - } - - @Override - public Future apply(InvocationContext> ctx) throws Exception { - LOG.trace("ThreadPoolBulkhead started"); - try { - return doApply(ctx); - } finally { - LOG.trace("ThreadPoolBulkhead finished"); - } - } - - private Future doApply(InvocationContext> ctx) throws Exception { - if (capacitySemaphore.tryAcquire()) { - LOG.trace("Capacity semaphore acquired, accepting task into bulkhead"); - ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED); - ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE); - - AtomicBoolean cancellationInvalid = new AtomicBoolean(false); - AtomicBoolean cancelled = new AtomicBoolean(false); - AtomicReference executingThread = new AtomicReference<>(Thread.currentThread()); - ctx.registerEventHandler(FutureCancellationEvent.class, event -> { - if (cancellationInvalid.get()) { - // in case of retries, multiple handlers of FutureCancellationEvent may be registered, - // need to make sure that a handler belonging to an older bulkhead task doesn't do anything - return; - } - - if (LOG.isTraceEnabled()) { - LOG.tracef("Cancelling bulkhead task,%s interrupting executing thread", - (event.interruptible ? "" : " NOT")); - } - cancelled.set(true); - if (event.interruptible) { - executingThread.get().interrupt(); - } - }); - - try { - workSemaphore.acquire(); - LOG.trace("Work semaphore acquired, running task"); - } catch (InterruptedException e) { - cancellationInvalid.set(true); - - capacitySemaphore.release(); - LOG.trace("Capacity semaphore released, task leaving bulkhead"); - ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE); - throw new CancellationException(); - } - - ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE); - ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE); - try { - if (cancelled.get()) { - throw new CancellationException(); - } - return delegate.apply(ctx); - } finally { - cancellationInvalid.set(true); - - workSemaphore.release(); - LOG.trace("Work semaphore released, task finished"); - capacitySemaphore.release(); - LOG.trace("Capacity semaphore released, task leaving bulkhead"); - ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE); - } - } else { - LOG.debugOrTrace(description + " invocation prevented by bulkhead", - "Capacity semaphore not acquired, rejecting task from bulkhead"); - ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED); - throw bulkheadRejected(); - } - } - - // only for tests - int getQueueSize() { - return Math.max(0, queueSize - capacitySemaphore.availablePermits()); + super(delegate, description, size, queueSize); } } diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/SemaphoreThreadPoolBulkhead.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/SemaphoreThreadPoolBulkhead.java new file mode 100644 index 000000000..ea607ae5b --- /dev/null +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/bulkhead/SemaphoreThreadPoolBulkhead.java @@ -0,0 +1,107 @@ +package io.smallrye.faulttolerance.core.bulkhead; + +import static io.smallrye.faulttolerance.core.bulkhead.BulkheadLogger.LOG; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.smallrye.faulttolerance.core.FaultToleranceStrategy; +import io.smallrye.faulttolerance.core.InvocationContext; +import io.smallrye.faulttolerance.core.async.FutureCancellationEvent; + +/** + * Assumes that this bulkhead is already executed on an extra thread. + * Under that assumption, we don't have to submit tasks to another executor + * or use an actual queue. We just limit the task execution by semaphores. + */ +public class SemaphoreThreadPoolBulkhead extends BulkheadBase { + private final int queueSize; + private final Semaphore capacitySemaphore; + private final Semaphore workSemaphore; + + public SemaphoreThreadPoolBulkhead(FaultToleranceStrategy delegate, String description, int size, int queueSize) { + super(description, delegate); + this.queueSize = queueSize; + this.capacitySemaphore = new Semaphore(size + queueSize, true); + this.workSemaphore = new Semaphore(size, true); + } + + @Override + public V apply(InvocationContext ctx) throws Exception { + LOG.trace("ThreadPoolBulkhead started"); + try { + return doApply(ctx); + } finally { + LOG.trace("ThreadPoolBulkhead finished"); + } + } + + private V doApply(InvocationContext ctx) throws Exception { + if (capacitySemaphore.tryAcquire()) { + LOG.trace("Capacity semaphore acquired, accepting task into bulkhead"); + ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED); + ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE); + + AtomicBoolean cancellationInvalid = new AtomicBoolean(false); + AtomicBoolean cancelled = new AtomicBoolean(false); + AtomicReference executingThread = new AtomicReference<>(Thread.currentThread()); + ctx.registerEventHandler(FutureCancellationEvent.class, event -> { + if (cancellationInvalid.get()) { + // in case of retries, multiple handlers of FutureCancellationEvent may be registered, + // need to make sure that a handler belonging to an older bulkhead task doesn't do anything + return; + } + + if (LOG.isTraceEnabled()) { + LOG.tracef("Cancelling bulkhead task,%s interrupting executing thread", + (event.interruptible ? "" : " NOT")); + } + cancelled.set(true); + if (event.interruptible) { + executingThread.get().interrupt(); + } + }); + + try { + workSemaphore.acquire(); + LOG.trace("Work semaphore acquired, running task"); + } catch (InterruptedException e) { + cancellationInvalid.set(true); + + capacitySemaphore.release(); + LOG.trace("Capacity semaphore released, task leaving bulkhead"); + ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE); + throw new CancellationException(); + } + + ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE); + ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE); + try { + if (cancelled.get()) { + throw new CancellationException(); + } + return delegate.apply(ctx); + } finally { + cancellationInvalid.set(true); + + workSemaphore.release(); + LOG.trace("Work semaphore released, task finished"); + capacitySemaphore.release(); + LOG.trace("Capacity semaphore released, task leaving bulkhead"); + ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE); + } + } else { + LOG.debugOrTrace(description + " invocation prevented by bulkhead", + "Capacity semaphore not acquired, rejecting task from bulkhead"); + ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED); + throw bulkheadRejected(); + } + } + + // only for tests + int getQueueSize() { + return Math.max(0, queueSize - capacitySemaphore.availablePermits()); + } +}