Skip to content

Commit

Permalink
fix bulkhead in case multiple kinds are used at the same time
Browse files Browse the repository at this point in the history
The bulkhead implementation for the synchronous case didn't acquire the capacity
semaphore, it only used the work semaphore. This is wrong in case multiple kinds
of the bulkhead are used at the same time, where the bulkhead might already be
full due to synchronous invocations, but an asynchronous invocation could still
be accepted into the bulkhead because the capacity semaphore is not in the right
state.

This commit fixes that by changing the synchronous implementation to also use
the capacity semaphore, in addition to the usage of the work semaphore.
  • Loading branch information
Ladicek committed Dec 10, 2024
1 parent 4f0f94d commit 9d316f4
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,32 @@ public Future<V> apply(FaultToleranceContext<V> ctx) {
}

private Future<V> applySync(FaultToleranceContext<V> ctx) {
if (workSemaphore.tryAcquire()) {
LOG.trace("Semaphore acquired, accepting task into bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
return delegate.apply(ctx);
} finally {
workSemaphore.release();
LOG.trace("Semaphore released, task leaving bulkhead");
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
if (capacitySemaphore.tryAcquire()) {
LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");
if (workSemaphore.tryAcquire()) {
LOG.trace("Work semaphore acquired, running task");
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
try {
return delegate.apply(ctx);
} finally {
workSemaphore.release();
LOG.trace("Work semaphore released, task finished");
capacitySemaphore.release();
LOG.trace("Capacity semaphore released, task leaving bulkhead");
ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
}
} else {
capacitySemaphore.release();

LOG.debugOrTrace(description + " invocation prevented by bulkhead",
"Work semaphore not acquired, rejecting task from bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
return Future.ofError(new BulkheadException(description + " rejected from bulkhead"));
}
} else {
LOG.debugOrTrace(description + " invocation prevented by bulkhead",
"Semaphore not acquired, rejecting task from bulkhead");
"Capacity semaphore not acquired, rejecting task from bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
return Future.ofError(new BulkheadException(description + " rejected from bulkhead"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.smallrye.faulttolerance.reuse.mixed.bulkhead;

import static io.smallrye.faulttolerance.core.util.SneakyThrow.sneakyThrow;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException;
import org.jboss.weld.junit5.auto.AddBeanClasses;
import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.core.util.barrier.Barrier;
import io.smallrye.faulttolerance.util.FaultToleranceBasicTest;

@FaultToleranceBasicTest
@AddBeanClasses(MyFaultTolerance.class)
public class MixedReuseBulkheadTest {
@Test
public void test(MyService service) throws ExecutionException, InterruptedException {
Barrier barrier1 = Barrier.interruptible();
Barrier barrier2 = Barrier.interruptible();
Barrier barrier3 = Barrier.interruptible();

// accepted
new Thread(() -> {
try {
service.hello(barrier1);
} catch (InterruptedException e) {
throw sneakyThrow(e);
}
}).start();
service.theAnswer(barrier2);
service.badNumber(barrier3).subscribeAsCompletionStage();

// queued
service.theAnswer(Barrier.interruptible());
service.badNumber(Barrier.interruptible()).subscribeAsCompletionStage();

// rejected
assertThatThrownBy(() -> service.hello(Barrier.interruptible())).isExactlyInstanceOf(BulkheadException.class);
assertThatThrownBy(() -> service.theAnswer(Barrier.interruptible()).toCompletableFuture().join())
.isExactlyInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(BulkheadException.class);
assertThatThrownBy(() -> service.badNumber(Barrier.interruptible()).subscribeAsCompletionStage().join())
.isExactlyInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(BulkheadException.class);

barrier1.open();
barrier2.open();
barrier3.open();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.smallrye.faulttolerance.reuse.mixed.bulkhead;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.faulttolerance.api.Guard;

@ApplicationScoped
public class MyFaultTolerance {
@Produces
@Identifier("my-fault-tolerance")
public static final Guard GUARD = Guard.create()
.withBulkhead().limit(3).queueSize(2).done()
.build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.smallrye.faulttolerance.reuse.mixed.bulkhead;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.faulttolerance.Asynchronous;

import io.smallrye.faulttolerance.api.ApplyGuard;
import io.smallrye.faulttolerance.core.util.barrier.Barrier;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class MyService {
@ApplyGuard("my-fault-tolerance")
public String hello(Barrier barrier) throws InterruptedException {
barrier.await();
return "hello";
}

@ApplyGuard("my-fault-tolerance")
@Asynchronous
public CompletionStage<Integer> theAnswer(Barrier barrier) throws InterruptedException {
barrier.await();
return completedFuture(42);
}

@ApplyGuard("my-fault-tolerance")
@Asynchronous
public Uni<Long> badNumber(Barrier barrier) throws InterruptedException {
barrier.await();
return Uni.createFrom().item(13L);
}
}

0 comments on commit 9d316f4

Please sign in to comment.