From d2375ebe8d82908b214bacf7691ca0e9f65bec19 Mon Sep 17 00:00:00 2001 From: Darryl Date: Mon, 4 Apr 2022 14:24:30 -0400 Subject: [PATCH] backupccl: Removed limit parameter from List() API Release note: none --- pkg/ccl/backupccl/backup_job.go | 2 +- pkg/ccl/backupccl/backup_test.go | 13 +-- pkg/ccl/backupccl/incrementals.go | 2 +- pkg/ccl/backupccl/manifest_handling.go | 12 ++- pkg/ccl/workloadccl/fixture.go | 2 +- pkg/cli/userfile.go | 6 +- pkg/cloud/amazon/s3_storage.go | 8 +- pkg/cloud/azure/azure_storage.go | 9 +- .../cloudtestutils/cloud_test_helpers.go | 98 +------------------ pkg/cloud/external_storage.go | 14 +-- pkg/cloud/gcp/gcs_storage.go | 8 +- pkg/cloud/httpsink/http_storage.go | 2 +- pkg/cloud/nodelocal/nodelocal_storage.go | 6 +- pkg/cloud/nullsink/nullsink_storage.go | 2 +- pkg/cloud/userfile/file_table_storage.go | 6 +- pkg/sql/importer/import_planning.go | 2 +- pkg/sql/importer/import_stmt_test.go | 2 +- pkg/sql/importer/testutils_test.go | 2 +- 18 files changed, 39 insertions(+), 157 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index b175ec69a2a2..1f215c42bd03 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -835,7 +835,7 @@ func (b *backupResumer) deleteCheckpoint( // all files and delete each file one by one. return exportStore.List(ctx, backupProgressDirectory, "", func(p string) error { return exportStore.Delete(ctx, backupProgressDirectory+p) - }, 0 /*limit*/) + }) }(); err != nil { log.Warningf(ctx, "unable to delete checkpointed backup descriptor file in progress directory: %+v", err) } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 650e52955b6f..e570b95af286 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -578,7 +578,7 @@ func TestBackupRestoreAppend(t *testing.T) { files = append(files, f) } return err - }, 0 /*limit*/)) + })) full1 = strings.TrimSuffix(files[0], backupManifestName) full2 = strings.TrimSuffix(files[1], backupManifestName) @@ -592,7 +592,7 @@ func TestBackupRestoreAppend(t *testing.T) { subdirFiles = append(subdirFiles, f) } return err - }, 0 /*limit*/)) + })) require.NoError(t, err) subdirFull1 = strings.TrimSuffix(strings.TrimPrefix(subdirFiles[0], "foo"), backupManifestName) @@ -9689,7 +9689,7 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) { } } return nil - }, 0 /*limit*/)) + })) // numCheckpointWritten only accounts for checkpoints written in the // progress loop, each time we Resume we write another checkpoint. @@ -9814,10 +9814,11 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) { } require.NoError(t, err) var actual string - require.NoError(t, store.List(ctx, "/progress/", "", func(f string) error { + err = store.List(ctx, "/progress/", "", func(f string) error { actual = f - return nil - }, 1 /*limit*/)) + return cloud.ErrListingUnsupported + }) + require.Equal(t, err, cloud.ErrListingUnsupported) require.Equal(t, expectedCheckpoint, actual) for _, checkpoint := range checkpoints { require.NoError(t, store.Delete(ctx, "/progress/"+checkpoint)) diff --git a/pkg/ccl/backupccl/incrementals.go b/pkg/ccl/backupccl/incrementals.go index 219277fbc2ba..4d484987ae87 100644 --- a/pkg/ccl/backupccl/incrementals.go +++ b/pkg/ccl/backupccl/incrementals.go @@ -83,7 +83,7 @@ func FindPriorBackups( prev = append(prev, p) } return nil - }, 0 /*limit*/); err != nil { + }); err != nil { return nil, errors.Wrap(err, "reading previous backup layers") } sort.Strings(prev) diff --git a/pkg/ccl/backupccl/manifest_handling.go b/pkg/ccl/backupccl/manifest_handling.go index ec185e529d90..f8325c7691b5 100644 --- a/pkg/ccl/backupccl/manifest_handling.go +++ b/pkg/ccl/backupccl/manifest_handling.go @@ -1154,7 +1154,7 @@ func getEncryptionInfoFiles(ctx context.Context, dest cloud.ExternalStorage) ([] } return nil - }, 0 /*limit*/) + }) if len(files) < 1 { return nil, errors.New("no ENCRYPTION-INFO files found") } @@ -1255,7 +1255,7 @@ func ListFullBackupsInCollection( backupPaths = append(backupPaths, f) } return nil - }, 0 /*limit*/); err != nil { + }); err != nil { return nil, err } for i, backupPath := range backupPaths { @@ -1287,8 +1287,10 @@ func readLatestCheckpointFile( p = strings.TrimPrefix(p, "/") checkpoint = strings.TrimSuffix(p, backupManifestChecksumSuffix) checkpointFound = true - return nil - }, 1 /*limit*/) + // We only want the first checkpoint so return an error that it is + // listing. + return cloud.ErrListingDone + }) // If the list failed because the storage used does not support listing, // such as http we can try reading the non timestamped backup checkpoint // directly. This can still fail if it is a mixed cluster and the @@ -1300,7 +1302,7 @@ func readLatestCheckpointFile( if err == nil { return r, nil } - } else if err != nil { + } else if err != nil && !errors.Is(err, cloud.ErrListingDone) { return nil, err } diff --git a/pkg/ccl/workloadccl/fixture.go b/pkg/ccl/workloadccl/fixture.go index 777b16b1d402..e8d0e60d4c90 100644 --- a/pkg/ccl/workloadccl/fixture.go +++ b/pkg/ccl/workloadccl/fixture.go @@ -659,7 +659,7 @@ func listDir( if log.V(1) { log.Infof(ctx, "Listing %s", dir) } - return es.List(ctx, dir, "/", lsFn, 0 /*limit*/) + return es.List(ctx, dir, "/", lsFn) } // ListFixtures returns the object paths to all fixtures stored in a FixtureConfig. diff --git a/pkg/cli/userfile.go b/pkg/cli/userfile.go index ac6e7d465bb5..1b09ebc98884 100644 --- a/pkg/cli/userfile.go +++ b/pkg/cli/userfile.go @@ -256,7 +256,7 @@ func runUserFileGet(cmd *cobra.Command, args []string) (resErr error) { } files = append(files, s) return nil - }, 0 /*limit*/); err != nil { + }); err != nil { return err } @@ -451,7 +451,7 @@ func listUserFile(ctx context.Context, conn clisqlclient.Conn, glob string) ([]s } res = append(res, displayPrefix+s) return nil - }, 0 /*limit*/); err != nil { + }); err != nil { return nil, err } return res, nil @@ -532,7 +532,7 @@ func deleteUserFile(ctx context.Context, conn clisqlclient.Conn, glob string) ([ } deleted = append(deleted, displayRoot+s) return nil - }, 0 /*limit*/); err != nil { + }); err != nil { return nil, err } diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index 9fe4f2ff2f98..82ea478e2190 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -498,9 +498,7 @@ func (s *s3Storage) ReadFileAt( cloud.IsResumableHTTPError, s3ErrDelay), size, nil } -func (s *s3Storage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int, -) error { +func (s *s3Storage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error { ctx, sp := tracing.ChildSpan(ctx, "s3.List") defer sp.Finish() @@ -513,7 +511,6 @@ func (s *s3Storage) List( } var fnErr error - count := 0 pageFn := func(page *s3.ListObjectsOutput, lastPage bool) bool { for _, x := range page.CommonPrefixes { if fnErr = fn(strings.TrimPrefix(*x.Prefix, dest)); fnErr != nil { @@ -524,9 +521,6 @@ func (s *s3Storage) List( if fnErr = fn(strings.TrimPrefix(*fileObject.Key, dest)); fnErr != nil { return false } - if count++; limit != 0 && count >= limit { - return false - } } return true diff --git a/pkg/cloud/azure/azure_storage.go b/pkg/cloud/azure/azure_storage.go index 481e74906c94..310124458493 100644 --- a/pkg/cloud/azure/azure_storage.go +++ b/pkg/cloud/azure/azure_storage.go @@ -180,9 +180,7 @@ func (s *azureStorage) ReadFileAt( return ioctx.ReadCloserAdapter(reader), size, nil } -func (s *azureStorage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int, -) error { +func (s *azureStorage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error { ctx, sp := tracing.ChildSpan(ctx, "azure.List") defer sp.Finish() @@ -190,7 +188,6 @@ func (s *azureStorage) List( sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("azure.List: %s", dest)}) var marker azblob.Marker - count := 0 for marker.NotDone() { response, err := s.container.ListBlobsHierarchySegment( ctx, marker, delim, azblob.ListBlobsSegmentOptions{Prefix: dest}, @@ -207,10 +204,6 @@ func (s *azureStorage) List( if err := fn(strings.TrimPrefix(blob.Name, dest)); err != nil { return err } - - if count++; limit != 0 && count >= limit { - return nil - } } marker = response.NextMarker } diff --git a/pkg/cloud/cloudtestutils/cloud_test_helpers.go b/pkg/cloud/cloudtestutils/cloud_test_helpers.go index 2668e881d1be..ff2ce7849d19 100644 --- a/pkg/cloud/cloudtestutils/cloud_test_helpers.go +++ b/pkg/cloud/cloudtestutils/cloud_test_helpers.go @@ -467,103 +467,7 @@ func CheckListFilesCanonical( require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error { actual = append(actual, f) return nil - }, 0 /*limit*/)) - sort.Strings(actual) - require.Equal(t, tc.expected, actual) - }) - } - }) - - // Similar test as above but with the limit not set to 0. - t.Run("List-with-limit", func(t *testing.T) { - for _, tc := range []struct { - name string - uri string - prefix string - delimiter string - expected []string - limit int - }{ - { - "root", - storeURI, - "", - "", - []string{"/file/abc/A.csv", "/file/abc/B.csv", "/file/abc/C.csv"}, - 3, - }, - { - "file-slash-numbers-slash", - storeURI, - "file/numbers/", - "", - []string{"data1.csv", "data2.csv"}, - 2, - }, - { - "root-slash", - storeURI, - "/", - "", - []string{"file/abc/A.csv", "file/abc/B.csv", "file/abc/C.csv"}, - 3, - }, - { - "file", - storeURI, - "file", - "", - []string{"/abc/A.csv", "/abc/B.csv", "/abc/C.csv"}, - 3, - }, - { - "file-slash", - storeURI, - "file/", - "", - []string{"abc/A.csv", "abc/B.csv", "abc/C.csv"}, - 3, - }, - { - "slash-f", - storeURI, - "/f", - "", - []string{"ile/abc/A.csv", "ile/abc/B.csv", "ile/abc/C.csv"}, - 3, - }, - { - "nothing", - storeURI, - "nothing", - "", - nil, - 1, - }, - { - "delim-slash-file-slash", - storeURI, - "file/", - "/", - []string{"abc/", "letters/"}, - 2, - }, - { - "delim-data", - storeURI, - "", - "data", - []string{"/file/abc/A.csv", "/file/abc/B.csv"}, - 2, - }, - } { - t.Run(tc.name, func(t *testing.T) { - s := storeFromURI(ctx, t, tc.uri, clientFactory, user, ie, kvDB, testSettings) - var actual []string - require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error { - actual = append(actual, f) - return nil - }, tc.limit /*limit*/)) + })) sort.Strings(actual) require.Equal(t, tc.expected, actual) }) diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index 40787c6f52ea..654fb17a76d0 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -78,12 +78,11 @@ type ExternalStorage interface { // List enumerates files within the supplied prefix, calling the passed // function with the name of each file found, relative to the external storage // destination's configured prefix. If the passed function returns a non-nil - // error, iteration is stopped it is returned. Iteration is also stopped - // if the number of filenames reaches the limit, 0 is no limit. - // If delimiter is non-empty names which have the same prefix, prior - // to the delimiter, are grouped into a single result which is that - // prefix. The order that results are passed to the callback is undefined. - List(ctx context.Context, prefix, delimiter string, fn ListingFn, limit int) error + // error, iteration is stopped it is returned. If delimiter is non-empty + // names which have the same prefix, prior to the delimiter, are grouped + // into a single result which is that prefix. The order that results are + // passed to the callback is undefined. + List(ctx context.Context, prefix, delimiter string, fn ListingFn) error // Delete removes the named file from the store. Delete(ctx context.Context, basename string) error @@ -117,6 +116,9 @@ var ErrFileDoesNotExist = errors.New("external_storage: file doesn't exist") // ErrListingUnsupported is a marker for indicating listing is unsupported. var ErrListingUnsupported = errors.New("listing is not supported") +// ErrListingDone is a marker for indicating listing is done. +var ErrListingDone = errors.New("listing is done") + // RedactedParams is a helper for making a set of param names to redact in URIs. func RedactedParams(strs ...string) map[string]struct{} { if len(strs) == 0 { diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go index cc0b6a8e2627..2bd8bf7233ea 100644 --- a/pkg/cloud/gcp/gcs_storage.go +++ b/pkg/cloud/gcp/gcs_storage.go @@ -212,16 +212,13 @@ func (g *gcsStorage) ReadFileAt( return r, r.Reader.(*gcs.Reader).Attrs.Size, nil } -func (g *gcsStorage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int, -) error { +func (g *gcsStorage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error { dest := cloud.JoinPathPreservingTrailingSlash(g.prefix, prefix) ctx, sp := tracing.ChildSpan(ctx, "gcs.List") defer sp.Finish() sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("gcs.List: %s", dest)}) it := g.bucket.Objects(ctx, &gcs.Query{Prefix: dest, Delimiter: delim}) - count := 0 for { attrs, err := it.Next() if errors.Is(err, iterator.Done) { @@ -237,9 +234,6 @@ func (g *gcsStorage) List( if err := fn(strings.TrimPrefix(name, dest)); err != nil { return err } - if count++; limit != 0 && count >= limit { - return nil - } } } diff --git a/pkg/cloud/httpsink/http_storage.go b/pkg/cloud/httpsink/http_storage.go index 84d54a289187..8ea5f091d4dd 100644 --- a/pkg/cloud/httpsink/http_storage.go +++ b/pkg/cloud/httpsink/http_storage.go @@ -180,7 +180,7 @@ func (h *httpStorage) Writer(ctx context.Context, basename string) (io.WriteClos }), nil } -func (h *httpStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn, _ int) error { +func (h *httpStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { return errors.Mark(errors.New("http storage does not support listing"), cloud.ErrListingUnsupported) } diff --git a/pkg/cloud/nodelocal/nodelocal_storage.go b/pkg/cloud/nodelocal/nodelocal_storage.go index 0eb42fd403ce..01ac575f0dd8 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/cloud/nodelocal/nodelocal_storage.go @@ -160,7 +160,7 @@ func (l *localFileStorage) ReadFileAt( } func (l *localFileStorage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int, + ctx context.Context, prefix, delim string, fn cloud.ListingFn, ) error { dest := cloud.JoinPathPreservingTrailingSlash(l.base, prefix) @@ -172,7 +172,6 @@ func (l *localFileStorage) List( // Sort results so that we can group as we go. sort.Strings(res) var prevPrefix string - count := 0 for _, f := range res { f = strings.TrimPrefix(f, dest) if delim != "" { @@ -187,9 +186,6 @@ func (l *localFileStorage) List( if err := fn(f); err != nil { return err } - if count++; limit != 0 && count >= limit { - return nil - } } return nil } diff --git a/pkg/cloud/nullsink/nullsink_storage.go b/pkg/cloud/nullsink/nullsink_storage.go index fcfdbb5c02b1..ac680122320f 100644 --- a/pkg/cloud/nullsink/nullsink_storage.go +++ b/pkg/cloud/nullsink/nullsink_storage.go @@ -83,7 +83,7 @@ func (n *nullSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, e return nullWriter{}, nil } -func (n *nullSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn, _ int) error { +func (n *nullSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { return nil } diff --git a/pkg/cloud/userfile/file_table_storage.go b/pkg/cloud/userfile/file_table_storage.go index 0404424346d4..0f7166e6c5d4 100644 --- a/pkg/cloud/userfile/file_table_storage.go +++ b/pkg/cloud/userfile/file_table_storage.go @@ -252,7 +252,7 @@ func (f *fileTableStorage) Writer(ctx context.Context, basename string) (io.Writ // List implements the ExternalStorage interface. func (f *fileTableStorage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, limit int, + ctx context.Context, prefix, delim string, fn cloud.ListingFn, ) error { dest := cloud.JoinPathPreservingTrailingSlash(f.prefix, prefix) @@ -263,7 +263,6 @@ func (f *fileTableStorage) List( sort.Strings(res) var prevPrefix string - count := 0 for _, f := range res { f = strings.TrimPrefix(f, dest) if delim != "" { @@ -278,9 +277,6 @@ func (f *fileTableStorage) List( if err := fn(f); err != nil { return err } - if count++; limit != 0 && count >= limit { - return nil - } } return nil diff --git a/pkg/sql/importer/import_planning.go b/pkg/sql/importer/import_planning.go index daa4048fff60..a7b8aa01085f 100644 --- a/pkg/sql/importer/import_planning.go +++ b/pkg/sql/importer/import_planning.go @@ -437,7 +437,7 @@ func importPlanHook( expandedFiles = append(expandedFiles, uri.String()) } return err - }, 0 /*limit*/); err != nil { + }); err != nil { return err } if len(expandedFiles) < 1 { diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index fb99672ea000..453dc52038bf 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -5744,7 +5744,7 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { files = append(files, f) } return err - }, 0 /*limit*/)) + })) for i, file := range files { require.Equal(t, file, path.Join(dirName, logSubdir, fmt.Sprintf("%d.log", i))) content, err := store.ReadFile(ctx, file) diff --git a/pkg/sql/importer/testutils_test.go b/pkg/sql/importer/testutils_test.go index fa6e7e8c3c4b..87e89b6470f2 100644 --- a/pkg/sql/importer/testutils_test.go +++ b/pkg/sql/importer/testutils_test.go @@ -296,7 +296,7 @@ func (es *generatorExternalStorage) Writer( } func (es *generatorExternalStorage) List( - ctx context.Context, _, _ string, _ cloud.ListingFn, _ int, + ctx context.Context, _, _ string, _ cloud.ListingFn, ) error { return errors.New("unsupported") }