Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make bazel run works with minimal mode #16545

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,4 +524,8 @@ public void finalizeAction(Action action, MetadataHandler metadataHandler) {
prefetchFiles(outputsToDownload, metadataHandler, Priority.LOW);
}
}

public void flushOutputTree() throws InterruptedException {
downloadCache.awaitInProgressTasks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -929,22 +929,21 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
remoteOutputService.setActionInputFetcher(actionInputFetcher);
actionContextProvider.setActionInputFetcher(actionInputFetcher);

if (remoteOutputsMode.downloadToplevelOutputsOnly()) {
toplevelArtifactsDownloader =
new ToplevelArtifactsDownloader(
env.getCommandName(),
env.getSkyframeExecutor().getEvaluator(),
actionInputFetcher,
(path) -> {
FileSystem fileSystem = path.getFileSystem();
Preconditions.checkState(
fileSystem instanceof RemoteActionFileSystem,
"fileSystem must be an instance of RemoteActionFileSystem");
return ((RemoteActionFileSystem) path.getFileSystem())
.getRemoteMetadata(path.asFragment());
});
env.getEventBus().register(toplevelArtifactsDownloader);
}
toplevelArtifactsDownloader =
new ToplevelArtifactsDownloader(
env.getCommandName(),
remoteOutputsMode.downloadToplevelOutputsOnly(),
env.getSkyframeExecutor().getEvaluator(),
actionInputFetcher,
(path) -> {
FileSystem fileSystem = path.getFileSystem();
Preconditions.checkState(
fileSystem instanceof RemoteActionFileSystem,
"fileSystem must be an instance of RemoteActionFileSystem");
return ((RemoteActionFileSystem) path.getFileSystem())
.getRemoteMetadata(path.asFragment());
});
env.getEventBus().register(toplevelArtifactsDownloader);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public ModifiedFileSet startBuild(
return ModifiedFileSet.EVERYTHING_MODIFIED;
}

@Override
public void flushOutputTree() throws InterruptedException {
if (actionInputFetcher != null) {
actionInputFetcher.flushOutputTree();
}
}

@Override
public void finalizeBuild(boolean buildSuccessful) {
// Intentionally left empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ private enum CommandMode {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final CommandMode commandMode;
private final boolean downloadToplevel;
private final MemoizingEvaluator memoizingEvaluator;
private final AbstractActionInputPrefetcher actionInputPrefetcher;
private final PathToMetadataConverter pathToMetadataConverter;

public ToplevelArtifactsDownloader(
String commandName,
boolean downloadToplevel,
MemoizingEvaluator memoizingEvaluator,
AbstractActionInputPrefetcher actionInputPrefetcher,
PathToMetadataConverter pathToMetadataConverter) {
Expand All @@ -84,6 +86,7 @@ public ToplevelArtifactsDownloader(
default:
this.commandMode = CommandMode.UNKNOWN;
}
this.downloadToplevel = downloadToplevel;
this.memoizingEvaluator = memoizingEvaluator;
this.actionInputPrefetcher = actionInputPrefetcher;
this.pathToMetadataConverter = pathToMetadataConverter;
Expand Down Expand Up @@ -133,6 +136,10 @@ public void onFailure(Throwable throwable) {
@Subscribe
@AllowConcurrentEvents
public void onAspectComplete(AspectCompleteEvent event) {
if (!shouldDownloadToplevelOutputs(event.getAspectKey().getBaseConfiguredTargetKey())) {
return;
}

if (event.failed()) {
return;
}
Expand All @@ -143,7 +150,7 @@ public void onAspectComplete(AspectCompleteEvent event) {
@Subscribe
@AllowConcurrentEvents
public void onTargetComplete(TargetCompleteEvent event) {
if (!shouldDownloadToplevelOutputsForTarget(event.getConfiguredTargetKey())) {
if (!shouldDownloadToplevelOutputs(event.getConfiguredTargetKey())) {
return;
}

Expand All @@ -156,28 +163,32 @@ public void onTargetComplete(TargetCompleteEvent event) {
event.getExecutableTargetData().getRunfiles());
}

private boolean shouldDownloadToplevelOutputsForTarget(ConfiguredTargetKey configuredTargetKey) {
if (commandMode != CommandMode.TEST) {
return true;
}

// Do not download test binary in test mode.
try {
var configuredTargetValue =
(ConfiguredTargetValue) memoizingEvaluator.getExistingValue(configuredTargetKey);
if (configuredTargetValue == null) {
return false;
}
ConfiguredTarget configuredTarget = configuredTargetValue.getConfiguredTarget();
if (configuredTarget instanceof RuleConfiguredTarget) {
var ruleConfiguredTarget = (RuleConfiguredTarget) configuredTarget;
var isTestRule = isTestRuleName(ruleConfiguredTarget.getRuleClassString());
return !isTestRule;
}
return true;
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return false;
private boolean shouldDownloadToplevelOutputs(ConfiguredTargetKey configuredTargetKey) {
switch (commandMode) {
case RUN:
// Always download outputs of toplevel targets in RUN mode
return true;
case TEST:
// Do not download test binary in test mode.
try {
var configuredTargetValue =
(ConfiguredTargetValue) memoizingEvaluator.getExistingValue(configuredTargetKey);
if (configuredTargetValue == null) {
return false;
}
ConfiguredTarget configuredTarget = configuredTargetValue.getConfiguredTarget();
if (configuredTarget instanceof RuleConfiguredTarget) {
var ruleConfiguredTarget = (RuleConfiguredTarget) configuredTarget;
var isTestRule = isTestRuleName(ruleConfiguredTarget.getRuleClassString());
return !isTestRule;
}
return true;
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return false;
}
default:
return downloadToplevel;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,6 +134,8 @@ class Execution extends Single<ValueT> implements SingleObserver<ValueT> {
@GuardedBy("lock")
private final List<SingleObserver<? super ValueT>> observers = new ArrayList<>();

private final AsyncSubject<ValueT> completion = AsyncSubject.create();

Execution(KeyT key, Single<ValueT> upstream) {
this.key = key;
this.upstream = upstream;
Expand Down Expand Up @@ -182,6 +187,9 @@ public void onSuccess(@NonNull ValueT value) {
observer.onSuccess(value);
}

completion.onNext(value);
completion.onComplete();

maybeNotifyTermination();
}
}
Expand All @@ -198,6 +206,8 @@ public void onError(@NonNull Throwable error) {
observer.onError(error);
}

completion.onError(error);

maybeNotifyTermination();
}
}
Expand Down Expand Up @@ -348,7 +358,42 @@ public void shutdown() {
}
}

/** Waits for the channel to become terminated. */
/**
* Waits for the in-progress tasks to finish. Any tasks that are submitted after the call are not
* waited.
*/
public void awaitInProgressTasks() throws InterruptedException {
Completable completable =
Completable.defer(
() -> {
Collection<Execution> executions;
synchronized (lock) {
executions = ImmutableList.copyOf(inProgress.values());
}

if (executions.isEmpty()) {
return Completable.complete();
}

return Completable.fromPublisher(
Flowable.fromIterable(executions)
.flatMapSingle(e -> Single.fromObservable(e.completion)));
});

try {
completable.blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, InterruptedException.class);
}
throw e;
}
}

/**
* Waits for the channel to become terminated.
*/
public void awaitTermination() throws InterruptedException {
Completable completable =
Completable.create(
Expand Down Expand Up @@ -493,7 +538,17 @@ public void shutdown() {
cache.shutdown();
}

/** Waits for the cache to become terminated. */
/**
* Waits for the in-progress tasks to finish. Any tasks that are submitted after the call are
* not waited.
*/
public void awaitInProgressTasks() throws InterruptedException {
cache.awaitInProgressTasks();
}

/**
* Waits for the cache to become terminated.
*/
public void awaitTermination() throws InterruptedException {
cache.awaitTermination();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ public BlazeCommandResult exec(CommandEnvironment env, OptionsParsingResult opti
return BlazeCommandResult.detailedExitCode(result.getDetailedExitCode());
}

// If Bazel is using an output service (e.g. Build without the Bytes), the toplevel outputs
// might still be downloading in the background. Flush the output tree to wait for all the
// downloads complete.
if (env.getOutputService() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment here explaining why this is necessary (since a casual reader of RunCommand.java might not be familiar with BwoB).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

try {
env.getOutputService().flushOutputTree();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

// Make sure that we have exactly 1 built target (excluding --run_under),
// and that it is executable.
// These checks should only fail if keepGoing is true, because we already did
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ Path getExecRoot() {
return executorEngine.getExecRoot();
}

ActionInputPrefetcher getActionInputPrefetcher() {
return actionInputPrefetcher;
}

ActionContextRegistry getActionContextRegistry() {
return executorEngine;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2630,6 +2630,10 @@ public MemoizingEvaluator getEvaluator() {
return memoizingEvaluator;
}

public ActionInputPrefetcher getActionInputPrefetcher() {
return skyframeActionExecutor.getActionInputPrefetcher();
}

@VisibleForTesting
public ConfiguredRuleClassProvider getRuleClassProviderForTesting() {
return ruleClassProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public default boolean shouldTrustRemoteArtifacts() {
ModifiedFileSet startBuild(EventHandler eventHandler, UUID buildId, boolean finalizeActions)
throws BuildFailedException, AbruptExitException, InterruptedException;

/** Flush and wait for in-progress downloads. */
void flushOutputTree() throws InterruptedException;

/**
* Finish the build.
*
Expand Down
22 changes: 22 additions & 0 deletions src/test/shell/bazel/remote/build_without_the_bytes_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1331,4 +1331,26 @@ EOF
[[ ! -e "bazel-bin/a/liblib.jdeps" ]] || fail "bazel-bin/a/liblib.jdeps shouldn't exist"
}

function test_bazel_run_with_minimal() {
# Test that `bazel run` works in minimal mode.
mkdir -p a

cat > a/BUILD <<'EOF'
genrule(
name = 'bin',
srcs = [],
outs = ['bin.out'],
cmd = "echo 'echo bin-message' > $@",
executable = True,
)
EOF

bazel run \
--remote_executor=grpc://localhost:${worker_port} \
--remote_download_minimal \
//a:bin >& $TEST_log || fail "Failed to run //a:bin"

expect_log "bin-message"
}

run_suite "Build without the Bytes tests"