From 4d1f66e12d174fb3bf59549c4f660931552ffc8d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 7 Jul 2022 06:34:10 -0400 Subject: [PATCH] storageccl: use NewPebbleIterator in restore data processor This PR replaces the multiIterator used in the restore data processor with the PebbleSSTIterator, which has baked in range key support. This patch is apart of a larger effort to teach backup and restore about MVCC bulk operations. Next, the readAsOfIterator will need to learn how to deal with range keys. This PR leaves some clean up work to remove all calls of DeprecatingExternalSSTReader. Informs #71155 Release note: none --- .../backupccl/backupinfo/backup_metadata.go | 10 +- pkg/ccl/backupccl/restore_data_processor.go | 42 +++--- pkg/ccl/cliccl/debug_backup.go | 2 +- pkg/ccl/storageccl/external_sst_reader.go | 120 +++++++++++++++++- pkg/storage/multi_iterator.go | 2 + 5 files changed, 145 insertions(+), 31 deletions(-) diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index ac074f8e023c..61d97da7e10f 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -618,7 +618,7 @@ func debugDumpFileSST( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, fileInfoPath, encOpts) if err != nil { return err } @@ -665,7 +665,7 @@ func DebugDumpMetadataSST( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts) if err != nil { return err } @@ -805,7 +805,7 @@ func NewBackupMetadata( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, exportStore, sstFileName, encOpts) if err != nil { return nil, err } @@ -922,7 +922,7 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator { break } - iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, b.store, path, encOpts) if err != nil { return FileIterator{err: err} } @@ -1232,7 +1232,7 @@ func makeBytesIter( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.DeprecatingExternalSSTReader(ctx, store, path, encOpts) if err != nil { return bytesIter{iterError: err} } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 0c9ca8de4e06..00e62c7c4054 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -278,18 +278,13 @@ func (rd *restoreDataProcessor) openSSTs( ) error { ctxDone := ctx.Done() - // The sstables only contain MVCC data and no intents, so using an MVCC - // iterator is sufficient. - var iters []storage.SimpleMVCCIterator + // TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir + // in a given restore span entry var dirs []cloud.ExternalStorage // If we bail early and haven't handed off responsibility of the dirs/iters to // the channel, close anything that we had open. defer func() { - for _, iter := range iters { - iter.Close() - } - for _, dir := range dirs { if err := dir.Close(); err != nil { log.Warningf(ctx, "close export storage failed %v", err) @@ -297,18 +292,13 @@ func (rd *restoreDataProcessor) openSSTs( } }() - // sendIters sends all of the currently accumulated iterators over the + // sendIter sends a multiplexed iterator covering the currently accumulated files over the // channel. - sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { - multiIter := storage.MakeMultiIterator(itersToSend) - readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime) + sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { + readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { readAsOfIter.Close() - multiIter.Close() - for _, iter := range itersToSend { - iter.Close() - } for _, dir := range dirsToSend { if err := dir.Close(); err != nil { @@ -329,13 +319,13 @@ func (rd *restoreDataProcessor) openSSTs( return ctx.Err() } - iters = make([]storage.SimpleMVCCIterator, 0) dirs = make([]cloud.ExternalStorage, 0) return nil } log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + filePaths := make([]string, 0, len(EntryFiles{})) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) @@ -344,17 +334,23 @@ func (rd *restoreDataProcessor) openSSTs( return err } dirs = append(dirs, dir) + filePaths = append(filePaths, file.Path) // TODO(pbardea): When memory monitoring is added, send the currently // accumulated iterators on the channel if we run into memory pressure. - iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption) - if err != nil { - return err - } - iters = append(iters, iter) } - - return sendIters(iters, dirs) + iterOpts := storage.IterOptions{ + RangeKeyMaskingBelow: rd.spec.RestoreTime, + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storageccl.ExternalSSTReader(ctx, dirs, filePaths, rd.spec.Encryption, + iterOpts) + if err != nil { + return err + } + return sendIter(iter, dirs) } func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 42459e3a5c32..74a9d7d63c9a 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -591,7 +591,7 @@ func makeIters( return nil, nil, errors.Wrapf(err, "making external storage") } - iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil) + iters[i], err = storageccl.DeprecatingExternalSSTReader(ctx, dirStorage[i], file.Path, nil) if err != nil { return nil, nil, errors.Wrapf(err, "fetching sst reader") } diff --git a/pkg/ccl/storageccl/external_sst_reader.go b/pkg/ccl/storageccl/external_sst_reader.go index 5d8e4f7dcc56..c5a8e73f7e1f 100644 --- a/pkg/ccl/storageccl/external_sst_reader.go +++ b/pkg/ccl/storageccl/external_sst_reader.go @@ -25,6 +25,8 @@ import ( "github.com/cockroachdb/pebble/sstable" ) +// RemoteSSTs lets external SSTables get iterated directly in some cases, +// rather than being downloaded entirely first. var remoteSSTs = settings.RegisterBoolSetting( settings.TenantWritable, "kv.bulk_ingest.stream_external_ssts.enabled", @@ -39,12 +41,126 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting( 64<<10, ) -// ExternalSSTReader returns opens an SST in external storage, optionally -// decrypting with the supplied parameters, and returns iterator over it. +func getFileWithRetry( + ctx context.Context, basename string, e cloud.ExternalStorage, +) (ioctx.ReadCloserCtx, int64, error) { + // Do an initial read of the file, from the beginning, to get the file size as + // this is used e.g. to read the trailer. + var f ioctx.ReadCloserCtx + var sz int64 + const maxAttempts = 3 + if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { + var err error + f, sz, err = e.ReadFileAt(ctx, basename, 0) + return err + }); err != nil { + return nil, 0, err + } + return f, sz, nil +} + +// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from +// external storage, optionally decrypting with the supplied parameters. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. +func newMemPebbleSSTReader( + ctx context.Context, + e []cloud.ExternalStorage, + basenames []string, + encryption *roachpb.FileEncryptionOptions, + iterOps storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + + inMemorySSTs := make([][]byte, 0, len(basenames)) + + for i, basename := range basenames { + f, _, err := getFileWithRetry(ctx, basename, e[i]) + if err != nil { + return nil, err + } + content, err := ioctx.ReadAll(ctx, f) + f.Close(ctx) + if err != nil { + return nil, err + } + if encryption != nil { + content, err = DecryptFile(ctx, content, encryption.Key, nil /* mm */) + if err != nil { + return nil, err + } + } + inMemorySSTs = append(inMemorySSTs, content) + } + return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps) +} + +// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage, +// optionally decrypting with the supplied parameters. // // ctx is captured and used throughout the life of the returned iterator, until // the iterator's Close() method is called. func ExternalSSTReader( + ctx context.Context, + e []cloud.ExternalStorage, + basenames []string, + encryption *roachpb.FileEncryptionOptions, + iterOps storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + if !remoteSSTs.Get(&e[0].Settings().SV) { + return newMemPebbleSSTReader(ctx, e, basenames, encryption, iterOps) + } + remoteCacheSize := remoteSSTSuffixCacheSize.Get(&e[0].Settings().SV) + readers := make([]sstable.ReadableFile, 0, len(basenames)) + + for i, basename := range basenames { + f, sz, err := getFileWithRetry(ctx, basename, e[i]) + if err != nil { + return nil, err + } + + raw := &sstReader{ + ctx: ctx, + sz: sizeStat(sz), + body: f, + openAt: func(offset int64) (ioctx.ReadCloserCtx, error) { + reader, _, err := e[i].ReadFileAt(ctx, basename, offset) + return reader, err + }, + } + + var reader sstable.ReadableFile + + if encryption != nil { + r, err := decryptingReader(raw, encryption.Key) + if err != nil { + f.Close(ctx) + return nil, err + } + reader = r + } else { + // We only explicitly buffer the suffix of the file when not decrypting as + // the decrypting reader has its own internal block buffer. + if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil { + f.Close(ctx) + return nil, err + } + reader = raw + } + readers = append(readers, reader) + } + return storage.NewPebbleSSTIterator(readers, iterOps) +} + +// DeprecatingExternalSSTReader returns opens an SST in external storage, optionally +// decrypting with the supplied parameters, and returns iterator over it. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. +// +// TODO (msbutler): replace all current calls with new ExternalSSTReader, +// as it does not handle range keys +func DeprecatingExternalSSTReader( ctx context.Context, e cloud.ExternalStorage, basename string, diff --git a/pkg/storage/multi_iterator.go b/pkg/storage/multi_iterator.go index 5a4d248f12fa..0f4c2f00561a 100644 --- a/pkg/storage/multi_iterator.go +++ b/pkg/storage/multi_iterator.go @@ -20,6 +20,8 @@ import ( const invalidIdxSentinel = -1 // multiIterator multiplexes iteration over a number of SimpleMVCCIterators. +// +// TODO (msbutler): remove the multiIterator and replace all uses with PebbleSSTIterator type multiIterator struct { iters []SimpleMVCCIterator // The index into `iters` of the iterator currently being pointed at.