Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

registry/cshoop/storagedriver Added StorageDriver method WalkFiles, optimize blobstore enumeration with S3 #16

Closed
wants to merge 8 commits into from
7 changes: 1 addition & 6 deletions registry/storage/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,7 @@ func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Di
return err
}

return bs.driver.Walk(ctx, specPath, func(fileInfo driver.FileInfo) error {
// skip directories
if fileInfo.IsDir() {
return nil
}

return bs.driver.WalkFiles(ctx, specPath, func(fileInfo driver.FileInfo) error {
currentPath := fileInfo.Path()
// we only want to parse paths that end with /data
_, fileName := path.Split(currentPath)
Expand Down
8 changes: 7 additions & 1 deletion registry/storage/driver/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
// No driver-specific traversal is implemented here; delegate to the generic fallback walker.
return storagedriver.WalkFallback(ctx, d, path, f)
}

// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but not on directories.
// It delegates to storagedriver.WalkFilesFallback.
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}

// directDescendants will find direct descendants (blobs or virtual containers)
// from a list of blob paths and will return their full paths. Elements in blobs
// list must be prefixed with a "/" and
Expand Down
12 changes: 12 additions & 0 deletions registry/storage/driver/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,15 @@ func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn)

return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
}

// WalkFiles wraps WalkFiles of the underlying storage driver. The call is
// traced, the path is validated before being forwarded, and the result of the
// wrapped driver is passed through setDriverName before being returned.
func (base *Base) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
	ctx, done := dcontext.WithTrace(ctx)
	defer done("%s.WalkFiles(%q)", base.Name(), path)

	// The root path "/" is always accepted; any other path must match PathRegexp.
	if path != "/" && !storagedriver.PathRegexp.MatchString(path) {
		return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
	}

	walkErr := base.StorageDriver.WalkFiles(ctx, path, f)
	return base.setDriverName(walkErr)
}
8 changes: 7 additions & 1 deletion registry/storage/driver/filesystem/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
// No driver-specific traversal is implemented here; delegate to the generic fallback walker.
return storagedriver.WalkFallback(ctx, d, path, f)
}

// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but not on directories.
// It delegates to storagedriver.WalkFilesFallback.
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}

// fullPath returns the absolute path of a key within the Driver's storage.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
Expand Down
8 changes: 7 additions & 1 deletion registry/storage/driver/inmemory/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
// No driver-specific traversal is implemented here; delegate to the generic fallback walker.
return storagedriver.WalkFallback(ctx, d, path, f)
}

// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but not on directories.
// It delegates to storagedriver.WalkFilesFallback.
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}

type writer struct {
d *driver
f *file
Expand Down
42 changes: 36 additions & 6 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
Expand All @@ -1070,7 +1070,33 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn)
}

var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, true, f); err != nil {
return err
}

// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
if objectCount == 0 {
return storagedriver.PathNotFoundError{Path: from}
}

return nil
}

// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) WalkFiles(ctx context.Context, from string, f storagedriver.WalkFn) error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the walk_test for this slightly different logic?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There aren't even tests for Walk really. I don't quite want to get into the work of adding tests for them but could if it's worth the effort

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point. But still worth the consideration, especially with regards to preventing regressions in functionality. Lack of proper testing for current functionality is never a reason for not adding tests 😉

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out my fallback method was actually incorrect. Added a bunch of unit tests to check the behavior more explicitly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to add S3 tests as well but that's way more involved, I don't want to spend too long on this... 😭

path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}

prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}

var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, false, f); err != nil {
return err
}

Expand Down Expand Up @@ -1110,13 +1136,17 @@ func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}

func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, walkDirectories bool, f storagedriver.WalkFn) error {
var retError error

delimiter := ""
if walkDirectories {
delimiter = "/"
}
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String("/"),
Delimiter: aws.String(delimiter),
MaxKeys: aws.Int64(listMax),
}

Expand Down Expand Up @@ -1180,8 +1210,8 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
return false
}

if walkInfo.IsDir() {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
if walkInfo.IsDir() && walkDirectories {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, walkDirectories, f); err != nil {
retError = err
return false
}
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ type StorageDriver interface {
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
Walk(ctx context.Context, path string, f WalkFn) error

// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but does not call f with directories.
// If an error is returned from the WalkFn, processing stops
WalkFiles(ctx context.Context, path string, f WalkFn) error
}

// FileWriter provides an abstraction for an opened writable file-like object in
Expand Down
8 changes: 7 additions & 1 deletion registry/storage/driver/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,11 +658,17 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
// No driver-specific traversal is implemented here; delegate to the generic fallback walker.
return storagedriver.WalkFallback(ctx, d, path, f)
}

// WalkFiles traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but not on directories.
// It delegates to storagedriver.WalkFilesFallback.
func (d *driver) WalkFiles(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFilesFallback(ctx, d, path, f)
}

// swiftPath builds an object name for path by placing it under
// "<Prefix>/files" and stripping every leading and trailing slash
// from the result.
func (d *driver) swiftPath(path string) string {
	objectName := d.Prefix + "/files" + path
	// Trimming both ends with the same cutset is equivalent to the
	// nested TrimRight/TrimLeft form.
	return strings.Trim(objectName, "/")
}
Expand Down
23 changes: 20 additions & 3 deletions registry/storage/driver/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ type WalkFn func(fileInfo FileInfo) error
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
// walkDir=true: f is invoked for directories as well as for files.
return doWalk(ctx, driver, from, true, f)
}

// WalkFilesFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file but not on directories.
// It uses the List method and Stat to drive itself.
// If an error is returned from WalkFn, processing stops.
func WalkFilesFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
// walkDir=false: directories are still descended into, but f is not called on them.
return doWalk(ctx, driver, from, false, f)
}

// doWalk traverses a filesystem defined within driver, starting
CollinShoop marked this conversation as resolved.
Show resolved Hide resolved
// from the given path, calling f on each file and, when walkDir is true, on each directory.
// It uses the List method and Stat to drive itself.
// If an error is returned from WalkFn, processing stops
func doWalk(ctx context.Context, driver StorageDriver, from string, walkDir bool, f WalkFn) error {
children, err := driver.List(ctx, from)
if err != nil {
return err
Expand All @@ -43,12 +57,15 @@ func WalkFallback(ctx context.Context, driver StorageDriver, from string, f Walk
return err
}
}
err = f(fileInfo)
err = nil
if !fileInfo.IsDir() || walkDir {
err = f(fileInfo)
}
if err == nil && fileInfo.IsDir() {
if err := WalkFallback(ctx, driver, child, f); err != nil {
if err := doWalk(ctx, driver, child, walkDir, f); err != nil {
return err
}
} else if err == ErrSkipDir {
} else if err == ErrSkipDir && walkDir {
// Stop iteration if it's a file, otherwise noop if it's a directory
if !fileInfo.IsDir() {
return nil
Expand Down
Loading