From 206907ff0f989b1dbe13f1e1ef4ed0e5e143552a Mon Sep 17 00:00:00 2001 From: Googler Date: Tue, 19 Nov 2024 06:55:05 -0800 Subject: [PATCH] RequestBatcher: Improve Deadlock Safety with Dedicated Executor This change enhances the RequestBatcher by introducing an internal executor specifically for critical queue draining operations. By isolating this functionality, clients are relieved from managing potential deadlocks associated with executor usage. Previously, sharing the FingerprintValueService's executor (used only for deserialization operations) introduced a theoretical risk, though likely benign due to its limited queuing on deserialization. PiperOrigin-RevId: 697999540 Change-Id: I8b00f6b5d22bc3d2a674226e8f30e673cd5e5307 --- .../build/lib/concurrent/RequestBatcher.java | 54 ++++++++++++++++--- .../lib/concurrent/RequestBatcherTest.java | 19 ++++--- 2 files changed, 56 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/RequestBatcher.java b/src/main/java/com/google/devtools/build/lib/concurrent/RequestBatcher.java index 58652320dc394f..b065558ea9e6af 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/RequestBatcher.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/RequestBatcher.java @@ -20,6 +20,7 @@ import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress; import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress; import static java.lang.Math.min; +import static java.util.concurrent.Executors.newFixedThreadPool; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -108,7 +109,32 @@ public final class RequestBatcher { */ @VisibleForTesting static final int BATCH_SIZE = 4095; - private final Executor executor; + /** + * Executor provided by the client to invoke callbacks for individual responses within a batched + * response. + * + *

