Skip to content

Commit

Permalink
Use the New Driver Walk method for catalog enumeration
Browse files Browse the repository at this point in the history
This changes the Walk Method used for catalog enumeration. Just to show
how much an effect this has on our s3 storage:
Original:
List calls: 6839

real    3m16.636s
user    0m0.000s
sys    0m0.016s

New:
ListObjectsV2 Calls: 1805

real    0m49.970s
user    0m0.008s
sys    0m0.000s

This is because it no longer performs a list and stat per item, and instead
is able to use the metadata gained from the list as a replacement to stat.

Signed-off-by: Sargun Dhillon <[email protected]>
  • Loading branch information
sargun committed Jan 8, 2018
1 parent 32ac467 commit 35b29a6
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 12 deletions.
19 changes: 9 additions & 10 deletions registry/storage/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var errFinishedWalk = errors.New("finished walk")
// Because it's a quite expensive operation, it should only be used when building up
// an initial set of repositories.
func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
var finishedWalk bool
var foundRepos []string

if len(repos) == 0 {
Expand All @@ -29,7 +30,7 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
return 0, err
}

err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error {
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
foundRepos = append(foundRepos, repoPath)
return nil
Expand All @@ -40,22 +41,20 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri

// if we've filled our array, no need to walk any further
if len(foundRepos) == len(repos) {
return errFinishedWalk
finishedWalk = true
return driver.ErrSkipDir
}

return nil
})

n = copy(repos, foundRepos)

switch err {
case nil:
// nil means that we completed walk and didn't fill buffer. No more
// records are available.
err = io.EOF
case errFinishedWalk:
// more records are available.
err = nil
if err != nil {
return n, err
} else if !finishedWalk {
// We didn't fill buffer. No more records are available.
return n, io.EOF
}

return n, err
Expand Down
129 changes: 127 additions & 2 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"

dcontext "github.com/docker/distribution/context"
"github.com/docker/distribution/registry/client/transport"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
Expand Down Expand Up @@ -876,8 +877,132 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}

prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}

var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
return err
}

// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
if objectCount == 0 {
return storagedriver.PathNotFoundError{Path: from}
}

return nil
}

type walkInfoContainer struct {
storagedriver.FileInfoFields
prefix *string
}

// Path provides the full path of the target of this file info.
func (wi walkInfoContainer) Path() string {
return wi.FileInfoFields.Path
}

// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (wi walkInfoContainer) Size() int64 {
return wi.FileInfoFields.Size
}

// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (wi walkInfoContainer) ModTime() time.Time {
return wi.FileInfoFields.ModTime
}

// IsDir returns true if the path is a directory.
func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}

func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
var retError error

listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(listMax),
}

ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2Pages(%s)", path)
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {

*objectCount += *objects.KeyCount
walkInfos := make([]walkInfoContainer, 0, *objects.KeyCount)

for _, dir := range objects.CommonPrefixes {
commonPrefix := *dir.Prefix
walkInfos = append(walkInfos, walkInfoContainer{
prefix: dir.Prefix,
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
},
})
}

for _, file := range objects.Contents {
walkInfos = append(walkInfos, walkInfoContainer{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: false,
Size: *file.Size,
ModTime: *file.LastModified,
Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1),
},
})
}

sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })

for _, walkInfo := range walkInfos {
err := f(walkInfo)

if err == storagedriver.ErrSkipDir {
if walkInfo.IsDir() {
continue
} else {
break
}
} else if err != nil {
retError = err
return false
}

if walkInfo.IsDir() {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
retError = err
return false
}
}
}
return true
})

if retError != nil {
return retError
}

if listObjectErr != nil {
return listObjectErr
}

return nil
}

func (d *driver) s3Path(path string) string {
Expand Down

0 comments on commit 35b29a6

Please sign in to comment.