From de4d519ac52bd6400a016654eb85733ba16ac6f7 Mon Sep 17 00:00:00 2001 From: Googler Date: Tue, 28 May 2024 11:53:05 -0700 Subject: [PATCH] Block in repo fetching state's `close()` method This greatly simplifies the code flow. Instead of using `volatile` and resorting to some very unsavory workarounds, we can simply make sure only one thread is changing `state.workerFuture` using plain old synchronization, and on memory pressure, make absolutely sure that the state object is cleaned up after we remove it from the central state cache. This goes against the advice introduced in https://github.com/bazelbuild/bazel/commit/8ef0a519f2e468498fa53f4aede871b658890f92; the wording for `SkyKeyComputeState#close()` has been updated. Also changed the "retry on cancellation" logic from using recursion to using a `while`-loop for better clarity around nested `finally` blocks. Fixes https://github.com/bazelbuild/bazel/issues/22393. PiperOrigin-RevId: 637975501 Change-Id: Ied43f0310ec8953f4ff1c2712fe07b8ccbd6c184 --- .../RepoFetchingSkyKeyComputeState.java | 84 +++++++------- ...oFetchingWorkerSkyFunctionEnvironment.java | 6 +- .../starlark/StarlarkRepositoryFunction.java | 104 ++++++++---------- .../devtools/build/skyframe/SkyFunction.java | 4 +- 4 files changed, 88 insertions(+), 110 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java index cef52f3ea9456a..6bd36490c02093 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** * Captures state that persists across different invocations of {@link @@ -66,16 +67,16 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState { * This future holds on to the worker thread in order to cancel it when necessary; it also serves * to tell whether a worker thread is already running. */ - // This is volatile since we set it to null to indicate the worker thread isn't running, and this - // could happen on multiple threads. Canceling a future multiple times is safe, though, so we - // only need to worry about nullness. Using a mutex/synchronization is an alternative but it means - // we might block in `close()`, which is potentially bad (see its javadoc). - @Nullable volatile ListenableFuture workerFuture = null; + @GuardedBy("this") + @Nullable + private ListenableFuture workerFuture = null; /** The executor service that manages the worker thread. */ // We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make // sure the worker thread has shut down (with its blocking `close()` method). - ListeningExecutorService workerExecutorService; + @GuardedBy("this") + @Nullable + private ListeningExecutorService workerExecutorService = null; private final String repoName; @@ -89,19 +90,6 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState { RepoFetchingSkyKeyComputeState(String repoName) { this.repoName = repoName; - reset(); - } - - // This may only be called from the host Skyframe thread, *and* only when no worker thread is - // running. - private void reset() { - workerExecutorService = - MoreExecutors.listeningDecorator( - Executors.newThreadPerTaskExecutor( - Thread.ofVirtual().name("starlark-repository-" + repoName).factory())); - signalSemaphore.drainPermits(); - delegateEnvQueue.clear(); - recordedInputValues.clear(); } /** @@ -114,44 +102,48 @@ SkyFunction.Environment signalForFreshEnv() throws InterruptedException { } /** - * Starts a worker thread running the given callable. This sets the {@code workerFuture} field, - * and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes, - * successfully or otherwise. Returns the worker future. This may only be called from the host + * Returns the worker future, or if a worker is not already running, starts a worker thread + * running the given callable. This makes sure to release a permit on the {@code signalSemaphore} + * when the worker finishes, successfully or otherwise. This may only be called from the host * Skyframe thread. */ - ListenableFuture startWorker( + synchronized ListenableFuture getOrStartWorker( Callable c) { - var workerFuture = workerExecutorService.submit(c); - this.workerFuture = workerFuture; + if (workerFuture != null) { + return workerFuture; + } + // We reset the state object back to its very initial state, since the host SkyFunction may have + // been re-entered (for example b/330892334 and + // https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may have + // been interrupted while the host SkyFunction was inactive. + workerExecutorService = + MoreExecutors.listeningDecorator( + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("starlark-repository-" + repoName).factory())); + signalSemaphore.drainPermits(); + delegateEnvQueue.clear(); + recordedInputValues.clear(); + + // Start the worker. + workerFuture = workerExecutorService.submit(c); workerFuture.addListener(signalSemaphore::release, directExecutor()); return workerFuture; } + /** + * Closes the state object, and blocks until all pending async work is finished. The state object + * will reset to a clean slate after this method finishes. + */ // This may be called from any thread, including the host Skyframe thread and the // high-memory-pressure listener thread. @Override - public void close() { - var myWorkerFuture = workerFuture; - workerFuture = null; - if (myWorkerFuture != null) { - myWorkerFuture.cancel(true); + public synchronized void close() { + if (workerFuture != null) { + workerFuture.cancel(true); } - workerExecutorService.shutdownNow(); - } - - /** - * Closes the state object, and blocks until all pending async work is finished. The state object - * will reset to a clean slate after this method finishes. This may only be called from the host - * Skyframe thread. - */ - public void closeAndWaitForTermination() throws InterruptedException { - close(); - workerExecutorService.close(); // This blocks - // We reset the state object back to its very initial state, since the host SkyFunction may be - // re-entered (for example b/330892334 and https://github.com/bazelbuild/bazel/issues/21238). - reset(); - if (Thread.interrupted()) { - throw new InterruptedException(); + workerFuture = null; + if (workerExecutorService != null) { + workerExecutorService.close(); // This blocks } } } diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java index 951ae1200cbc09..5bc9e9ecbaa17d 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java @@ -44,10 +44,10 @@ class RepoFetchingWorkerSkyFunctionEnvironment private final RepoFetchingSkyKeyComputeState state; private SkyFunction.Environment delegate; - RepoFetchingWorkerSkyFunctionEnvironment( - RepoFetchingSkyKeyComputeState state, SkyFunction.Environment delegate) { + RepoFetchingWorkerSkyFunctionEnvironment(RepoFetchingSkyKeyComputeState state) + throws InterruptedException { this.state = state; - this.delegate = delegate; + this.delegate = state.delegateEnvQueue.take(); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java index d6e2c35669e369..f603031ca61c78 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Table; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.analysis.BlazeDirectories; import com.google.devtools.build.lib.analysis.RuleDefinition; import com.google.devtools.build.lib.bazel.bzlmod.NonRegistryOverride; @@ -143,66 +144,51 @@ public RepositoryDirectoryValue.Builder fetch( if (!useWorkers) { return fetchInternal(args); } - var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName())); - if (state.workerExecutorService.isShutdown()) { - // If we get here and the worker executor is shut down, this can only mean that the worker - // future was cancelled while we (the host Skyframe thread) were inactive (as in, having - // returned `null` but not yet restarted). So we wait for the previous worker thread to finish - // first. - // TODO: instead of this complicated dance, consider making it legal for - // `SkyKeyComputeState#close()` to block. This would undo the advice added in commit 8ef0a51, - // but would allow us to merge `close()` and `closeAndWaitForTermination()` and avoid some - // headache. - state.closeAndWaitForTermination(); - } - boolean shouldShutDownWorkerExecutorInFinally = true; - try { - var workerFuture = state.workerFuture; - if (workerFuture == null) { - // No worker is running yet, which means we're just starting to fetch this repo. Start with - // a clean slate, and create the worker. - setupRepoRoot(outputDirectory); - Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env); - workerFuture = - state.startWorker( - () -> fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues))); - } else { - // A worker is already running. This can only mean one thing -- we just had a Skyframe - // restart, and need to send over a fresh Environment. + // See below (the `catch CancellationException` clause) for why there's a `while` loop here. + while (true) { + var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName())); + ListenableFuture workerFuture = + state.getOrStartWorker( + () -> { + Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state); + setupRepoRoot(outputDirectory); + return fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues)); + }); + try { state.delegateEnvQueue.put(env); - } - state.signalSemaphore.acquire(); - if (!workerFuture.isDone()) { - // This means that the worker is still running, and expecting a fresh Environment. Return - // null to trigger a Skyframe restart, but *don't* shut down the worker executor. - shouldShutDownWorkerExecutorInFinally = false; - return null; - } - RepositoryDirectoryValue.Builder result = workerFuture.get(); - recordedInputValues.putAll(state.recordedInputValues); - return result; - } catch (ExecutionException e) { - Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); - Throwables.throwIfUnchecked(e.getCause()); - throw new IllegalStateException("unexpected exception type: " + e.getClass(), e.getCause()); - } catch (CancellationException e) { - // This can only happen if the state object was invalidated due to memory pressure, in - // which case we can simply reattempt the fetch. - env.getListener() - .post( - RepositoryFetchProgress.ongoing( - RepositoryName.createUnvalidated(rule.getName()), - "fetch interrupted due to memory pressure; restarting.")); - return fetch(rule, outputDirectory, directories, env, recordedInputValues, key); - } finally { - if (shouldShutDownWorkerExecutorInFinally) { - // Unless we know the worker is waiting on a fresh Environment, we should *always* shut down - // the worker executor and reset the state by the time we finish executing (successfully or - // otherwise). This ensures that 1) no background work happens without our knowledge, and - // 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and - // https://github.com/bazelbuild/bazel/issues/21238), we don't have lingering state messing - // things up. - state.closeAndWaitForTermination(); + state.signalSemaphore.acquire(); + if (!workerFuture.isDone()) { + // This means that the worker is still running, and expecting a fresh Environment. Return + // null to trigger a Skyframe restart, but *don't* shut down the worker executor. + return null; + } + RepositoryDirectoryValue.Builder result = workerFuture.get(); + recordedInputValues.putAll(state.recordedInputValues); + return result; + } catch (ExecutionException e) { + Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new IllegalStateException( + "unexpected exception type: " + e.getCause().getClass(), e.getCause()); + } catch (CancellationException e) { + // This can only happen if the state object was invalidated due to memory pressure, in + // which case we can simply reattempt the fetch. Show a message and continue into the next + // `while` iteration. + env.getListener() + .post( + RepositoryFetchProgress.ongoing( + RepositoryName.createUnvalidated(rule.getName()), + "fetch interrupted due to memory pressure; restarting.")); + } finally { + if (workerFuture.isDone()) { + // Unless we know the worker is waiting on a fresh Environment, we should *always* shut + // down the worker executor by the time we finish executing (successfully or otherwise). + // This ensures that 1) no background work happens without our knowledge, and 2) if the + // SkyFunction is re-entered for any reason (for example b/330892334 and + // https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new + // worker from scratch. + state.close(); + } } } } diff --git a/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java b/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java index 7d3cadaeef35ca..d21c90156e5ebf 100644 --- a/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java +++ b/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java @@ -491,8 +491,8 @@ interface SkyKeyComputeState extends AutoCloseable { * *

Implementations MUST be idempotent. * - *

Note also that this method should not perform any heavy work (especially blocking - * operations). + *

Note also that this method could be invoked from arbitrary threads, so avoid heavy + * operations if possible. */ @Override default void close() {}