Skip to content

Commit

Permalink
Merge #79359
Browse files Browse the repository at this point in the history
79359: backupccl: Removed limit parameter from List() API r=DarrylWong a=DarrylWong

Release note: none

Jira issue: CRDB-14747

Co-authored-by: Darryl <[email protected]>
  • Loading branch information
craig[bot] and Darryl committed Apr 5, 2022
2 parents 3d6f41b + d2375eb commit b7b37f4
Show file tree
Hide file tree
Showing 18 changed files with 39 additions and 157 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func (b *backupResumer) deleteCheckpoint(
// all files and delete each file one by one.
return exportStore.List(ctx, backupProgressDirectory, "", func(p string) error {
return exportStore.Delete(ctx, backupProgressDirectory+p)
}, 0 /*limit*/)
})
}(); err != nil {
log.Warningf(ctx, "unable to delete checkpointed backup descriptor file in progress directory: %+v", err)
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func TestBackupRestoreAppend(t *testing.T) {
files = append(files, f)
}
return err
}, 0 /*limit*/))
}))
full1 = strings.TrimSuffix(files[0], backupManifestName)
full2 = strings.TrimSuffix(files[1], backupManifestName)

Expand All @@ -592,7 +592,7 @@ func TestBackupRestoreAppend(t *testing.T) {
subdirFiles = append(subdirFiles, f)
}
return err
}, 0 /*limit*/))
}))
require.NoError(t, err)
subdirFull1 = strings.TrimSuffix(strings.TrimPrefix(subdirFiles[0], "foo"),
backupManifestName)
Expand Down Expand Up @@ -9689,7 +9689,7 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
}
}
return nil
}, 0 /*limit*/))
}))

// numCheckpointWritten only accounts for checkpoints written in the
// progress loop, each time we Resume we write another checkpoint.
Expand Down Expand Up @@ -9814,10 +9814,11 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) {
}
require.NoError(t, err)
var actual string
require.NoError(t, store.List(ctx, "/progress/", "", func(f string) error {
err = store.List(ctx, "/progress/", "", func(f string) error {
actual = f
return nil
}, 1 /*limit*/))
return cloud.ErrListingUnsupported
})
require.Equal(t, err, cloud.ErrListingUnsupported)
require.Equal(t, expectedCheckpoint, actual)
for _, checkpoint := range checkpoints {
require.NoError(t, store.Delete(ctx, "/progress/"+checkpoint))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/incrementals.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func FindPriorBackups(
prev = append(prev, p)
}
return nil
}, 0 /*limit*/); err != nil {
}); err != nil {
return nil, errors.Wrap(err, "reading previous backup layers")
}
sort.Strings(prev)
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ func getEncryptionInfoFiles(ctx context.Context, dest cloud.ExternalStorage) ([]
}

return nil
}, 0 /*limit*/)
})
if len(files) < 1 {
return nil, errors.New("no ENCRYPTION-INFO files found")
}
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func ListFullBackupsInCollection(
backupPaths = append(backupPaths, f)
}
return nil
}, 0 /*limit*/); err != nil {
}); err != nil {
return nil, err
}
for i, backupPath := range backupPaths {
Expand Down Expand Up @@ -1287,8 +1287,10 @@ func readLatestCheckpointFile(
p = strings.TrimPrefix(p, "/")
checkpoint = strings.TrimSuffix(p, backupManifestChecksumSuffix)
checkpointFound = true
return nil
}, 1 /*limit*/)
// We only want the first checkpoint so return an error that it is
// listing.
return cloud.ErrListingDone
})
// If the list failed because the storage used does not support listing,
// such as http we can try reading the non timestamped backup checkpoint
// directly. This can still fail if it is a mixed cluster and the
Expand All @@ -1300,7 +1302,7 @@ func readLatestCheckpointFile(
if err == nil {
return r, nil
}
} else if err != nil {
} else if err != nil && !errors.Is(err, cloud.ErrListingDone) {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func listDir(
if log.V(1) {
log.Infof(ctx, "Listing %s", dir)
}
return es.List(ctx, dir, "/", lsFn, 0 /*limit*/)
return es.List(ctx, dir, "/", lsFn)
}

// ListFixtures returns the object paths to all fixtures stored in a FixtureConfig.
Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/userfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func runUserFileGet(cmd *cobra.Command, args []string) (resErr error) {
}
files = append(files, s)
return nil
}, 0 /*limit*/); err != nil {
}); err != nil {
return err
}

