Skip to content

Commit

Permalink
Pull upload(ActionResult) into super class.
Browse files Browse the repository at this point in the history
Remove the custom upload(ActionResult) implementations from SimpleBlobStoreActionCache and GrpcRemoteCache.

This is a big step towards merging SimpleBlobStoreActionCache and GrpcRemoteCache.

Closes #9167.

PiperOrigin-RevId: 264563384
  • Loading branch information
buchgr authored and copybara-github committed Aug 21, 2019
1 parent 6e4f9e4 commit 12ebb84
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -79,6 +80,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -120,20 +122,8 @@ public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
abstract ActionResult getCachedActionResult(ActionKey actionKey)
throws IOException, InterruptedException;

/**
* Upload the result of a locally executed action to the remote cache.
*
* @throws IOException if there was an error uploading to the remote cache
* @throws ExecException if uploading any of the action outputs is not supported
*/
abstract void upload(
SimpleBlobStore.ActionKey actionKey,
Action action,
Command command,
Path execRoot,
Collection<Path> files,
FileOutErr outErr)
throws ExecException, IOException, InterruptedException;
protected abstract void setCachedActionResult(ActionKey actionKey, ActionResult action)
throws IOException, InterruptedException;

/**
* Uploads a file
Expand All @@ -157,6 +147,116 @@ abstract void upload(
*/
protected abstract ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);

protected abstract ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
throws IOException, InterruptedException;

/**
* Upload the result of a locally executed action to the remote cache.
*
* @throws IOException if there was an error uploading to the remote cache
* @throws ExecException if uploading any of the action outputs is not supported
*/
public ActionResult upload(
ActionKey actionKey,
Action action,
Command command,
Path execRoot,
Collection<Path> outputs,
FileOutErr outErr,
int exitCode)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
resultBuilder.setExitCode(exitCode);
ActionResult result = resultBuilder.build();
if (exitCode == 0 && !action.getDoNotCache()) {
setCachedActionResult(actionKey, result);
}
return result;
}

public ActionResult upload(
ActionKey actionKey,
Action action,
Command command,
Path execRoot,
Collection<Path> outputs,
FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
return upload(actionKey, action, command, execRoot, outputs, outErr, /* exitCode= */ 0);
}

private void uploadOutputs(
Path execRoot,
ActionKey actionKey,
Action action,
Command command,
Collection<Path> files,
FileOutErr outErr,
ActionResult.Builder result)
throws ExecException, IOException, InterruptedException {
UploadManifest manifest =
new UploadManifest(
digestUtil,
result,
execRoot,
options.incompatibleRemoteSymlinks,
options.allowSymlinkUpload);
manifest.addFiles(files);
manifest.setStdoutStderr(outErr);
manifest.addAction(actionKey, action, command);

Map<Digest, Path> digestToFile = manifest.getDigestToFile();
Map<Digest, ByteString> digestToBlobs = manifest.getDigestToBlobs();
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());

ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file != null) {
uploads.add(uploadFile(digest, file));
} else {
ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
uploads.add(uploadBlob(digest, blob));
}
}

waitForUploads(uploads.build());

if (manifest.getStderrDigest() != null) {
result.setStderrDigest(manifest.getStderrDigest());
}
if (manifest.getStdoutDigest() != null) {
result.setStdoutDigest(manifest.getStdoutDigest());
}
}

private static void waitForUploads(List<ListenableFuture<Void>> uploads)
throws IOException, InterruptedException {
try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
}
} catch (ExecutionException e) {
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
// between ByteStreamUploader as well.
Throwable cause = e.getCause();
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
if (cause != null) {
throw new IOException(cause);
}
throw new IOException(e);
}
}

/**
* Downloads a blob with a content hash {@code digest} to {@code out}.
*
Expand Down
144 changes: 30 additions & 114 deletions src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Command;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub;
import build.bazel.remote.execution.v2.Digest;
Expand All @@ -38,7 +36,6 @@
import com.google.common.base.Ascii;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
Expand All @@ -50,15 +47,13 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
Expand All @@ -70,7 +65,6 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -166,17 +160,8 @@ public static boolean isRemoteCacheOptions(RemoteOptions options) {
|| Ascii.toLowerCase(options.remoteCache).startsWith("https://"));
}

private ListenableFuture<FindMissingBlobsResponse> getMissingDigests(
FindMissingBlobsRequest request) throws IOException, InterruptedException {
Context ctx = Context.current();
try {
return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request)));
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}

private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
@Override
protected ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
throws IOException, InterruptedException {
if (Iterables.isEmpty(digests)) {
return ImmutableSet.of();
Expand Down Expand Up @@ -208,6 +193,16 @@ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
return result.build();
}

private ListenableFuture<FindMissingBlobsResponse> getMissingDigests(
FindMissingBlobsRequest request) throws IOException, InterruptedException {
Context ctx = Context.current();
try {
return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request)));
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}

/**
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
* available in the remote cache, such that the tree can be reassembled and executed on another
Expand Down Expand Up @@ -376,32 +371,6 @@ public void onCompleted() {
return future;
}

@Override
public void upload(
ActionKey actionKey,
Action action,
Command command,
Path execRoot,
Collection<Path> files,
FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
upload(execRoot, actionKey, action, command, files, outErr, result);
try {
retrier.execute(
() ->
acBlockingStub()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setActionResult(result)
.build()));
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}

@Override
protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
return uploader.uploadBlobAsync(
Expand All @@ -418,77 +387,6 @@ protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
/* forceUpload= */ true);
}

void upload(
Path execRoot,
ActionKey actionKey,
Action action,
Command command,
Collection<Path> files,
FileOutErr outErr,
ActionResult.Builder result)
throws ExecException, IOException, InterruptedException {
UploadManifest manifest =
new UploadManifest(
digestUtil,
result,
execRoot,
options.incompatibleRemoteSymlinks,
options.allowSymlinkUpload);
manifest.addFiles(files);
manifest.setStdoutStderr(outErr);
manifest.addAction(actionKey, action, command);

Map<Digest, Path> digestToFile = manifest.getDigestToFile();
Map<Digest, ByteString> digestToBlobs = manifest.getDigestToBlobs();
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());

ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file != null) {
uploads.add(uploadFile(digest, file));
} else {
ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
uploads.add(uploadBlob(digest, blob));
}
}

waitForUploads(uploads.build());

if (manifest.getStderrDigest() != null) {
result.setStderrDigest(manifest.getStderrDigest());
}
if (manifest.getStdoutDigest() != null) {
result.setStdoutDigest(manifest.getStdoutDigest());
}
}

private static void waitForUploads(List<ListenableFuture<Void>> uploads)
throws IOException, InterruptedException {
try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
}
} catch (ExecutionException e) {
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
// between ByteStreamUploader as well.
Throwable cause = e.getCause();
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
if (cause != null) {
throw new IOException(cause);
}
throw new IOException(e);
}
}

// Execution Cache API

@Override
Expand All @@ -511,4 +409,22 @@ public ActionResult getCachedActionResult(ActionKey actionKey)
throw new IOException(e);
}
}

@Override
protected void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws IOException, InterruptedException {
try {
retrier.execute(
() ->
acBlockingStub()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setActionResult(result)
.build()));
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}
}
Loading

0 comments on commit 12ebb84

Please sign in to comment.