From 8f49212b6188d2abb88b346470e9ea6a6a6bc44b Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Thu, 23 Feb 2023 17:56:54 -0500 Subject: [PATCH] backupccl: remove generic iterator Remove the generic iterator and replace with duplicate code as generics are not supported in 22.2 Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 1 - pkg/ccl/backupccl/backup_metadata_test.go | 14 +- pkg/ccl/backupccl/backupinfo/BUILD.bazel | 2 - .../backupccl/backupinfo/backup_metadata.go | 350 +++++++++++++----- .../backupccl/backupinfo/manifest_handling.go | 23 +- .../backupinfo/manifest_handling_test.go | 215 ++++++++++- pkg/ccl/backupccl/restore_span_covering.go | 53 +-- pkg/sql/execinfrapb/processors_bulk_io.proto | 2 +- pkg/util/bulk/BUILD.bazel | 5 +- pkg/util/bulk/iterator.go | 46 --- 10 files changed, 484 insertions(+), 227 deletions(-) delete mode 100644 pkg/util/bulk/iterator.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 9ab7c611afa5..18ed4c06dcae 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -267,7 +267,6 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", - "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index b15df4d52e69..851c347d2841 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -26,8 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -285,9 
+285,15 @@ func checkStats( it := bm.NewStatsIter(ctx) defer it.Close() - metaStats, err := bulk.CollectToSlice(it) - if err != nil { - t.Fatal(err) + var metaStats []*stats.TableStatisticProto + + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + metaStats = append(metaStats, it.Value()) } require.Equal(t, expectedStats, metaStats) diff --git a/pkg/ccl/backupccl/backupinfo/BUILD.bazel b/pkg/ccl/backupccl/backupinfo/BUILD.bazel index c642036cfba7..3257d805a2b3 100644 --- a/pkg/ccl/backupccl/backupinfo/BUILD.bazel +++ b/pkg/ccl/backupccl/backupinfo/BUILD.bazel @@ -38,7 +38,6 @@ go_library( "//pkg/sql/stats", "//pkg/storage", "//pkg/util", - "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", @@ -78,7 +77,6 @@ go_test( "//pkg/sql/catalog/descpb", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", - "//pkg/util/bulk", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/randutil", diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index a6a136e5d933..eb18cc3c79fb 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -974,18 +973,38 @@ func NewBackupMetadata( } // SpanIterator is a simple iterator to iterate over roachpb.Spans. -type SpanIterator struct { +type SpanIterator interface { + // Valid must be called after any call to Next(). It returns (true, nil) if + // the iterator points to a valid value, and (false, nil) if the iterator has + // moved past the last value. 
It returns (false, err) if there is an error in + // the iterator. + Valid() (bool, error) + + // Value returns the current value. The returned value is only valid until the + // next call to Next(). + Value() roachpb.Span + + // Next advances the iterator to the next value. + Next() + + // Close closes the iterator. + Close() +} + +type spanIterator struct { backing bytesIter filter func(key storage.MVCCKey) bool value *roachpb.Span err error } +var _ SpanIterator = &spanIterator{} + // NewSpanIter creates a new SpanIterator for the backup metadata. -func (b *BackupMetadata) NewSpanIter(ctx context.Context) bulk.Iterator[roachpb.Span] { +func (b *BackupMetadata) NewSpanIter(ctx context.Context) SpanIterator { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstSpansPrefix), b.enc, true, b.kmsEnv) - it := SpanIterator{ + it := spanIterator{ backing: backing, } it.Next() @@ -993,11 +1012,11 @@ func (b *BackupMetadata) NewSpanIter(ctx context.Context) bulk.Iterator[roachpb. } // NewIntroducedSpanIter creates a new IntroducedSpanIterator for the backup metadata. -func (b *BackupMetadata) NewIntroducedSpanIter(ctx context.Context) bulk.Iterator[roachpb.Span] { +func (b *BackupMetadata) NewIntroducedSpanIter(ctx context.Context) SpanIterator { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstSpansPrefix), b.enc, false, b.kmsEnv) - it := SpanIterator{ + it := spanIterator{ backing: backing, filter: func(key storage.MVCCKey) bool { return key.Timestamp == hlc.Timestamp{} @@ -1008,28 +1027,28 @@ func (b *BackupMetadata) NewIntroducedSpanIter(ctx context.Context) bulk.Iterato } // Close closes the iterator. -func (si *SpanIterator) Close() { +func (si *spanIterator) Close() { si.backing.close() } -// Valid implements the Iterator interface. -func (si *SpanIterator) Valid() (bool, error) { +// Valid implements the SpanIterator interface.
+func (si *spanIterator) Valid() (bool, error) { if si.err != nil { return false, si.err } return si.value != nil, si.err } -// Value implements the Iterator interface. -func (si *SpanIterator) Value() roachpb.Span { +// Value implements the SpanIterator interface. +func (si *spanIterator) Value() roachpb.Span { if si.value == nil { return roachpb.Span{} } return *si.value } -// Next implements the Iterator interface. -func (si *SpanIterator) Next() { +// Next implements the SpanIterator interface. +func (si *spanIterator) Next() { wrapper := resultWrapper{} var nextSpan *roachpb.Span @@ -1050,16 +1069,34 @@ func (si *SpanIterator) Next() { } // FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File. -type FileIterator struct { +type FileIterator interface { + // Valid must be called after any call to Next(). It returns (true, nil) if + // the iterator points to a valid value, and (false, nil) if the iterator has + // moved past the last value. It returns (false, err) if there is an error in + // the iterator. + Valid() (bool, error) + + // Value returns the current value. The returned value is only valid until the + // next call to Next(). + Value() *backuppb.BackupManifest_File + + // Next advances the iterator to the next value. + Next() + + // Close closes the iterator. + Close() +} + +type fileIterator struct { mergedIterator storage.SimpleMVCCIterator err error file *backuppb.BackupManifest_File } +var _ FileIterator = &fileIterator{} + // NewFileIter creates a new FileIterator for the backup metadata.
-func (b *BackupMetadata) NewFileIter( - ctx context.Context, -) (bulk.Iterator[*backuppb.BackupManifest_File], error) { +func (b *BackupMetadata) NewFileIter(ctx context.Context) (FileIterator, error) { fileInfoIter := makeBytesIter(ctx, b.store, b.filename, []byte(sstFilesPrefix), b.enc, false, b.kmsEnv) defer fileInfoIter.close() @@ -1090,34 +1127,34 @@ func (b *BackupMetadata) NewFileIter( return newFileSSTIter(ctx, storeFiles, encOpts) } -// NewFileSSTIter creates a new FileIterator to iterate over the storeFile. +// NewFileSSTIter creates a new fileIterator to iterate over the storeFile. // It is the caller's responsibility to Close() the returned iterator. func NewFileSSTIter( ctx context.Context, storeFile storageccl.StoreFile, encOpts *roachpb.FileEncryptionOptions, -) (*FileIterator, error) { +) (FileIterator, error) { return newFileSSTIter(ctx, []storageccl.StoreFile{storeFile}, encOpts) } func newFileSSTIter( ctx context.Context, storeFiles []storageccl.StoreFile, encOpts *roachpb.FileEncryptionOptions, -) (*FileIterator, error) { +) (*fileIterator, error) { iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, encOpts, iterOpts) if err != nil { return nil, err } iter.SeekGE(storage.MVCCKey{}) - fi := &FileIterator{mergedIterator: iter} + fi := &fileIterator{mergedIterator: iter} fi.Next() return fi, nil } // Close closes the iterator. -func (fi *FileIterator) Close() { +func (fi *fileIterator) Close() { fi.mergedIterator.Close() } // Valid indicates whether or not the iterator is pointing to a valid value. -func (fi *FileIterator) Valid() (bool, error) { +func (fi *fileIterator) Valid() (bool, error) { if fi.err != nil { return false, fi.err } @@ -1125,13 +1162,13 @@ func (fi *FileIterator) Valid() (bool, error) { return fi.file != nil, nil } -// Value implements the Iterator interface. -func (fi *FileIterator) Value() *backuppb.BackupManifest_File { +// Value implements the FileIterator interface. 
+func (fi *fileIterator) Value() *backuppb.BackupManifest_File { return fi.file } -// Next implements the Iterator interface. -func (fi *FileIterator) Next() { +// Next implements the FileIterator interface. +func (fi *fileIterator) Next() { if fi.err != nil { return } @@ -1157,16 +1194,36 @@ func (fi *FileIterator) Next() { } // DescIterator is a simple iterator to iterate over descpb.Descriptors. -type DescIterator struct { +type DescIterator interface { + // Valid must be called after any call to Next(). It returns (true, nil) if + // the iterator points to a valid value, and (false, nil) if the iterator has + // moved past the last value. It returns (false, err) if there is an error in + // the iterator. + Valid() (bool, error) + + // Value returns the current value. The returned value is only valid until the + // next call to Next(). + Value() *descpb.Descriptor + + // Next advances the iterator to the next value. + Next() + + // Close closes the iterator. + Close() +} + +type descIterator struct { backing bytesIter value *descpb.Descriptor err error } +var _ DescIterator = &descIterator{} + // NewDescIter creates a new DescIterator for the backup metadata. -func (b *BackupMetadata) NewDescIter(ctx context.Context) bulk.Iterator[*descpb.Descriptor] { +func (b *BackupMetadata) NewDescIter(ctx context.Context) DescIterator { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, true, b.kmsEnv) - it := DescIterator{ + it := descIterator{ backing: backing, } it.Next() @@ -1174,25 +1231,25 @@ func (b *BackupMetadata) NewDescIter(ctx context.Context) bulk.Iterator[*descpb. } // Close closes the iterator. -func (di *DescIterator) Close() { +func (di *descIterator) Close() { di.backing.close() } -// Valid implements the Iterator interface. -func (di *DescIterator) Valid() (bool, error) { +// Valid implements the DescIterator interface.
+func (di *descIterator) Valid() (bool, error) { if di.err != nil { return false, di.err } return di.value != nil, nil } -// Value implements the Iterator interface. -func (di *DescIterator) Value() *descpb.Descriptor { +// Value implements the DescIterator interface. +func (di *descIterator) Value() *descpb.Descriptor { return di.value } -// Next implements the Iterator interface. -func (di *DescIterator) Next() { +// Next implements the DescIterator interface. +func (di *descIterator) Next() { if di.err != nil { return } @@ -1218,19 +1275,37 @@ func (di *DescIterator) Next() { } // TenantIterator is a simple iterator to iterate over TenantInfoWithUsages. -type TenantIterator struct { +type TenantIterator interface { + // Valid must be called after any call to Next(). It returns (true, nil) if + // the iterator points to a valid value, and (false, nil) if the iterator has + // moved past the last value. It returns (false, err) if there is an error in + // the iterator. + Valid() (bool, error) + + // Value returns the current value. The returned value is only valid until the + // next call to Next(). + Value() descpb.TenantInfoWithUsage + + // Next advances the iterator to the next value. + Next() + + // Close closes the iterator. + Close() +} + +type tenantIterator struct { backing bytesIter value *descpb.TenantInfoWithUsage err error } +var _ TenantIterator = &tenantIterator{} + // NewTenantIter creates a new TenantIterator for the backup metadata. -func (b *BackupMetadata) NewTenantIter( - ctx context.Context, -) bulk.Iterator[descpb.TenantInfoWithUsage] { +func (b *BackupMetadata) NewTenantIter(ctx context.Context) TenantIterator { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstTenantsPrefix), b.enc, false, b.kmsEnv) - it := TenantIterator{ + it := tenantIterator{ backing: backing, } it.Next() @@ -1238,28 +1313,28 @@ func (b *BackupMetadata) NewTenantIter( } // Close closes the iterator.
-func (ti *TenantIterator) Close() { +func (ti *tenantIterator) Close() { ti.backing.close() } -// Valid implements the Iterator interface. -func (ti *TenantIterator) Valid() (bool, error) { +// Valid implements the TenantIterator interface. +func (ti *tenantIterator) Valid() (bool, error) { if ti.err != nil { return false, ti.err } return ti.value != nil, nil } -// Value implements the Iterator interface. -func (ti *TenantIterator) Value() descpb.TenantInfoWithUsage { +// Value implements the TenantIterator interface. +func (ti *tenantIterator) Value() descpb.TenantInfoWithUsage { if ti.value == nil { return descpb.TenantInfoWithUsage{} } return *ti.value } -// Next implements the Iterator interface. -func (ti *TenantIterator) Next() { +// Next implements the TenantIterator interface. +func (ti *tenantIterator) Next() { if ti.err != nil { return } @@ -1286,24 +1361,42 @@ func (ti *TenantIterator) Next() { } // DescriptorRevisionIterator is a simple iterator to iterate over backuppb.BackupManifest_DescriptorRevisions. -type DescriptorRevisionIterator struct { +type DescriptorRevisionIterator interface { + // Valid must be called after any call to Next(). It returns (true, nil) if + // the iterator points to a valid value, and (false, nil) if the iterator has + // moved past the last value. It returns (false, err) if there is an error in + // the iterator. + Valid() (bool, error) + + // Value returns the current value. The returned value is only valid until the + // next call to Next(). + Value() *backuppb.BackupManifest_DescriptorRevision + + // Next advances the iterator to the next value. + Next() + + // Close closes the iterator. + Close() +} + +type descriptorRevisionIterator struct { backing bytesIter err error value *backuppb.BackupManifest_DescriptorRevision } -// NewDescriptorChangesIter creates a new DescriptorChangesIterator for the backup metadata.
-func (b *BackupMetadata) NewDescriptorChangesIter( - ctx context.Context, -) bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] { +var _ DescriptorRevisionIterator = &descriptorRevisionIterator{} + +// NewDescriptorChangesIter creates a new DescriptorRevisionIterator for the backup metadata. +func (b *BackupMetadata) NewDescriptorChangesIter(ctx context.Context) DescriptorRevisionIterator { if b.MVCCFilter == backuppb.MVCCFilter_Latest { var backing []backuppb.BackupManifest_DescriptorRevision - return newSlicePointerIterator(backing) + return newDescriptorChangesSliceIterator(backing) } backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstDescsPrefix), b.enc, false, b.kmsEnv) - dri := DescriptorRevisionIterator{ + dri := descriptorRevisionIterator{ backing: backing, } @@ -1311,31 +1404,26 @@ func (b *BackupMetadata) NewDescriptorChangesIter( return &dri } -// Valid implements the Iterator interface. -func (dri *DescriptorRevisionIterator) Valid() (bool, error) { +// Valid implements the DescriptorRevisionIterator interface. +func (dri *descriptorRevisionIterator) Valid() (bool, error) { if dri.err != nil { return false, dri.err } return dri.value != nil, nil } -// Value implements the Iterator interface. -func (dri *DescriptorRevisionIterator) Value() *backuppb.BackupManifest_DescriptorRevision { +// Value implements the DescriptorRevisionIterator interface. +func (dri *descriptorRevisionIterator) Value() *backuppb.BackupManifest_DescriptorRevision { return dri.value } // Close closes the iterator. -func (dri *DescriptorRevisionIterator) Close() { +func (dri *descriptorRevisionIterator) Close() { dri.backing.close() } -// Next retrieves the next descriptor revision in the iterator. -// -// Next returns true if next element was successfully unmarshalled into -// revision, and false if there are no more elements or if an error was -// encountered. When Next returns false, the user should call the Err method to -// verify the existence of an error. 
-func (dri *DescriptorRevisionIterator) Next() { +// Next implements the DescriptorRevisionIterator interface. +func (dri *descriptorRevisionIterator) Next() { if dri.err != nil { return } @@ -1384,19 +1472,37 @@ func unmarshalWrapper(wrapper *resultWrapper) (backuppb.BackupManifest_Descripto } // StatsIterator is a simple iterator to iterate over stats.TableStatisticProtos. -type StatsIterator struct { +type StatsIterator interface { + // Valid must be called after any call to Next(). It returns (true, nil) if + // the iterator points to a valid value, and (false, nil) if the iterator has + // moved past the last value. It returns (false, err) if there is an error in + // the iterator. + Valid() (bool, error) + + // Value returns the current value. The returned value is only valid until the + // next call to Next(). + Value() *stats.TableStatisticProto + + // Next advances the iterator to the next value. + Next() + + // Close closes the iterator. + Close() +} + +type statsIterator struct { backing bytesIter value *stats.TableStatisticProto err error } +var _ StatsIterator = &statsIterator{} + // NewStatsIter creates a new StatsIterator for the backup metadata. -func (b *BackupMetadata) NewStatsIter( - ctx context.Context, -) bulk.Iterator[*stats.TableStatisticProto] { +func (b *BackupMetadata) NewStatsIter(ctx context.Context) StatsIterator { backing := makeBytesIter(ctx, b.store, b.filename, []byte(sstStatsPrefix), b.enc, false, b.kmsEnv) - it := StatsIterator{ + it := statsIterator{ backing: backing, } it.Next() @@ -1404,25 +1510,25 @@ func (b *BackupMetadata) NewStatsIter( } // Close closes the iterator. -func (si *StatsIterator) Close() { +func (si *statsIterator) Close() { si.backing.close() } -// Valid implements the Iterator interface. -func (si *StatsIterator) Valid() (bool, error) { +// Valid implements the StatsIterator interface.
+func (si *statsIterator) Valid() (bool, error) { if si.err != nil { return false, si.err } return si.value != nil, nil } -// Value implements the Iterator interface. -func (si *StatsIterator) Value() *stats.TableStatisticProto { +// Value implements the StatsIterator interface. +func (si *statsIterator) Value() *stats.TableStatisticProto { return si.value } -// Next implements the Iterator interface. -func (si *StatsIterator) Next() { +// Next implements the StatsIterator interface. +func (si *statsIterator) Next() { if si.err != nil { return } @@ -1529,24 +1635,90 @@ type resultWrapper struct { value []byte } -type sliceIterator[T any] struct { - backingSlice []T +type fileSliceIterator struct { + backingSlice []backuppb.BackupManifest_File + idx int +} + +var _ FileIterator = &fileSliceIterator{} + +func newFileSliceIterator(backing []backuppb.BackupManifest_File) *fileSliceIterator { + return &fileSliceIterator{ + backingSlice: backing, + } +} + +func (s *fileSliceIterator) Valid() (bool, error) { + return s.idx < len(s.backingSlice), nil +} + +func (s *fileSliceIterator) Value() *backuppb.BackupManifest_File { + if s.idx < len(s.backingSlice) { + return &s.backingSlice[s.idx] + } + + return nil +} + +func (s *fileSliceIterator) Next() { + s.idx++ +} + +func (s *fileSliceIterator) Close() { +} + +type descSliceIterator struct { + backingSlice []descpb.Descriptor + idx int +} + +var _ DescIterator = &descSliceIterator{} + +func newDescSliceIterator(backing []descpb.Descriptor) *descSliceIterator { + return &descSliceIterator{ + backingSlice: backing, + } +} + +func (s *descSliceIterator) Valid() (bool, error) { + return s.idx < len(s.backingSlice), nil +} + +func (s *descSliceIterator) Value() *descpb.Descriptor { + if s.idx < len(s.backingSlice) { + return &s.backingSlice[s.idx] + } + + return nil +} + +func (s *descSliceIterator) Next() { + s.idx++ +} + +func (s *descSliceIterator) Close() { +} + +type descriptorChangesSliceIterator struct { + backingSlice 
[]backuppb.BackupManifest_DescriptorRevision idx int } -var _ bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] = &sliceIterator[backuppb.BackupManifest_DescriptorRevision]{} +var _ DescriptorRevisionIterator = &descriptorChangesSliceIterator{} -func newSlicePointerIterator[T any](backing []T) *sliceIterator[T] { - return &sliceIterator[T]{ +func newDescriptorChangesSliceIterator( + backing []backuppb.BackupManifest_DescriptorRevision, +) *descriptorChangesSliceIterator { + return &descriptorChangesSliceIterator{ backingSlice: backing, } } -func (s *sliceIterator[T]) Valid() (bool, error) { +func (s *descriptorChangesSliceIterator) Valid() (bool, error) { return s.idx < len(s.backingSlice), nil } -func (s *sliceIterator[T]) Value() *T { +func (s *descriptorChangesSliceIterator) Value() *backuppb.BackupManifest_DescriptorRevision { if s.idx < len(s.backingSlice) { return &s.backingSlice[s.idx] } @@ -1554,9 +1726,9 @@ func (s *sliceIterator[T]) Value() *T { return nil } -func (s *sliceIterator[T]) Next() { +func (s *descriptorChangesSliceIterator) Next() { s.idx++ } -func (s *sliceIterator[T]) Close() { +func (s *descriptorChangesSliceIterator) Close() { } diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index ca5e30497b3a..b19ea75665d1 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -45,7 +45,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -1581,9 +1580,7 @@ type LayerToBackupManifestFileIterFactory map[int]*IterFactory // NewFileIter creates a new Iterator over BackupManifest_Files. 
It is assumed // that the BackupManifest_File are sorted by FileCmp. -func (f *IterFactory) NewFileIter( - ctx context.Context, -) (bulk.Iterator[*backuppb.BackupManifest_File], error) { +func (f *IterFactory) NewFileIter(ctx context.Context) (FileIterator, error) { if f.m.HasExternalManifestSSTs { storeFile := storageccl.StoreFile{ Store: f.store, @@ -1600,40 +1597,38 @@ func (f *IterFactory) NewFileIter( return NewFileSSTIter(ctx, storeFile, encOpts) } - return newSlicePointerIterator(f.m.Files), nil + return newFileSliceIterator(f.m.Files), nil } // NewDescIter creates a new Iterator over Descriptors. -func (f *IterFactory) NewDescIter(ctx context.Context) bulk.Iterator[*descpb.Descriptor] { +func (f *IterFactory) NewDescIter(ctx context.Context) DescIterator { if f.m.HasExternalManifestSSTs { backing := makeBytesIter(ctx, f.store, f.descriptorSSTPath, []byte(sstDescsPrefix), f.encryption, true, f.kmsEnv) - it := DescIterator{ + it := descIterator{ backing: backing, } it.Next() return &it } - return newSlicePointerIterator(f.m.Descriptors) + return newDescSliceIterator(f.m.Descriptors) } // NewDescriptorChangesIter creates a new Iterator over // BackupManifest_DescriptorRevisions. It is assumed that descriptor changes are // sorted by DescChangesLess. -func (f *IterFactory) NewDescriptorChangesIter( - ctx context.Context, -) bulk.Iterator[*backuppb.BackupManifest_DescriptorRevision] { +func (f *IterFactory) NewDescriptorChangesIter(ctx context.Context) DescriptorRevisionIterator { if f.m.HasExternalManifestSSTs { if f.m.MVCCFilter == backuppb.MVCCFilter_Latest { // If the manifest is backuppb.MVCCFilter_Latest, then return an empty // iterator for descriptor changes. 
var backing []backuppb.BackupManifest_DescriptorRevision - return newSlicePointerIterator(backing) + return newDescriptorChangesSliceIterator(backing) } backing := makeBytesIter(ctx, f.store, f.descriptorSSTPath, []byte(sstDescsPrefix), f.encryption, false, f.kmsEnv) - dri := DescriptorRevisionIterator{ + dri := descriptorRevisionIterator{ backing: backing, } @@ -1641,7 +1636,7 @@ func (f *IterFactory) NewDescriptorChangesIter( return &dri } - return newSlicePointerIterator(f.m.DescriptorChanges) + return newDescriptorChangesSliceIterator(f.m.DescriptorChanges) } // GetBackupManifestIterFactories constructs a mapping from the idx of the diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go b/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go index b3246203ec5f..1f2e3dfaebc4 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling_test.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/require" @@ -101,13 +100,13 @@ func TestManifestHandlingIteratorOperations(t *testing.T) { }) t.Run("files", func(t *testing.T) { - checkIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory), sortedFiles, fileLess) + checkFileIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory), sortedFiles, fileLess) }) t.Run("descriptors", func(t *testing.T) { - checkIteratorOperations(t, iterFactory.NewDescIter, sortedDescs, descLess) + checkDescIteratorOperations(t, iterFactory.NewDescIter, sortedDescs, descLess) }) t.Run("descriptor-changes", func(t *testing.T) { - checkIteratorOperations(t, iterFactory.NewDescriptorChangesIter, sortedDescRevs, descRevsLess) + 
checkDescriptorChangesIteratorOperations(t, iterFactory.NewDescriptorChangesIter, sortedDescRevs, descRevsLess) }) } @@ -138,13 +137,13 @@ func TestManifestHandlingEmptyIterators(t *testing.T) { iterFactory := backupinfo.NewIterFactory(&m, store, nil, nil) t.Run("files", func(t *testing.T) { - checkEmptyIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory)) + checkEmptyFileIteratorOperations(t, mustCreateFileIterFactory(t, iterFactory)) }) t.Run("descriptors", func(t *testing.T) { - checkEmptyIteratorOperations(t, iterFactory.NewDescIter) + checkEmptyDescIteratorOperations(t, iterFactory.NewDescIter) }) t.Run("descriptor-changes", func(t *testing.T) { - checkEmptyIteratorOperations(t, iterFactory.NewDescriptorChangesIter) + checkEmptyDescriptorChangesIteratorOperations(t, iterFactory.NewDescriptorChangesIter) }) } @@ -198,18 +197,18 @@ func makeMockManifest( return m } -func checkIteratorOperations[T any]( +func checkFileIteratorOperations( t *testing.T, - mkIter func(context.Context) bulk.Iterator[*T], - expected []T, - less func(left T, right T) bool, + mkIter func(context.Context) backupinfo.FileIterator, + expected []backuppb.BackupManifest_File, + less func(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) bool, ) { ctx := context.Background() // 1. Check if the iterator returns the expected contents, regardless of how // many times value is called between calls to Next(). 
for numValueCalls := 1; numValueCalls <= 5; numValueCalls++ { - var actual []T + var actual []backuppb.BackupManifest_File it := mkIter(ctx) defer it.Close() for ; ; it.Next() { @@ -219,7 +218,7 @@ func checkIteratorOperations[T any]( break } - var value T + var value backuppb.BackupManifest_File for i := 0; i < numValueCalls; i++ { value = *it.Value() } @@ -267,8 +266,190 @@ func checkIteratorOperations[T any]( require.NoError(t, err) } -func checkEmptyIteratorOperations[T any]( - t *testing.T, mkIter func(context.Context) bulk.Iterator[*T], +func checkDescIteratorOperations( + t *testing.T, + mkIter func(context.Context) backupinfo.DescIterator, + expected []descpb.Descriptor, + less func(left descpb.Descriptor, right descpb.Descriptor) bool, +) { + ctx := context.Background() + + // 1. Check if the iterator returns the expected contents, regardless of how + // many times value is called between calls to Next(). + for numValueCalls := 1; numValueCalls <= 5; numValueCalls++ { + var actual []descpb.Descriptor + it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + var value descpb.Descriptor + for i := 0; i < numValueCalls; i++ { + value = *it.Value() + } + + actual = append(actual, value) + } + + sort.Slice(actual, func(i, j int) bool { + return less(actual[i], actual[j]) + }) + + require.Equal(t, expected, actual, fmt.Sprintf("contents not equal if there are %d calls to Value()", numValueCalls)) + } + + // 2. Check that we can repeatedly call Next() and Value() after the iterator + // is done. + it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + } + + for i := 0; i < 10; i++ { + it.Next() + ok, err := it.Valid() + require.False(t, ok) + require.NoError(t, err) + + it.Value() // Should not error or panic. + } + + // 3. Check that we can get the value without calling Valid(). 
+ itNoCheck := mkIter(ctx) + defer itNoCheck.Close() + require.Greater(t, len(expected), 0) + value := itNoCheck.Value() + require.Contains(t, expected, *value) + + ok, err := itNoCheck.Valid() + require.True(t, ok) + require.NoError(t, err) +} + +func checkDescriptorChangesIteratorOperations( + t *testing.T, + mkIter func(context.Context) backupinfo.DescriptorRevisionIterator, + expected []backuppb.BackupManifest_DescriptorRevision, + less func(left backuppb.BackupManifest_DescriptorRevision, right backuppb.BackupManifest_DescriptorRevision) bool, +) { + ctx := context.Background() + + // 1. Check if the iterator returns the expected contents, regardless of how + // many times value is called between calls to Next(). + for numValueCalls := 1; numValueCalls <= 5; numValueCalls++ { + var actual []backuppb.BackupManifest_DescriptorRevision + it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + var value backuppb.BackupManifest_DescriptorRevision + for i := 0; i < numValueCalls; i++ { + value = *it.Value() + } + + actual = append(actual, value) + } + + sort.Slice(actual, func(i, j int) bool { + return less(actual[i], actual[j]) + }) + + require.Equal(t, expected, actual, fmt.Sprintf("contents not equal if there are %d calls to Value()", numValueCalls)) + } + + // 2. Check that we can repeatedly call Next() and Value() after the iterator + // is done. + it := mkIter(ctx) + defer it.Close() + for ; ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + } + + for i := 0; i < 10; i++ { + it.Next() + ok, err := it.Valid() + require.False(t, ok) + require.NoError(t, err) + + it.Value() // Should not error or panic. + } + + // 3. Check that we can get the value without calling Valid(). 
+ itNoCheck := mkIter(ctx) + defer itNoCheck.Close() + require.Greater(t, len(expected), 0) + value := itNoCheck.Value() + require.Contains(t, expected, *value) + + ok, err := itNoCheck.Valid() + require.True(t, ok) + require.NoError(t, err) +} + +func checkEmptyFileIteratorOperations( + t *testing.T, mkIter func(context.Context) backupinfo.FileIterator, +) { + ctx := context.Background() + + // Check that regardless of how many calls to Next() the iterator will not be + // valid. + for numNextCalls := 0; numNextCalls < 5; numNextCalls++ { + it := mkIter(ctx) + defer it.Close() + for i := 0; i < numNextCalls; i++ { + it.Next() + } + + ok, err := it.Valid() + require.NoError(t, err) + require.False(t, ok) + + it.Value() // Should not error or panic. + } +} + +func checkEmptyDescIteratorOperations( + t *testing.T, mkIter func(context.Context) backupinfo.DescIterator, +) { + ctx := context.Background() + + // Check that regardless of how many calls to Next() the iterator will not be + // valid. + for numNextCalls := 0; numNextCalls < 5; numNextCalls++ { + it := mkIter(ctx) + defer it.Close() + for i := 0; i < numNextCalls; i++ { + it.Next() + } + + ok, err := it.Valid() + require.NoError(t, err) + require.False(t, ok) + + it.Value() // Should not error or panic. 
+ } +} + +func checkEmptyDescriptorChangesIteratorOperations( + t *testing.T, mkIter func(context.Context) backupinfo.DescriptorRevisionIterator, ) { ctx := context.Background() @@ -291,8 +472,8 @@ func checkEmptyIteratorOperations[T any]( func mustCreateFileIterFactory( t *testing.T, iterFactory *backupinfo.IterFactory, -) func(ctx context.Context) bulk.Iterator[*backuppb.BackupManifest_File] { - return func(ctx context.Context) bulk.Iterator[*backuppb.BackupManifest_File] { +) func(ctx context.Context) backupinfo.FileIterator { + return func(ctx context.Context) backupinfo.FileIterator { it, err := iterFactory.NewFileIter(ctx) require.NoError(t, err) return it diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 181b474cf160..6b742951accb 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" @@ -56,48 +55,6 @@ var targetRestoreSpanSize = settings.RegisterByteSizeSetting( 384<<20, ) -// backupManifestFileIterator exposes methods that can be used to iterate over -// the `BackupManifest_Files` field of a manifest. -type backupManifestFileIterator interface { - next() (backuppb.BackupManifest_File, bool) - peek() (backuppb.BackupManifest_File, bool) - err() error - close() -} - -// sstFileIterator uses an underlying `backupinfo.FileIterator` to read the -// `BackupManifest_Files` from the SST file. 
-type sstFileIterator struct { - fi *backupinfo.FileIterator -} - -func (s *sstFileIterator) next() (backuppb.BackupManifest_File, bool) { - f, ok := s.peek() - if ok { - s.fi.Next() - } - - return f, ok -} - -func (s *sstFileIterator) peek() (backuppb.BackupManifest_File, bool) { - if ok, _ := s.fi.Valid(); !ok { - return backuppb.BackupManifest_File{}, false - } - return *s.fi.Value(), true -} - -func (s *sstFileIterator) err() error { - _, err := s.fi.Valid() - return err -} - -func (s *sstFileIterator) close() { - s.fi.Close() -} - -var _ backupManifestFileIterator = &sstFileIterator{} - // makeSimpleImportSpans partitions the spans of requiredSpans into a covering // of RestoreSpanEntry's which each have all overlapping files from the passed // backups assigned to them. The spans of requiredSpans are trimmed/removed @@ -376,7 +333,7 @@ func generateAndSendImportSpans( return err } - fileIterByLayer := make([]bulk.Iterator[*backuppb.BackupManifest_File], 0, len(backups)) + fileIterByLayer := make([]backupinfo.FileIterator, 0, len(backups)) for layer := range backups { iter, err := layerToBackupManifestFileIterFactory[layer].NewFileIter(ctx) if err != nil { @@ -591,7 +548,7 @@ func generateAndSendImportSpans( // [a, b, c, e, f, g] type fileSpanStartAndEndKeyIterator struct { heap *fileHeap - allIters []bulk.Iterator[*backuppb.BackupManifest_File] + allIters []backupinfo.FileIterator err error } @@ -681,7 +638,7 @@ func (i *fileSpanStartAndEndKeyIterator) reset() { } type fileHeapItem struct { - fileIter bulk.Iterator[*backuppb.BackupManifest_File] + fileIter backupinfo.FileIterator file *backuppb.BackupManifest_File cmpEndKey bool } @@ -727,9 +684,7 @@ func (f *fileHeap) Pop() any { } func getNewIntersectingFilesByLayer( - span roachpb.Span, - layersCoveredLater map[int]bool, - fileIters []bulk.Iterator[*backuppb.BackupManifest_File], + span roachpb.Span, layersCoveredLater map[int]bool, fileIters []backupinfo.FileIterator, ) ([][]*backuppb.BackupManifest_File, 
error) { var files [][]*backuppb.BackupManifest_File diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 5a3a3fb6992d..d77f07710b2d 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -395,7 +395,7 @@ message GenerativeSplitAndScatterSpec { repeated roachpb.Span spans = 10 [(gogoproto.nullable) = false]; repeated jobs.jobspb.RestoreDetails.BackupLocalityInfo backup_locality_info = 11 [(gogoproto.nullable) = false]; // HighWater is the high watermark of the previous run of restore. - optional bytes high_water = 12 [(gogoproto.nullable) = false]; + optional bytes high_water = 12; // User who initiated the restore. optional string user_proto = 13 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; // ChunkSize is the number of import spans per chunk. diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index ba065f15f790..a8ccf07aa3af 100644 --- a/pkg/util/bulk/BUILD.bazel +++ b/pkg/util/bulk/BUILD.bazel @@ -3,10 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "bulk", - srcs = [ - "iterator.go", - "tracing_aggregator.go", - ], + srcs = ["tracing_aggregator.go"], importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", visibility = ["//visibility:public"], deps = [ diff --git a/pkg/util/bulk/iterator.go b/pkg/util/bulk/iterator.go deleted file mode 100644 index 57f7b09dced9..000000000000 --- a/pkg/util/bulk/iterator.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2023 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package bulk - -// Iterator is an interface to iterate a collection of objects of type T. -type Iterator[T any] interface { - // Valid must be called after any call to Next(). It returns (true, nil) if - // the iterator points to a valid value, and (false, nil) if the iterator has - // moved past the last value. It returns (false, err) if there is an error in - // the iterator. - Valid() (bool, error) - - // Value returns the current value. The returned value is only valid until the - // next call to Next(). is only valid until the - Value() T - - // Next advances the iterator to the next value. - Next() - - // Close closes the iterator. - Close() -} - -// CollectToSlice iterates over all objects in iterator and collects them into a -// slice. -func CollectToSlice[T any](iterator Iterator[T]) ([]T, error) { - var values []T - for ; ; iterator.Next() { - if ok, err := iterator.Valid(); err != nil { - return nil, err - } else if !ok { - break - } - - values = append(values, iterator.Value()) - } - return values, nil -}