diff --git a/pkg/builder/BUILD.bazel b/pkg/builder/BUILD.bazel
index 9188b05..eca6154 100644
--- a/pkg/builder/BUILD.bazel
+++ b/pkg/builder/BUILD.bazel
@@ -60,6 +60,7 @@ go_library(
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
         "@org_golang_google_protobuf//encoding/protojson",
+        "@org_golang_google_protobuf//encoding/protowire",
         "@org_golang_google_protobuf//proto",
         "@org_golang_google_protobuf//types/known/anypb",
         "@org_golang_google_protobuf//types/known/durationpb",
diff --git a/pkg/builder/output_hierarchy.go b/pkg/builder/output_hierarchy.go
index c1c4643..61175e6 100644
--- a/pkg/builder/output_hierarchy.go
+++ b/pkg/builder/output_hierarchy.go
@@ -7,6 +7,7 @@ import (
 
 	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
 	"github.com/buildbarn/bb-storage/pkg/blobstore"
+	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
 	"github.com/buildbarn/bb-storage/pkg/digest"
 	"github.com/buildbarn/bb-storage/pkg/filesystem"
 	"github.com/buildbarn/bb-storage/pkg/filesystem/path"
@@ -14,6 +15,7 @@ import (
 
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/protowire"
 	"google.golang.org/protobuf/proto"
 )
 
@@ -201,97 +203,57 @@ func (s *uploadOutputsState) saveError(err error) {
 	}
 }
 
-// UploadDirectory is called to upload a single directory. Elements in
-// the directory are stored in a remoteexecution.Directory, so that they
-// can be placed in a remoteexecution.Tree.
-func (s *uploadOutputsState) uploadDirectory(d UploadableDirectory, dPath *path.Trace, children map[digest.Digest]*remoteexecution.Directory) *remoteexecution.Directory {
-	files, err := d.ReadDir()
-	if err != nil {
-		s.saveError(util.StatusWrapf(err, "Failed to read output directory %#v", dPath.String()))
-		return &remoteexecution.Directory{}
-	}
-
-	var directory remoteexecution.Directory
-	for _, file := range files {
-		name := file.Name()
-		childPath := dPath.Append(name)
-		switch fileType := file.Type(); fileType {
-		case filesystem.FileTypeRegularFile:
-			if childDigest, err := d.UploadFile(s.context, name, s.digestFunction); err == nil {
-				directory.Files = append(directory.Files, &remoteexecution.FileNode{
-					Name:         name.String(),
-					Digest:       childDigest.GetProto(),
-					IsExecutable: file.IsExecutable(),
-				})
-			} else {
-				s.saveError(util.StatusWrapf(err, "Failed to store output file %#v", childPath.String()))
-			}
-		case filesystem.FileTypeDirectory:
-			if childDirectory, err := d.EnterUploadableDirectory(name); err == nil {
-				child := s.uploadDirectory(childDirectory, dPath, children)
-				childDirectory.Close()
-
-				// Compute digest of the child directory. This requires serializing it.
-				if data, err := proto.Marshal(child); err == nil {
-					digestGenerator := s.digestFunction.NewGenerator()
-					if _, err := digestGenerator.Write(data); err == nil {
-						childDigest := digestGenerator.Sum()
-						children[childDigest] = child
-						directory.Directories = append(directory.Directories, &remoteexecution.DirectoryNode{
-							Name:   name.String(),
-							Digest: childDigest.GetProto(),
-						})
-					} else {
-						s.saveError(util.StatusWrapf(err, "Failed to compute digest of output directory %#v", childPath.String()))
-					}
-				} else {
-					s.saveError(util.StatusWrapf(err, "Failed to marshal output directory %#v", childPath.String()))
-				}
-			} else {
-				s.saveError(util.StatusWrapf(err, "Failed to enter output directory %#v", childPath.String()))
-			}
-		case filesystem.FileTypeSymlink:
-			if target, err := d.Readlink(name); err == nil {
-				directory.Symlinks = append(directory.Symlinks, &remoteexecution.SymlinkNode{
-					Name:   name.String(),
-					Target: target,
-				})
-			} else {
-				s.saveError(util.StatusWrapf(err, "Failed to read output symlink %#v", childPath.String()))
-			}
-		}
-	}
-	return &directory
-}
-
 // UploadOutputDirectoryEntered is called to upload a single output
 // directory as a remoteexecution.Tree. The root directory is assumed to
 // already be opened.
 func (s *uploadOutputsState) uploadOutputDirectoryEntered(d UploadableDirectory, dPath *path.Trace, paths []string) {
-	children := map[digest.Digest]*remoteexecution.Directory{}
-	tree := &remoteexecution.Tree{
-		Root: s.uploadDirectory(d, dPath, children),
-	}
-
-	childDigests := digest.NewSetBuilder()
-	for childDigest := range children {
-		childDigests.Add(childDigest)
-	}
-	for _, childDigest := range childDigests.Build().Items() {
-		tree.Children = append(tree.Children, children[childDigest])
+	dState := uploadOutputDirectoryState{
+		uploadOutputsState: s,
+		directoriesSeen:    map[digest.Digest]struct{}{},
 	}
+	if rootDirectory, err := dState.uploadDirectory(d, dPath); err == nil {
+		// Approximate the size of the resulting Tree object, so
+		// that we may allocate all space at once.
+		directories := append(dState.directories, rootDirectory)
+		maximumTreeSizeBytes := 0
+		for _, directory := range directories {
+			maximumTreeSizeBytes += len(directory)
+		}
+		maximumTreeSizeBytes += len(directories) * (1 + protowire.SizeVarint(uint64(maximumTreeSizeBytes)))
+
+		// Construct the Tree object. We don't want to use
+		// proto.Marshal() for this, as it would require us to
+		// marshal all of the directories a second time.
+		treeData := make([]byte, 0, maximumTreeSizeBytes)
+		tag := byte(blobstore.TreeRootFieldNumber<<3) | byte(protowire.BytesType)
+		for i := len(directories); i > 0; i-- {
+			directory := directories[i-1]
+			treeData = append(treeData, tag)
+			treeData = protowire.AppendVarint(treeData, uint64(len(directory)))
+			treeData = append(treeData, directory...)
+			tag = byte(blobstore.TreeChildrenFieldNumber<<3) | byte(protowire.BytesType)
+		}
 
-	if treeDigest, err := blobstore.CASPutProto(s.context, s.contentAddressableStorage, tree, s.digestFunction); err == nil {
-		for _, path := range paths {
-			s.actionResult.OutputDirectories = append(
-				s.actionResult.OutputDirectories,
-				&remoteexecution.OutputDirectory{
-					Path:       path,
-					TreeDigest: treeDigest.GetProto(),
-				})
-		}
+		digestGenerator := s.digestFunction.NewGenerator()
+		if _, err := digestGenerator.Write(treeData); err != nil {
+			panic(err)
+		}
+		treeDigest := digestGenerator.Sum()
+
+		if err := s.contentAddressableStorage.Put(s.context, treeDigest, buffer.NewValidatedBufferFromByteSlice(treeData)); err == nil {
+			for _, path := range paths {
+				s.actionResult.OutputDirectories = append(
+					s.actionResult.OutputDirectories,
+					&remoteexecution.OutputDirectory{
+						Path:       path,
+						TreeDigest: treeDigest.GetProto(),
+					})
+			}
+		} else {
+			s.saveError(util.StatusWrapf(err, "Failed to store output directory %#v", dPath.String()))
+		}
 	} else {
-		s.saveError(util.StatusWrapf(err, "Failed to store output directory %#v", dPath.String()))
+		s.saveError(err)
 	}
 }
 
@@ -341,6 +303,91 @@ func (s *uploadOutputsState) uploadOutputSymlink(d UploadableDirectory, name pat
 	}
 }
 
+// UploadOutputDirectoryState is used by OutputHierarchy.UploadOutputs()
+// to track state specific to uploading a single output directory.
+type uploadOutputDirectoryState struct {
+	*uploadOutputsState
+
+	directories     [][]byte
+	directoriesSeen map[digest.Digest]struct{}
+}
+
+// UploadDirectory is called to upload a single directory. Elements in
+// the directory are stored in a remoteexecution.Directory, so that they
+// can be placed in a remoteexecution.Tree.
+func (s *uploadOutputDirectoryState) uploadDirectory(d UploadableDirectory, dPath *path.Trace) ([]byte, error) {
+	files, err := d.ReadDir()
+	if err != nil {
+		return nil, util.StatusWrapf(err, "Failed to read output directory %#v", dPath.String())
+	}
+
+	var directory remoteexecution.Directory
+	for _, file := range files {
+		name := file.Name()
+		childPath := dPath.Append(name)
+		switch fileType := file.Type(); fileType {
+		case filesystem.FileTypeRegularFile:
+			if childDigest, err := d.UploadFile(s.context, name, s.digestFunction); err == nil {
+				directory.Files = append(directory.Files, &remoteexecution.FileNode{
+					Name:         name.String(),
+					Digest:       childDigest.GetProto(),
+					IsExecutable: file.IsExecutable(),
+				})
+			} else {
+				s.saveError(util.StatusWrapf(err, "Failed to store output file %#v", childPath.String()))
+			}
+		case filesystem.FileTypeDirectory:
+			if childDirectory, err := d.EnterUploadableDirectory(name); err == nil {
+				childData, err := s.uploadDirectory(childDirectory, dPath)
+				childDirectory.Close()
+				if err == nil {
+					// Compute the digest of the child
+					// directory, so that it may be
+					// referenced by the parent.
+					digestGenerator := s.digestFunction.NewGenerator()
+					if _, err := digestGenerator.Write(childData); err != nil {
+						panic(err)
+					}
+					childDigest := digestGenerator.Sum()
+
+					// There is no need to make the
+					// directory part of the Tree if we
+					// have seen an identical directory
+					// previously.
+					if _, ok := s.directoriesSeen[childDigest]; !ok {
+						s.directories = append(s.directories, childData)
+						s.directoriesSeen[childDigest] = struct{}{}
+					}
+
+					directory.Directories = append(directory.Directories, &remoteexecution.DirectoryNode{
+						Name:   name.String(),
+						Digest: childDigest.GetProto(),
+					})
+				} else {
+					s.saveError(err)
+				}
+			} else {
+				s.saveError(util.StatusWrapf(err, "Failed to enter output directory %#v", childPath.String()))
+			}
+		case filesystem.FileTypeSymlink:
+			if target, err := d.Readlink(name); err == nil {
+				directory.Symlinks = append(directory.Symlinks, &remoteexecution.SymlinkNode{
+					Name:   name.String(),
+					Target: target,
+				})
+			} else {
+				s.saveError(util.StatusWrapf(err, "Failed to read output symlink %#v", childPath.String()))
+			}
+		}
+	}
+
+	data, err := proto.Marshal(&directory)
+	if err != nil {
+		return nil, util.StatusWrapf(err, "Failed to marshal output directory %#v", dPath.String())
+	}
+	return data, nil
+}
+
 // outputNodePath is an implementation of path.ComponentWalker that is
 // used by NewOutputHierarchy() to compute normalized paths of outputs
 // of a build action.
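
Note for reviewers (illustrative, not part of the patch): the hand-rolled Tree
serialization in uploadOutputDirectoryEntered() works because protobuf encodes
an embedded message field as a tag (field number shifted left by three bits,
ORed with the length-delimited wire type), a varint length, and the
submessage's own serialization. Already marshaled Directory blobs can
therefore be concatenated into a valid Tree without marshaling each directory
a second time. The root directory is appended to dState.directories last,
which is why the loop iterates in reverse: the root must be emitted under the
Tree root field number before the children. The sketch below is a minimal,
standalone check of that equivalence using the protowire helpers; the file
name is hypothetical, the field numbers 1 (root) and 2 (children) are taken
from the REAPI Tree definition, and the byte-for-byte comparison relies on the
standard marshaler emitting fields in field-number order, which it does in
practice but is not required to do by the protobuf specification.

// treewire_sketch.go: illustrative only, not part of this change.
// Assembles a remoteexecution.Tree from pre-marshaled Directory
// messages with protowire and compares the result against the output
// of proto.Marshal().
package main

import (
	"bytes"
	"fmt"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
	"google.golang.org/protobuf/encoding/protowire"
	"google.golang.org/protobuf/proto"
)

func main() {
	root := &remoteexecution.Directory{
		Files: []*remoteexecution.FileNode{{Name: "hello.txt"}},
	}
	child := &remoteexecution.Directory{
		Files: []*remoteexecution.FileNode{{Name: "world.txt"}},
	}
	rootData, err := proto.Marshal(root)
	if err != nil {
		panic(err)
	}
	childData, err := proto.Marshal(child)
	if err != nil {
		panic(err)
	}

	// Hand-rolled Tree: every embedded message field is a tag, a
	// varint length and the submessage's serialization, verbatim.
	var treeData []byte
	treeData = protowire.AppendTag(treeData, 1, protowire.BytesType) // Tree.root
	treeData = protowire.AppendBytes(treeData, rootData)
	treeData = protowire.AppendTag(treeData, 2, protowire.BytesType) // Tree.children
	treeData = protowire.AppendBytes(treeData, childData)

	// Reference serialization produced by the protobuf runtime.
	expected, err := proto.Marshal(&remoteexecution.Tree{
		Root:     root,
		Children: []*remoteexecution.Directory{child},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("identical:", bytes.Equal(treeData, expected))
}

The patch writes the tag byte directly instead of calling
protowire.AppendTag(); that is equivalent for field numbers below 16, where
the tag varint fits in a single byte.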