Skip to content

Commit

Permalink
RequestBatcher: Improve Deadlock Safety with Dedicated Executor
Browse files Browse the repository at this point in the history
This change enhances the RequestBatcher by introducing an internal
executor specifically for critical queue draining operations.

By isolating this functionality, clients are relieved of the burden of
managing potential deadlocks associated with executor usage. Previously,
sharing the FingerprintValueService's executor (used only for
deserialization operations) introduced a theoretical deadlock risk,
though it was likely benign because that executor performs only limited
queuing for deserialization work.

PiperOrigin-RevId: 697999540
Change-Id: I8b00f6b5d22bc3d2a674226e8f30e673cd5e5307
  • Loading branch information
aoeui authored and copybara-github committed Nov 19, 2024
1 parent f54c853 commit 206907f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress;
import static java.lang.Math.min;
import static java.util.concurrent.Executors.newFixedThreadPool;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -108,7 +109,32 @@ public final class RequestBatcher<RequestT, ResponseT> {
*/
@VisibleForTesting static final int BATCH_SIZE = 4095;

private final Executor executor;
/**
* Executor provided by the client to invoke callbacks for individual responses within a batched
* response.
*
* <p><b>Important:</b> For each batch, response callbacks are executed sequentially on a single
* thread. If a callback involves significant processing, the client should offload the work to
* separate threads to prevent delays in processing subsequent responses.
*/
private final Executor responseDistributionExecutor;

/**
* Executor dedicated to draining the queue, specifically the {@link
* #continueToNextBatchOrBecomeIdle} method.
*
* <p><b>Purpose of Isolation:</b> This executor is isolated to prevent potential deadlocks. The
* {@link #submit} method can block if the task queue is full. If all threads in the client's
* executor become blocked waiting to submit tasks, only {@link #continueToNextBatchOrBecomeIdle}
* can free up space in the queue. Scheduling this continuation logic on the same, potentially
* blocked, client executor would lead to a deadlock.
*
* <p><b>Deadlock Avoidance:</b> As long as {@link #continueToNextBatchOrBecomeIdle} does not
* contain blocking operations (which is true in the current implementation), using a separate
* executor is sufficient to prevent this specific deadlock scenario.
*/
private final Executor queueDrainingExecutor;

private final Multiplexer<RequestT, ResponseT> multiplexer;

/** Number of active workers to target. */
Expand Down Expand Up @@ -146,7 +172,9 @@ public interface Multiplexer<RequestT, ResponseT> {
}

public static <RequestT, ResponseT> RequestBatcher<RequestT, ResponseT> create(
Executor executor, Multiplexer<RequestT, ResponseT> multiplexer, int targetWorkerCount) {
Executor responseDistributionExecutor,
Multiplexer<RequestT, ResponseT> multiplexer,
int targetWorkerCount) {
long baseAddress = createPaddedBaseAddress(4);
long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);

Expand All @@ -159,7 +187,15 @@ public static <RequestT, ResponseT> RequestBatcher<RequestT, ResponseT> create(

var batcher =
new RequestBatcher<RequestT, ResponseT>(
executor, multiplexer, targetWorkerCount, countersAddress, queue);
/* responseDistributionExecutor= */ responseDistributionExecutor,
// `targetWorkerCount` is the maximum level of invocation concurrency possible for the
// `queueDrainingExecutor`. It is possible for this to overrun, but the work is
// relatively lightweight and the batch round trip latency is expected to dominate.
/* queueDrainingExecutor= */ newFixedThreadPool(targetWorkerCount),
multiplexer,
targetWorkerCount,
countersAddress,
queue);

cleaner.register(batcher, new AddressFreer(baseAddress));

Expand All @@ -174,7 +210,8 @@ public static <RequestT, ResponseT> RequestBatcher<RequestT, ResponseT> create(
*/
@VisibleForTesting
RequestBatcher(
Executor executor,
Executor responseDistributionExecutor,
Executor queueDrainingExecutor,
Multiplexer<RequestT, ResponseT> multiplexer,
int targetWorkerCount,
long countersAddress,
Expand All @@ -185,7 +222,8 @@ public static <RequestT, ResponseT> RequestBatcher<RequestT, ResponseT> create(
"targetWorkerCount=%s > %s",
targetWorkerCount,
ACTIVE_WORKERS_COUNT_MAX);
this.executor = executor;
this.responseDistributionExecutor = responseDistributionExecutor;
this.queueDrainingExecutor = queueDrainingExecutor;
this.multiplexer = multiplexer;
this.targetWorkerCount = targetWorkerCount;
this.countersAddress = countersAddress;
Expand All @@ -198,6 +236,8 @@ public static <RequestT, ResponseT> RequestBatcher<RequestT, ResponseT> create(
/**
* Submits a request, subject to batching.
*
* <p>This method <em>blocks</em> when the queue is full.
*
* <p>Callers should consider processing the response on a different executor if processing is
* expensive to avoid delaying work pending other responses in the batch.
*/
Expand Down Expand Up @@ -276,7 +316,7 @@ private void executeBatch(RequestResponse<RequestT, ResponseT> requestResponse)
ListenableFuture<List<ResponseT>> futureResponses =
multiplexer.execute(Lists.transform(batch, RequestResponse::request));

futureResponses.addListener(this::continueToNextBatchOrBecomeIdle, executor);
futureResponses.addListener(this::continueToNextBatchOrBecomeIdle, queueDrainingExecutor);

addCallback(
futureResponses,
Expand Down Expand Up @@ -304,7 +344,7 @@ public void onSuccess(List<ResponseT> responses) {
}
}
},
executor);
responseDistributionExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception {
// This test uses fakes to achieve the narrow set of conditions needed to reach this code path.
long baseAddress = createPaddedBaseAddress(4);

var executor = new FakeExecutor();
var queueDrainingExecutor = new FakeExecutor();
var fifo =
new FakeConcurrentFifo(
getAlignedAddress(baseAddress, /* offset= */ 1),
Expand All @@ -157,15 +157,16 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception {
long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
var batcher =
new RequestBatcher<Request, Response>(
executor,
/* responseDistributionExecutor= */ commonPool(),
/* queueDrainingExecutor= */ queueDrainingExecutor,
requests -> immediateFuture(respondTo(requests)),
/* targetWorkerCount= */ 1,
countersAddress,
fifo);
cleaner.register(batcher, new AddressFreer(baseAddress));

// Submits a request. This starts a worker to run the batch, but it gets blocked on the executor
// and can't continue.
// Submits a request. This starts a worker to run the batch, but it gets blocked on
// `queueDrainingExecutor` and can't continue.
ListenableFuture<Response> response1 = batcher.submit(new Request(1));

// Submits a 2nd request. This request observes that there are enough active workers so it tries
Expand All @@ -176,10 +177,9 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception {
// Waits until the 2nd request starts enqueuing.
fifo.tryAppendTokens.acquireUninterruptibly();

// Allows the 1st worker to continue. It will go idle. There are two Runnables, one to run the
// continuation logic and one that sets the response callbacks. Runs both.
executor.queue.take().run();
executor.queue.take().run();
// Allows the 1st worker to continue. This calls an enqueued `continueToNextBatchOrBecomeIdle`
// invocation that will cause the 1st worker to go idle.
queueDrainingExecutor.queue.take().run();

assertThat(response1.get()).isEqualTo(new Response(1));

Expand All @@ -188,8 +188,7 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception {

// Allows the 2nd request to enqueue and complete processing.
fifo.appendPermits.release();
executor.queue.take().run();
executor.queue.take().run();
queueDrainingExecutor.queue.take().run();

assertThat(response2.get().get()).isEqualTo(new Response(2));
}
Expand Down

0 comments on commit 206907f

Please sign in to comment.