<p>Important: For each batch, response callbacks are executed sequentially on a single + * thread. If a callback involves significant processing, the client should offload the work to + * separate threads to prevent delays in processing subsequent responses. */ + private final Executor responseDistributionExecutor; + + /** + * Executor dedicated to draining the queue, specifically the {@link + * #continueToNextBatchOrBecomeIdle} method. + + *

<p>Purpose of Isolation: This executor is isolated to prevent potential deadlocks. The + * {@link #submit} method can block if the task queue is full. If all threads in the client's + * executor become blocked waiting to submit tasks, only {@link #continueToNextBatchOrBecomeIdle} + * can free up space in the queue. Scheduling this continuation logic on the same, potentially + * blocked, client executor would lead to a deadlock. + + *

<p>Deadlock Avoidance: As long as {@link #continueToNextBatchOrBecomeIdle} does not + * contain blocking operations (which is true in the current implementation), using a separate + * executor is sufficient to prevent this specific deadlock scenario. + */ + private final Executor queueDrainingExecutor; + private final Multiplexer multiplexer; /** Number of active workers to target. */ @@ -146,7 +172,9 @@ public interface Multiplexer { } public static RequestBatcher create( - Executor executor, Multiplexer multiplexer, int targetWorkerCount) { + Executor responseDistributionExecutor, + Multiplexer multiplexer, + int targetWorkerCount) { long baseAddress = createPaddedBaseAddress(4); long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0); @@ -159,7 +187,15 @@ public static RequestBatcher create( var batcher = new RequestBatcher( - executor, multiplexer, targetWorkerCount, countersAddress, queue); + /* responseDistributionExecutor= */ responseDistributionExecutor, + // `targetWorkerCount` is the maximum level of invocation concurrency possible for the + // `queueDrainingExecutor`. It is possible for this to overrun, but the work is + // relatively lightweight and the batch round trip latency is expected to dominate.
+ /* queueDrainingExecutor= */ newFixedThreadPool(targetWorkerCount), + multiplexer, + targetWorkerCount, + countersAddress, + queue); cleaner.register(batcher, new AddressFreer(baseAddress)); @@ -174,7 +210,8 @@ public static RequestBatcher create( */ @VisibleForTesting RequestBatcher( - Executor executor, + Executor responseDistributionExecutor, + Executor queueDrainingExecutor, Multiplexer multiplexer, int targetWorkerCount, long countersAddress, @@ -185,7 +222,8 @@ public static RequestBatcher create( "targetWorkerCount=%s > %s", targetWorkerCount, ACTIVE_WORKERS_COUNT_MAX); - this.executor = executor; + this.responseDistributionExecutor = responseDistributionExecutor; + this.queueDrainingExecutor = queueDrainingExecutor; this.multiplexer = multiplexer; this.targetWorkerCount = targetWorkerCount; this.countersAddress = countersAddress; @@ -198,6 +236,8 @@ public static RequestBatcher create( /** * Submits a request, subject to batching. * + *

<p>This method blocks when the queue is full. + * *

<p>Callers should consider processing the response on a different executor if processing is * expensive to avoid delaying work pending other responses in the batch. */ @@ -276,7 +316,7 @@ private void executeBatch(RequestResponse requestResponse) ListenableFuture<List<ResponseT>> futureResponses = multiplexer.execute(Lists.transform(batch, RequestResponse::request)); - futureResponses.addListener(this::continueToNextBatchOrBecomeIdle, executor); + futureResponses.addListener(this::continueToNextBatchOrBecomeIdle, queueDrainingExecutor); addCallback( futureResponses, @@ -304,7 +344,7 @@ public void onSuccess(List<ResponseT> responses) { } } }, - executor); + responseDistributionExecutor); } /** diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/RequestBatcherTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/RequestBatcherTest.java index 08e3c7da2026f9..73a1424c6d4024 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/RequestBatcherTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/RequestBatcherTest.java @@ -148,7 +148,7 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception { // This test uses fakes to achieve the narrow set of conditions needed to reach this code path. long baseAddress = createPaddedBaseAddress(4); - var executor = new FakeExecutor(); + var queueDrainingExecutor = new FakeExecutor(); var fifo = new FakeConcurrentFifo( getAlignedAddress(baseAddress, /* offset= */ 1), getAlignedAddress(baseAddress, /* offset= */ 2), getAlignedAddress(baseAddress, /* offset= */ 3)); long countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0); var batcher = new RequestBatcher( - executor, + /* responseDistributionExecutor= */ commonPool(), + /* queueDrainingExecutor= */ queueDrainingExecutor, requests -> immediateFuture(respondTo(requests)), /* targetWorkerCount= */ 1, countersAddress, fifo); cleaner.register(batcher, new AddressFreer(baseAddress)); - // Submits a request. 
This starts a worker to run the batch, but it gets blocked on the executor - // and can't continue. + // Submits a request. This starts a worker to run the batch, but it gets blocked on + // `queueDrainingExecutor` and can't continue. ListenableFuture<Response> response1 = batcher.submit(new Request(1)); // Submits a 2nd request. This request observes that there are enough active workers so it tries // to enqueue, but the queue is full so it blocks. @@ -176,10 +177,9 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception { // Waits until the 2nd request starts enqueuing. fifo.tryAppendTokens.acquireUninterruptibly(); - // Allows the 1st worker to continue. It will go idle. There are two Runnables, one to run the - // continuation logic and one that sets the response callbacks. Runs both. - executor.queue.take().run(); - executor.queue.take().run(); + // Allows the 1st worker to continue. This calls an enqueued `continueToNextBatchOrBecomeIdle` + // invocation that will cause the 1st worker to go idle. + queueDrainingExecutor.queue.take().run(); assertThat(response1.get()).isEqualTo(new Response(1)); @@ -188,8 +188,7 @@ public void concurrentWorkCompletion_startsNewWorker() throws Exception { // Allows the 2nd request to enqueue and complete processing. fifo.appendPermits.release(); - executor.queue.take().run(); - executor.queue.take().run(); + queueDrainingExecutor.queue.take().run(); assertThat(response2.get().get()).isEqualTo(new Response(2)); }