Expand Down Expand Up @@ -451,7 +451,7 @@ func listUserFile(ctx context.Context, conn clisqlclient.Conn, glob string) ([]s
}
res = append(res, displayPrefix+s)
return nil
}, 0 /*limit*/); err != nil {
}); err != nil {
return nil, err
}
return res, nil
Expand Down Expand Up @@ -532,7 +532,7 @@ func deleteUserFile(ctx context.Context, conn clisqlclient.Conn, glob string) ([
}
deleted = append(deleted, displayRoot+s)
return nil
}, 0 /*limit*/); err != nil {
}); err != nil {
return nil, err
}

Expand Down
8 changes: 1 addition & 7 deletions pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,7 @@ func (s *s3Storage) ReadFileAt(
cloud.IsResumableHTTPError, s3ErrDelay), size, nil
}

func (s *s3Storage) List(
ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int,
) error {
func (s *s3Storage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error {
ctx, sp := tracing.ChildSpan(ctx, "s3.List")
defer sp.Finish()

Expand All @@ -522,7 +520,6 @@ func (s *s3Storage) List(
}

var fnErr error
count := 0
pageFn := func(page *s3.ListObjectsOutput, lastPage bool) bool {
for _, x := range page.CommonPrefixes {
if fnErr = fn(strings.TrimPrefix(*x.Prefix, dest)); fnErr != nil {
Expand All @@ -533,9 +530,6 @@ func (s *s3Storage) List(
if fnErr = fn(strings.TrimPrefix(*fileObject.Key, dest)); fnErr != nil {
return false
}
if count++; limit != 0 && count >= limit {
return false
}
}

return true
Expand Down
9 changes: 1 addition & 8 deletions pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,14 @@ func (s *azureStorage) ReadFileAt(
return ioctx.ReadCloserAdapter(reader), size, nil
}

func (s *azureStorage) List(
ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int,
) error {
func (s *azureStorage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error {
ctx, sp := tracing.ChildSpan(ctx, "azure.List")
defer sp.Finish()

dest := cloud.JoinPathPreservingTrailingSlash(s.prefix, prefix)
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("azure.List: %s", dest)})

var marker azblob.Marker
count := 0
for marker.NotDone() {
response, err := s.container.ListBlobsHierarchySegment(
ctx, marker, delim, azblob.ListBlobsSegmentOptions{Prefix: dest},
Expand All @@ -207,10 +204,6 @@ func (s *azureStorage) List(
if err := fn(strings.TrimPrefix(blob.Name, dest)); err != nil {
return err
}

if count++; limit != 0 && count >= limit {
return nil
}
}
marker = response.NextMarker
}
Expand Down
98 changes: 1 addition & 97 deletions pkg/cloud/cloudtestutils/cloud_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,103 +467,7 @@ func CheckListFilesCanonical(
require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error {
actual = append(actual, f)
return nil
}, 0 /*limit*/))
sort.Strings(actual)
require.Equal(t, tc.expected, actual)
})
}
})

// Similar test as above but with the limit not set to 0.
t.Run("List-with-limit", func(t *testing.T) {
for _, tc := range []struct {
name string
uri string
prefix string
delimiter string
expected []string
limit int
}{
{
"root",
storeURI,
"",
"",
[]string{"/file/abc/A.csv", "/file/abc/B.csv", "/file/abc/C.csv"},
3,
},
{
"file-slash-numbers-slash",
storeURI,
"file/numbers/",
"",
[]string{"data1.csv", "data2.csv"},
2,
},
{
"root-slash",
storeURI,
"/",
"",
[]string{"file/abc/A.csv", "file/abc/B.csv", "file/abc/C.csv"},
3,
},
{
"file",
storeURI,
"file",
"",
[]string{"/abc/A.csv", "/abc/B.csv", "/abc/C.csv"},
3,
},
{
"file-slash",
storeURI,
"file/",
"",
[]string{"abc/A.csv", "abc/B.csv", "abc/C.csv"},
3,
},
{
"slash-f",
storeURI,
"/f",
"",
[]string{"ile/abc/A.csv", "ile/abc/B.csv", "ile/abc/C.csv"},
3,
},
{
"nothing",
storeURI,
"nothing",
"",
nil,
1,
},
{
"delim-slash-file-slash",
storeURI,
"file/",
"/",
[]string{"abc/", "letters/"},
2,
},
{
"delim-data",
storeURI,
"",
"data",
[]string{"/file/abc/A.csv", "/file/abc/B.csv"},
2,
},
} {
t.Run(tc.name, func(t *testing.T) {
s := storeFromURI(ctx, t, tc.uri, clientFactory, user, ie, kvDB, testSettings)
var actual []string
require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error {
actual = append(actual, f)
return nil
}, tc.limit /*limit*/))
}))
sort.Strings(actual)
require.Equal(t, tc.expected, actual)
})
Expand Down
14 changes: 8 additions & 6 deletions pkg/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ type ExternalStorage interface {
// List enumerates files within the supplied prefix, calling the passed
// function with the name of each file found, relative to the external storage
// destination's configured prefix. If the passed function returns a non-nil
// error, iteration is stopped it is returned. Iteration is also stopped
// if the number of filenames reaches the limit, 0 is no limit.
// If delimiter is non-empty names which have the same prefix, prior
// to the delimiter, are grouped into a single result which is that
// prefix. The order that results are passed to the callback is undefined.
List(ctx context.Context, prefix, delimiter string, fn ListingFn, limit int) error
// error, iteration is stopped it is returned. If delimiter is non-empty
// names which have the same prefix, prior to the delimiter, are grouped
// into a single result which is that prefix. The order that results are
// passed to the callback is undefined.
List(ctx context.Context, prefix, delimiter string, fn ListingFn) error

// Delete removes the named file from the store.
Delete(ctx context.Context, basename string) error
Expand Down Expand Up @@ -117,6 +116,9 @@ var ErrFileDoesNotExist = errors.New("external_storage: file doesn't exist")
// ErrListingUnsupported is a marker for indicating listing is unsupported.
var ErrListingUnsupported = errors.New("listing is not supported")

// ErrListingDone is a marker for indicating listing is done.
var ErrListingDone = errors.New("listing is done")

// RedactedParams is a helper for making a set of param names to redact in URIs.
func RedactedParams(strs ...string) map[string]struct{} {
if len(strs) == 0 {
Expand Down
8 changes: 1 addition & 7 deletions pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,13 @@ func (g *gcsStorage) ReadFileAt(
return r, r.Reader.(*gcs.Reader).Attrs.Size, nil
}

func (g *gcsStorage) List(
ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int,
) error {
func (g *gcsStorage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error {
dest := cloud.JoinPathPreservingTrailingSlash(g.prefix, prefix)
ctx, sp := tracing.ChildSpan(ctx, "gcs.List")
defer sp.Finish()
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("gcs.List: %s", dest)})

it := g.bucket.Objects(ctx, &gcs.Query{Prefix: dest, Delimiter: delim})
count := 0
for {
attrs, err := it.Next()
if errors.Is(err, iterator.Done) {
Expand All @@ -237,9 +234,6 @@ func (g *gcsStorage) List(
if err := fn(strings.TrimPrefix(name, dest)); err != nil {
return err
}
if count++; limit != 0 && count >= limit {
return nil
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/httpsink/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (h *httpStorage) Writer(ctx context.Context, basename string) (io.WriteClos
}), nil
}

func (h *httpStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn, _ int) error {
func (h *httpStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error {
return errors.Mark(errors.New("http storage does not support listing"), cloud.ErrListingUnsupported)
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/cloud/nodelocal/nodelocal_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (l *localFileStorage) ReadFileAt(
}

func (l *localFileStorage) List(
ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int,
ctx context.Context, prefix, delim string, fn cloud.ListingFn,
) error {
dest := cloud.JoinPathPreservingTrailingSlash(l.base, prefix)

Expand All @@ -172,7 +172,6 @@ func (l *localFileStorage) List(
// Sort results so that we can group as we go.
sort.Strings(res)
var prevPrefix string
count := 0
for _, f := range res {
f = strings.TrimPrefix(f, dest)
if delim != "" {
Expand All @@ -187,9 +186,6 @@ func (l *localFileStorage) List(
if err := fn(f); err != nil {
return err
}
if count++; limit != 0 && count >= limit {
return nil
}
}
return nil
}
Expand Down
Loading

0 comments on commit b7b37f4

Please sign in to comment.