Skip to content

Commit

Permalink
[7.2.0] Block in repo fetching state's close() method (#22573)
Browse files Browse the repository at this point in the history
This greatly simplifies the code flow. Instead of using `volatile` and
resorting to some very unsavory workarounds, we can simply make sure
only one thread is changing `state.workerFuture` using plain old
synchronization, and on memory pressure, make absolutely sure that the
state object is cleaned up after we remove it from the central state
cache.

This goes against the advice introduced in
8ef0a51;
the wording for `SkyKeyComputeState#close()` has been updated.

Also changed the "retry on cancellation" logic from using recursion to
using a `while`-loop for better clarity around nested `finally` blocks.

Fixes #22393.

PiperOrigin-RevId: 637975501
Change-Id: Ied43f0310ec8953f4ff1c2712fe07b8ccbd6c184

Commit
de4d519

Co-authored-by: Googler <[email protected]>
  • Loading branch information
bazel-io and Wyverald authored May 28, 2024
1 parent 1536fba commit c6cabd8
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
* Captures state that persists across different invocations of {@link
Expand Down Expand Up @@ -66,16 +67,16 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
* to tell whether a worker thread is already running.
*/
// This is volatile since we set it to null to indicate the worker thread isn't running, and this
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;
@GuardedBy("this")
@Nullable
private ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;

/** The executor service that manages the worker thread. */
// We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
// sure the worker thread has shut down (with its blocking `close()` method).
ListeningExecutorService workerExecutorService;
@GuardedBy("this")
@Nullable
private ListeningExecutorService workerExecutorService = null;

private final String repoName;

Expand All @@ -89,19 +90,6 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {

RepoFetchingSkyKeyComputeState(String repoName) {
this.repoName = repoName;
reset();
}

// This may only be called from the host Skyframe thread, *and* only when no worker thread is
// running.
private void reset() {
workerExecutorService =
MoreExecutors.listeningDecorator(
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
signalSemaphore.drainPermits();
delegateEnvQueue.clear();
recordedInputValues.clear();
}

/**
Expand All @@ -114,44 +102,48 @@ SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
}

/**
* Starts a worker thread running the given callable. This sets the {@code workerFuture} field,
* and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes,
* successfully or otherwise. Returns the worker future. This may only be called from the host
* Returns the worker future, or if a worker is not already running, starts a worker thread
* running the given callable. This makes sure to release a permit on the {@code signalSemaphore}
* when the worker finishes, successfully or otherwise. This may only be called from the host
* Skyframe thread.
*/
ListenableFuture<RepositoryDirectoryValue.Builder> startWorker(
synchronized ListenableFuture<RepositoryDirectoryValue.Builder> getOrStartWorker(
Callable<RepositoryDirectoryValue.Builder> c) {
var workerFuture = workerExecutorService.submit(c);
this.workerFuture = workerFuture;
if (workerFuture != null) {
return workerFuture;
}
// We reset the state object back to its very initial state, since the host SkyFunction may have
// been re-entered (for example b/330892334 and
// https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may have
// been interrupted while the host SkyFunction was inactive.
workerExecutorService =
MoreExecutors.listeningDecorator(
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
signalSemaphore.drainPermits();
delegateEnvQueue.clear();
recordedInputValues.clear();

// Start the worker.
workerFuture = workerExecutorService.submit(c);
workerFuture.addListener(signalSemaphore::release, directExecutor());
return workerFuture;
}

/**
* Closes the state object, and blocks until all pending async work is finished. The state object
* will reset to a clean slate after this method finishes.
*/
// This may be called from any thread, including the host Skyframe thread and the
// high-memory-pressure listener thread.
@Override
public void close() {
var myWorkerFuture = workerFuture;
workerFuture = null;
if (myWorkerFuture != null) {
myWorkerFuture.cancel(true);
public synchronized void close() {
if (workerFuture != null) {
workerFuture.cancel(true);
}
workerExecutorService.shutdownNow();
}

/**
* Closes the state object, and blocks until all pending async work is finished. The state object
* will reset to a clean slate after this method finishes. This may only be called from the host
* Skyframe thread.
*/
public void closeAndWaitForTermination() throws InterruptedException {
close();
workerExecutorService.close(); // This blocks
// We reset the state object back to its very initial state, since the host SkyFunction may be
// re-entered (for example b/330892334 and https://github.com/bazelbuild/bazel/issues/21238).
reset();
if (Thread.interrupted()) {
throw new InterruptedException();
workerFuture = null;
if (workerExecutorService != null) {
workerExecutorService.close(); // This blocks
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ class RepoFetchingWorkerSkyFunctionEnvironment
private final RepoFetchingSkyKeyComputeState state;
private SkyFunction.Environment delegate;

RepoFetchingWorkerSkyFunctionEnvironment(
RepoFetchingSkyKeyComputeState state, SkyFunction.Environment delegate) {
RepoFetchingWorkerSkyFunctionEnvironment(RepoFetchingSkyKeyComputeState state)
throws InterruptedException {
this.state = state;
this.delegate = delegate;
this.delegate = state.delegateEnvQueue.take();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.RuleDefinition;
import com.google.devtools.build.lib.bazel.bzlmod.NonRegistryOverride;
Expand Down Expand Up @@ -143,66 +144,51 @@ public RepositoryDirectoryValue.Builder fetch(
if (!useWorkers) {
return fetchInternal(args);
}
var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName()));
if (state.workerExecutorService.isShutdown()) {
// If we get here and the worker executor is shut down, this can only mean that the worker
// future was cancelled while we (the host Skyframe thread) were inactive (as in, having
// returned `null` but not yet restarted). So we wait for the previous worker thread to finish
// first.
// TODO: instead of this complicated dance, consider making it legal for
// `SkyKeyComputeState#close()` to block. This would undo the advice added in commit 8ef0a51,
// but would allow us to merge `close()` and `closeAndWaitForTermination()` and avoid some
// headache.
state.closeAndWaitForTermination();
}
boolean shouldShutDownWorkerExecutorInFinally = true;
try {
var workerFuture = state.workerFuture;
if (workerFuture == null) {
// No worker is running yet, which means we're just starting to fetch this repo. Start with
// a clean slate, and create the worker.
setupRepoRoot(outputDirectory);
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
workerFuture =
state.startWorker(
() -> fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues)));
} else {
// A worker is already running. This can only mean one thing -- we just had a Skyframe
// restart, and need to send over a fresh Environment.
// See below (the `catch CancellationException` clause) for why there's a `while` loop here.
while (true) {
var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName()));
ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture =
state.getOrStartWorker(
() -> {
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state);
setupRepoRoot(outputDirectory);
return fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues));
});
try {
state.delegateEnvQueue.put(env);
}
state.signalSemaphore.acquire();
if (!workerFuture.isDone()) {
// This means that the worker is still running, and expecting a fresh Environment. Return
// null to trigger a Skyframe restart, but *don't* shut down the worker executor.
shouldShutDownWorkerExecutorInFinally = false;
return null;
}
RepositoryDirectoryValue.Builder result = workerFuture.get();
recordedInputValues.putAll(state.recordedInputValues);
return result;
} catch (ExecutionException e) {
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
Throwables.throwIfUnchecked(e.getCause());
throw new IllegalStateException("unexpected exception type: " + e.getClass(), e.getCause());
} catch (CancellationException e) {
// This can only happen if the state object was invalidated due to memory pressure, in
// which case we can simply reattempt the fetch.
env.getListener()
.post(
RepositoryFetchProgress.ongoing(
RepositoryName.createUnvalidated(rule.getName()),
"fetch interrupted due to memory pressure; restarting."));
return fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
} finally {
if (shouldShutDownWorkerExecutorInFinally) {
// Unless we know the worker is waiting on a fresh Environment, we should *always* shut down
// the worker executor and reset the state by the time we finish executing (successfully or
// otherwise). This ensures that 1) no background work happens without our knowledge, and
// 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and
// https://github.com/bazelbuild/bazel/issues/21238), we don't have lingering state messing
// things up.
state.closeAndWaitForTermination();
state.signalSemaphore.acquire();
if (!workerFuture.isDone()) {
// This means that the worker is still running, and expecting a fresh Environment. Return
// null to trigger a Skyframe restart, but *don't* shut down the worker executor.
return null;
}
RepositoryDirectoryValue.Builder result = workerFuture.get();
recordedInputValues.putAll(state.recordedInputValues);
return result;
} catch (ExecutionException e) {
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
Throwables.throwIfUnchecked(e.getCause());
throw new IllegalStateException(
"unexpected exception type: " + e.getCause().getClass(), e.getCause());
} catch (CancellationException e) {
// This can only happen if the state object was invalidated due to memory pressure, in
// which case we can simply reattempt the fetch. Show a message and continue into the next
// `while` iteration.
env.getListener()
.post(
RepositoryFetchProgress.ongoing(
RepositoryName.createUnvalidated(rule.getName()),
"fetch interrupted due to memory pressure; restarting."));
} finally {
if (workerFuture.isDone()) {
// Unless we know the worker is waiting on a fresh Environment, we should *always* shut
// down the worker executor by the time we finish executing (successfully or otherwise).
// This ensures that 1) no background work happens without our knowledge, and 2) if the
// SkyFunction is re-entered for any reason (for example b/330892334 and
// https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new
// worker from scratch.
state.close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ interface SkyKeyComputeState extends AutoCloseable {
*
* <p>Implementations <strong>MUST</strong> be idempotent.
*
* <p>Note also that this method should not perform any heavy work (especially blocking
* operations).
* <p>Note also that this method could be invoked from arbitrary threads, so avoid heavy
* operations if possible.
*/
@Override
default void close() {}
Expand Down

0 comments on commit c6cabd8

Please sign in to comment.