Skip to content

Commit

Permalink
[7.1.0] Compute output directories in parallel when building the uplo…
Browse files Browse the repository at this point in the history
…ad manifest. (#21386)

Some users have very large tree artifacts (with hundreds of thousands of
files) which are extremely slow to traverse sequentially. In one
particular scenario with ~250k files, this change reduces the time spent
computing the manifest from ~70s to ~5s.

(We could parallelize further across all outputs, but I prefer to wait
until there is a concrete need for it.)

PiperOrigin-RevId: 607678324
Change-Id: Ia80d0a663909b8ed96c6720d9eb5393fb3027b48
  • Loading branch information
tjgq authored Feb 16, 2024
1 parent 3dc745a commit 59ab922
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 106 deletions.
302 changes: 211 additions & 91 deletions src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,45 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.reverseOrder;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.CacheCapabilities;
import build.bazel.remote.execution.v2.Command;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import build.bazel.remote.execution.v2.FileNode;
import build.bazel.remote.execution.v2.OutputSymlink;
import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy;
import build.bazel.remote.execution.v2.SymlinkNode;
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
Expand Down Expand Up @@ -66,25 +80,28 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** UploadManifest adds output metadata to a {@link ActionResult}. */
public class UploadManifest {

private final DigestUtil digestUtil;
private final RemotePathResolver remotePathResolver;
private final ActionResult.Builder result;
private final boolean followSymlinks;
private final boolean allowDanglingSymlinks;
private final boolean allowAbsoluteSymlinks;
private final Map<Digest, Path> digestToFile = new HashMap<>();
private final Map<Digest, ByteString> digestToBlobs = new HashMap<>();
private final ConcurrentHashMap<Digest, Path> digestToFile = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Digest, ByteString> digestToBlobs = new ConcurrentHashMap<>();
@Nullable private ActionKey actionKey;
private Digest stderrDigest;
private Digest stdoutDigest;
Expand All @@ -102,7 +119,7 @@ public static UploadManifest create(
int exitCode,
Instant startTime,
int wallTimeInMs)
throws ExecException, IOException {
throws ExecException, IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
result.setExitCode(exitCode);

Expand Down Expand Up @@ -208,7 +225,7 @@ private void setStdoutStderr(FileOutErr outErr) throws IOException {
* after execution, both for cache hits and misses.
*/
@VisibleForTesting
void addFiles(Collection<Path> files) throws ExecException, IOException {
void addFiles(Collection<Path> files) throws ExecException, IOException, InterruptedException {
// TODO(tjgq): Non-dangling absolute symlinks are uploaded as the file or directory they point
// to even when followSymlinks is false. This is inconsistent with the treatment of relative
// symlinks, but fixing it would require an incompatible change.
Expand Down Expand Up @@ -341,107 +358,210 @@ private void addFile(Digest digest, Path file) throws IOException {
digestToFile.put(digest, file);
}

// Field numbers of the 'root' and 'children' fields in the Tree message.
private static final int TREE_ROOT_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("root").getNumber();
private static final int TREE_CHILDREN_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("children").getNumber();
private static final class WrappedException extends RuntimeException {
private final Exception wrapped;

private void addDirectory(Path dir) throws ExecException, IOException {
LinkedHashSet<ByteString> directories = new LinkedHashSet<>();
var ignored = computeDirectory(dir, directories);

// Convert individual Directory messages to a Tree message. As we want the
// records to be topologically sorted (parents before children), we iterate
// over the directories in reverse insertion order.
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(byteArrayOutputStream);
int fieldNumber = TREE_ROOT_FIELD_NUMBER;
for (ByteString directory : Lists.reverse(new ArrayList<ByteString>(directories))) {
codedOutputStream.writeBytes(fieldNumber, directory);
fieldNumber = TREE_CHILDREN_FIELD_NUMBER;
WrappedException(Exception wrapped) {
super(wrapped);
this.wrapped = wrapped;
}
codedOutputStream.flush();

ByteString data = ByteString.copyFrom(byteArrayOutputStream.toByteArray());
Digest digest = digestUtil.compute(data.toByteArray());

if (result != null) {
result
.addOutputDirectoriesBuilder()
.setPath(remotePathResolver.localPathToOutputPath(dir))
.setTreeDigest(digest)
.setIsTopologicallySorted(true);
Exception unwrap() {
return wrapped;
}

digestToBlobs.put(digest, data);
}

/**
 * A thread pool shared by all {@link DirectoryBuilder} instances.
 *
 * <p>The pool is static, so it is created once when this class is initialized and is not tied to
 * any single manifest. Each {@link DirectoryBuilder} passes it to its {@code
 * AbstractQueueVisitor} superclass with {@code ExecutorOwnership.SHARED}.
 *
 * <p>The second argument is the number of processors reported by the runtime at initialization
 * time (presumably the pool's parallelism; confirm against {@code NamedForkJoinPool}).
 */
private static final ForkJoinPool VISITOR_POOL =
NamedForkJoinPool.newNamedPool(
"upload-manifest-directory-visitor", Runtime.getRuntime().availableProcessors());

/**
* Computes the {@link Directory} message describing the transitive contents of a directory.
*
* @param path the directory path.
* @param directories an output parameter accepting the wire-format {@link Directory} messages for
* every visited (sub-)directory of {@code path}, including {@code path} itself, in a
* deterministic topological order (children before parents).
* @return the wire-format {@link Directory} message for {@code path}.
* A {@link DirectoryBuilder} constructs a {@link Tree} message for an output directory, doing as
* much as possible in parallel.
*/
private ByteString computeDirectory(Path path, LinkedHashSet<ByteString> directories)
throws ExecException, IOException {
Directory.Builder b = Directory.newBuilder();

List<Dirent> sortedDirent = new ArrayList<>(path.readdir(Symlinks.NOFOLLOW));
sortedDirent.sort(Comparator.comparing(Dirent::getName));

for (Dirent dirent : sortedDirent) {
String name = dirent.getName();
Path child = path.getRelative(name);
if (dirent.getType() == Dirent.Type.FILE) {
Digest digest = digestUtil.compute(child);
b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(true);
digestToFile.put(digest, child);
continue;
private class DirectoryBuilder extends AbstractQueueVisitor {
private final Path rootDir;

// Directories found during the traversal, including the root.
// Sorted in reverse so that children iterate before parents.
private final SortedSet<Path> dirs =
Collections.synchronizedSortedSet(new TreeSet<Path>(reverseOrder()));

// Maps each directory found during the traversal to its subdirectories.
private final SortedSetMultimap<Path, Path> dirToSubdirs =
Multimaps.synchronizedSortedSetMultimap(TreeMultimap.create());

// Maps each directory found during the traversal to its files.
private final SortedSetMultimap<Path, FileNode> dirToFiles =
Multimaps.synchronizedSortedSetMultimap(
TreeMultimap.<Path, FileNode>create(naturalOrder(), comparing(FileNode::getName)));

// Maps each directory found during the traversal to its symlinks.
private final SortedSetMultimap<Path, SymlinkNode> dirToSymlinks =
Multimaps.synchronizedSortedSetMultimap(
TreeMultimap.<Path, SymlinkNode>create(
naturalOrder(), comparing(SymlinkNode::getName)));

DirectoryBuilder(Path rootDir) {
super(
VISITOR_POOL,
ExecutorOwnership.SHARED,
ExceptionHandlingMode.FAIL_FAST,
ErrorClassifier.DEFAULT);
this.rootDir = checkNotNull(rootDir);
}

/**
* Returns a {@link Tree} message in wire format describing the directory contents, obeying the
* requirements of the {@code OutputDirectory.is_topologically_sorted} field.
*/
ByteString build() throws ExecException, IOException, InterruptedException {
// Collect directory entries (subdirectories, files, symlinks) in parallel.
// This is a major speedup for large tree artifacts with hundreds of thousands of files.
execute(() -> visit(rootDir, Dirent.Type.DIRECTORY));
try {
awaitQuiescence(true);
} catch (WrappedException e) {
Throwables.throwIfInstanceOf(e.unwrap(), ExecException.class);
Throwables.throwIfInstanceOf(e.unwrap(), IOException.class);
throw new AssertionError("unexpected exception", e.unwrap());
}
if (dirent.getType() == Dirent.Type.DIRECTORY) {
ByteString dir = computeDirectory(child, directories);
b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir.toByteArray()));
continue;

// Compute the Directory message for every node, including the root. Since each directory
// references its subdirectories by their digest, the messages must be computed in topological
// order (children before parents). In addition, the contents of each Directory message must
// be sorted, which is already ensured by the use of sorted maps.

HashMap<Path, Digest> dirToDigest = new HashMap<>();
LinkedHashSet<ByteString> dirBlobs = new LinkedHashSet<>();

for (Path dir : dirs) {
Directory.Builder builder = Directory.newBuilder();
builder.addAllFiles(dirToFiles.get(dir));
builder.addAllSymlinks(dirToSymlinks.get(dir));
for (Path subdir : dirToSubdirs.get(dir)) {
checkState(subdir.getParentDirectory().equals(dir));
builder
.addDirectoriesBuilder()
.setName(subdir.getBaseName())
.setDigest(dirToDigest.get(subdir));
}
ByteString dirBlob = builder.build().toByteString();

dirToDigest.put(dir, digestUtil.compute(dirBlob.toByteArray()));
dirBlobs.add(dirBlob);
}
if (dirent.getType() == Dirent.Type.SYMLINK) {
PathFragment target = child.readSymbolicLink();
FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW);
if (statFollow == null || (!followSymlinks && !target.isAbsolute())) {
// Symlink uploaded as a symlink.
if (statFollow == null) {
checkDanglingSymlinkAllowed(child, target);
}
if (target.isAbsolute()) {
checkAbsoluteSymlinkAllowed(child, target);
}
b.addSymlinksBuilder().setName(name).setTarget(target.toString());
continue;

// Convert individual Directory messages to a Tree message. As we want the records to be
// topologically sorted (parents before children), we iterate over the directories in reverse
// insertion order. We construct the message through direct byte manipulation to ensure that
// the strict requirements on the encoding are observed.

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(byteArrayOutputStream);
int fieldNumber = TREE_ROOT_FIELD_NUMBER;
for (ByteString directory : Lists.reverse(new ArrayList<>(dirBlobs))) {
codedOutputStream.writeBytes(fieldNumber, directory);
fieldNumber = TREE_CHILDREN_FIELD_NUMBER;
}
codedOutputStream.flush();

return ByteString.copyFrom(byteArrayOutputStream.toByteArray());
}

private void visit(Path path, Dirent.Type type) {
try {
if (type == Dirent.Type.FILE) {
visitAsFile(path);
return;
}
if (statFollow.isFile() && !statFollow.isSpecialFile()) {
// Symlink to file uploaded as a file.
Digest digest = digestUtil.compute(child);
b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(true);
digestToFile.put(digest, child);
continue;
if (type == Dirent.Type.DIRECTORY) {
visitAsDirectory(path);
for (Dirent dirent : path.readdir(Symlinks.NOFOLLOW)) {
Path childPath = path.getChild(dirent.getName());
Dirent.Type childType = dirent.getType();
execute(() -> visit(childPath, childType));
}
return;
}
if (statFollow.isDirectory()) {
// Symlink to directory uploaded as a directory.
ByteString dir = computeDirectory(child, directories);
b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir.toByteArray()));
continue;
if (type == Dirent.Type.SYMLINK) {
PathFragment target = path.readSymbolicLink();
FileStatus statFollow = path.statIfFound(Symlinks.FOLLOW);
if (statFollow == null || (!followSymlinks && !target.isAbsolute())) {
// Symlink uploaded as a symlink.
if (statFollow == null) {
checkDanglingSymlinkAllowed(path, target);
}
if (target.isAbsolute()) {
checkAbsoluteSymlinkAllowed(path, target);
}
visitAsSymlink(path, target);
return;
}
if (statFollow.isFile() && !statFollow.isSpecialFile()) {
// Symlink to file uploaded as a file.
execute(() -> visit(path, Dirent.Type.FILE));
return;
}
if (statFollow.isDirectory()) {
// Symlink to directory uploaded as a directory.
execute(() -> visit(path, Dirent.Type.DIRECTORY));
return;
}
}
rejectSpecialFile(path);
} catch (ExecException | IOException e) {
// We can't throw checked exceptions here since AQV expects Runnables
throw new WrappedException(e);
}
}

private void visitAsDirectory(Path path) {
dirs.add(path);
if (!path.equals(rootDir)) {
dirToSubdirs.put(path.getParentDirectory(), path);
}
// Special file or dereferenced symlink to special file.
rejectSpecialFile(child);
}

ByteString directory = b.build().toByteString();
directories.add(directory);
return directory;
/**
 * Records {@code path} as a file entry of its parent directory.
 *
 * <p>Computes the digest of {@code path}, remembers the digest-to-path association in {@code
 * digestToFile}, and adds a {@link FileNode} named after the path's base name to the parent's
 * entry in {@code dirToFiles}. The node is always marked executable.
 *
 * @param path the file (or symlink being uploaded as a file) to record
 * @throws IOException if the digest computation fails
 */
private void visitAsFile(Path path) throws IOException {
  Digest fileDigest = digestUtil.compute(path);

  FileNode.Builder fileNode = FileNode.newBuilder();
  fileNode.setName(path.getBaseName());
  fileNode.setDigest(fileDigest);
  fileNode.setIsExecutable(true);

  digestToFile.put(fileDigest, path);
  dirToFiles.put(path.getParentDirectory(), fileNode.build());
}

/**
 * Records {@code path} as a symlink entry of its parent directory.
 *
 * <p>Adds a {@link SymlinkNode} carrying the path's base name and the string form of {@code
 * target} to the parent's entry in {@code dirToSymlinks}. No validation of the target happens
 * here.
 *
 * @param path the symlink to record
 * @param target the symlink's target, stored verbatim via {@code toString()}
 */
private void visitAsSymlink(Path path, PathFragment target) {
  SymlinkNode.Builder symlinkNode = SymlinkNode.newBuilder();
  symlinkNode.setName(path.getBaseName());
  symlinkNode.setTarget(target.toString());

  dirToSymlinks.put(path.getParentDirectory(), symlinkNode.build());
}
}

// Field numbers of the 'root' and 'children' fields in the Tree message, looked up by name from
// the message descriptor. DirectoryBuilder#build uses them to write the Tree wire format
// directly through CodedOutputStream instead of going through a Tree builder.
private static final int TREE_ROOT_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("root").getNumber();
private static final int TREE_CHILDREN_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("children").getNumber();

/**
 * Adds the output directory {@code dir} to the action result.
 *
 * <p>Builds the wire-format {@link Tree} for {@code dir} with a {@link DirectoryBuilder},
 * appends an output directory entry that carries the tree's digest and is flagged as
 * topologically sorted, and registers the serialized tree in {@code digestToBlobs} under that
 * digest.
 *
 * @param dir the local directory to describe
 * @throws ExecException if building the tree fails with an execution error
 * @throws IOException if building the tree fails with an I/O error
 * @throws InterruptedException if interrupted while the tree is being built
 */
private void addDirectory(Path dir) throws ExecException, IOException, InterruptedException {
  DirectoryBuilder treeBuilder = new DirectoryBuilder(dir);
  ByteString serializedTree = treeBuilder.build();
  Digest serializedTreeDigest = digestUtil.compute(serializedTree.toByteArray());
  String outputPath = remotePathResolver.localPathToOutputPath(dir);

  result
      .addOutputDirectoriesBuilder()
      .setPath(outputPath)
      .setTreeDigest(serializedTreeDigest)
      .setIsTopologicallySorted(true);

  digestToBlobs.put(serializedTreeDigest, serializedTree);
}

private void checkDanglingSymlinkAllowed(Path file, PathFragment target) throws IOException {
Expand Down
Loading

0 comments on commit 59ab922

Please sign in to comment.