From 9b2bb35b15500f4aa87dc6101ed39a66f195888b Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Thu, 7 Jul 2022 06:34:10 -0400
Subject: [PATCH] storageccl: use the new PebbleIterator in ExternalSSTReader

This PR refactors all call sites of ExternalSSTReader() to use the new
PebbleIterator, which has built-in range key support. Most notably, it
replaces the multiIterator used in the restore data processor with the
PebbleSSTIterator. This patch is part of a larger effort to teach backup
and restore about MVCC bulk operations. Next, the readAsOfIterator will
need to learn how to deal with range keys.

Informs #71155

This PR also addresses a bug introduced in #83984: loop variables in
ExternalSSTReader were captured by reference, leading to roachtest
failures (#84240, #84162).

Fixes: #84240, #84162, #84181

Release note: none
---
 .../backupccl/backupinfo/backup_metadata.go   |  21 +--
 pkg/ccl/backupccl/restore_data_processor.go   |  42 +++---
 pkg/ccl/cliccl/debug_backup.go                |   9 +-
 pkg/ccl/storageccl/BUILD.bazel                |  11 ++
 pkg/ccl/storageccl/encryption.go              |   4 +-
 pkg/ccl/storageccl/external_sst_reader.go     | 132 ++++++++++++------
 .../storageccl/external_sst_reader_test.go    |  85 +++++++++++
 pkg/storage/multi_iterator.go                 |   2 +
 8 files changed, 229 insertions(+), 77 deletions(-)

diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go
index ac074f8e023c..3ae75d27d93e 100644
--- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go
+++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go
@@ -49,6 +49,12 @@ const (
 	sstTenantsPrefix = "tenant/"
 )
 
+var iterOpts = storage.IterOptions{
+	KeyTypes:   storage.IterKeyTypePointsAndRanges,
+	LowerBound: keys.LocalMax,
+	UpperBound: keys.MaxKey,
+}
+
 // WriteBackupMetadataSST is responsible for constructing and writing the
 // `metadata.sst` to dest. This file contains the metadata corresponding to this
 // backup.
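For reference, a minimal sketch of the call pattern this refactor introduces, using only signatures visible in this diff; the helper openBackupSSTs and its parameters are illustrative, not part of the patch:

    package example // illustrative sketch, not part of the patch

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
    	"github.com/cockroachdb/cockroach/pkg/cloud"
    	"github.com/cockroachdb/cockroach/pkg/keys"
    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/storage"
    )

    // openBackupSSTs opens a single iterator over several backup SSTs that live
    // in the same external storage, which is the shape each call site below is
    // migrated to.
    func openBackupSSTs(
    	ctx context.Context,
    	store cloud.ExternalStorage,
    	paths []string,
    	enc *roachpb.FileEncryptionOptions,
    ) (storage.SimpleMVCCIterator, error) {
    	storeFiles := make([]storageccl.StoreFile, 0, len(paths))
    	for _, p := range paths {
    		storeFiles = append(storeFiles, storageccl.StoreFile{Store: store, FilePath: p})
    	}
    	// Callers now pick the iterator options up front; point keys and range
    	// keys are surfaced by the same iterator.
    	iterOpts := storage.IterOptions{
    		KeyTypes:   storage.IterKeyTypePointsAndRanges,
    		LowerBound: keys.LocalMax,
    		UpperBound: keys.MaxKey,
    	}
    	return storageccl.ExternalSSTReader(ctx, storeFiles, enc, iterOpts)
    }

Each hunk below is a mechanical migration to this shape: the single (store, path) pair becomes a slice of StoreFile, and the iterator options move to the caller.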
@@ -618,7 +624,7 @@ func debugDumpFileSST( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: fileInfoPath}}, encOpts, iterOpts) if err != nil { return err } @@ -664,8 +670,7 @@ func DebugDumpMetadataSST( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: path}}, encOpts, iterOpts) if err != nil { return err } @@ -804,8 +809,7 @@ func NewBackupMetadata( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - - iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: exportStore, FilePath: sstFileName}}, encOpts, iterOpts) if err != nil { return nil, err } @@ -921,8 +925,8 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator { if err != nil { break } - - iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: b.store, + FilePath: path}}, encOpts, iterOpts) if err != nil { return FileIterator{err: err} } @@ -1232,7 +1236,8 @@ func makeBytesIter( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, + FilePath: path}}, encOpts, iterOpts) if err != nil { return bytesIter{iterError: err} } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 50d69fca0ac2..1fd761106abb 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -277,18 +277,13 @@ func (rd *restoreDataProcessor) openSSTs( ) error { ctxDone := ctx.Done() - // The sstables only contain MVCC data and no intents, so using an MVCC - // iterator is sufficient. - var iters []storage.SimpleMVCCIterator + // TODO(msbutler): use a a map of external storage factories to avoid reopening the same dir + // in a given restore span entry var dirs []cloud.ExternalStorage // If we bail early and haven't handed off responsibility of the dirs/iters to // the channel, close anything that we had open. defer func() { - for _, iter := range iters { - iter.Close() - } - for _, dir := range dirs { if err := dir.Close(); err != nil { log.Warningf(ctx, "close export storage failed %v", err) @@ -296,18 +291,13 @@ func (rd *restoreDataProcessor) openSSTs( } }() - // sendIters sends all of the currently accumulated iterators over the + // sendIter sends a multiplexed iterator covering the currently accumulated files over the // channel. 
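The replacement closure follows. As a standalone sketch that mirrors the cleanup in this hunk (the function name and parameters are illustrative, not part of the patch):

    package example // illustrative sketch

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/cloud"
    	"github.com/cockroachdb/cockroach/pkg/storage"
    	"github.com/cockroachdb/cockroach/pkg/util/hlc"
    	"github.com/cockroachdb/cockroach/pkg/util/log"
    )

    // wrapForRestore pins the multiplexed iterator at the restore timestamp and
    // folds the iterator and the external storage handles into a single cleanup
    // closure, roughly what the new sendIter packages up before sending on the
    // channel.
    func wrapForRestore(
    	ctx context.Context,
    	iter storage.SimpleMVCCIterator,
    	dirs []cloud.ExternalStorage,
    	restoreTime hlc.Timestamp,
    ) (storage.SimpleMVCCIterator, func()) {
    	readAsOfIter := storage.NewReadAsOfIterator(iter, restoreTime)
    	cleanup := func() {
    		// Mirror the cleanup in the hunk above: close the wrapping iterator,
    		// then the storage handles.
    		readAsOfIter.Close()
    		for _, dir := range dirs {
    			if err := dir.Close(); err != nil {
    				log.Warningf(ctx, "close export storage failed %v", err)
    			}
    		}
    	}
    	return readAsOfIter, cleanup
    }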
- sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { - multiIter := storage.MakeMultiIterator(itersToSend) - readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime) + sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { + readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { readAsOfIter.Close() - multiIter.Close() - for _, iter := range itersToSend { - iter.Close() - } for _, dir := range dirsToSend { if err := dir.Close(); err != nil { @@ -328,13 +318,13 @@ func (rd *restoreDataProcessor) openSSTs( return ctx.Err() } - iters = make([]storage.SimpleMVCCIterator, 0) dirs = make([]cloud.ExternalStorage, 0) return nil } log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + storeFiles := make([]storageccl.StoreFile, 0, len(EntryFiles{})) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) @@ -343,17 +333,21 @@ func (rd *restoreDataProcessor) openSSTs( return err } dirs = append(dirs, dir) - + storeFiles = append(storeFiles, storageccl.StoreFile{Store:dir, FilePath:file.Path}) // TODO(pbardea): When memory monitoring is added, send the currently // accumulated iterators on the channel if we run into memory pressure. - iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption) - if err != nil { - return err - } - iters = append(iters, iter) } - - return sendIters(iters, dirs) + iterOpts := storage.IterOptions{ + RangeKeyMaskingBelow: rd.spec.RestoreTime, + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts) + if err != nil { + return err + } + return sendIter(iter, dirs) } func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 42459e3a5c32..b73113ab7d81 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -591,7 +591,14 @@ func makeIters( return nil, nil, errors.Wrapf(err, "making external storage") } - iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil) + var iterOpts = storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + iters[i], err = storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store:dirStorage[i], FilePath:file.Path}}, nil,iterOpts) + if err != nil { return nil, nil, errors.Wrapf(err, "fetching sst reader") } diff --git a/pkg/ccl/storageccl/BUILD.bazel b/pkg/ccl/storageccl/BUILD.bazel index 729bebee11d7..a0f48d58ea1d 100644 --- a/pkg/ccl/storageccl/BUILD.bazel +++ b/pkg/ccl/storageccl/BUILD.bazel @@ -35,14 +35,25 @@ go_test( ], embed = [":storageccl"], deps = [ + "//pkg/base", + "//pkg/blobs", + "//pkg/cloud", + "//pkg/cloud/nodelocal", + "//pkg/keys", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", + "//pkg/sql", + "//pkg/storage", + "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util/humanizeutil", "//pkg/util/ioctx", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/randutil", "@com_github_stretchr_testify//require", ], diff --git 
a/pkg/ccl/storageccl/encryption.go b/pkg/ccl/storageccl/encryption.go index bb9c81355894..963abb72aaf6 100644 --- a/pkg/ccl/storageccl/encryption.go +++ b/pkg/ccl/storageccl/encryption.go @@ -37,7 +37,7 @@ import ( // encryptionPreamble is a constant string prepended in cleartext to ciphertexts // allowing them to be easily recognized by sight and allowing some basic sanity -// checks when trying to open them (e.g. error if incorrectly using encryption +// checks when trying to open them (E.g. error if incorrectly using encryption // on an unencrypted file of vice-versa). var encryptionPreamble = []byte("encrypt") @@ -52,7 +52,7 @@ const encryptionVersionIVPrefix = 1 // authticate against truncation at a chunk boundary. const encryptionVersionChunk = 2 -// encryptionChunkSizeV2 is the chunk-size used by v2, i.e. 64kb, which should +// encryptionChunkSizeV2 is the chunk-size used by v2, i.E. 64kb, which should // minimize overhead while still while still limiting the size of buffers and // allowing seeks to mid-file. var encryptionChunkSizeV2 = 64 << 10 // 64kb diff --git a/pkg/ccl/storageccl/external_sst_reader.go b/pkg/ccl/storageccl/external_sst_reader.go index 5d8e4f7dcc56..00366e5900da 100644 --- a/pkg/ccl/storageccl/external_sst_reader.go +++ b/pkg/ccl/storageccl/external_sst_reader.go @@ -25,6 +25,8 @@ import ( "github.com/cockroachdb/pebble/sstable" ) +// RemoteSSTs lets external SSTables get iterated directly in some cases, +// rather than being downloaded entirely first. var remoteSSTs = settings.RegisterBoolSetting( settings.TenantWritable, "kv.bulk_ingest.stream_external_ssts.enabled", @@ -39,32 +41,48 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting( 64<<10, ) -// ExternalSSTReader returns opens an SST in external storage, optionally -// decrypting with the supplied parameters, and returns iterator over it. -// -// ctx is captured and used throughout the life of the returned iterator, until -// the iterator's Close() method is called. -func ExternalSSTReader( - ctx context.Context, - e cloud.ExternalStorage, - basename string, - encryption *roachpb.FileEncryptionOptions, -) (storage.SimpleMVCCIterator, error) { +func getFileWithRetry( + ctx context.Context, basename string, e cloud.ExternalStorage, +) (ioctx.ReadCloserCtx, int64, error) { // Do an initial read of the file, from the beginning, to get the file size as - // this is used e.g. to read the trailer. + // this is used E.g. to read the trailer. var f ioctx.ReadCloserCtx var sz int64 - const maxAttempts = 3 if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { var err error f, sz, err = e.ReadFileAt(ctx, basename, 0) return err }); err != nil { - return nil, err + return nil, 0, err } + return f, sz, nil +} - if !remoteSSTs.Get(&e.Settings().SV) { +type StoreFile struct { + Store cloud.ExternalStorage + FilePath string +} + +// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from +// external storage, optionally decrypting with the supplied parameters. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. 
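newMemPebbleSSTReader itself follows. Since this in-memory path is only taken when the kv.bulk_ingest.stream_external_ssts.enabled setting registered above is disabled, a test can force it with a helper along these lines (a sketch, assumed to live in package storageccl because remoteSSTs is unexported; not part of the patch):

    package storageccl

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // forceInMemoryPath disables streaming of remote SSTs so that
    // ExternalSSTReader falls back to newMemPebbleSSTReader.
    func forceInMemoryPath(ctx context.Context, st *cluster.Settings) {
    	remoteSSTs.Override(ctx, &st.SV, false)
    }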
+func newMemPebbleSSTReader( + ctx context.Context, + storeFiles []StoreFile, + encryption *roachpb.FileEncryptionOptions, + iterOps storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + + inMemorySSTs := make([][]byte, 0, len(storeFiles)) + + for _, sf := range storeFiles { + f, _, err := getFileWithRetry(ctx, sf.FilePath, sf.Store) + if err != nil { + return nil, err + } content, err := ioctx.ReadAll(ctx, f) f.Close(ctx) if err != nil { @@ -76,44 +94,74 @@ func ExternalSSTReader( return nil, err } } - return storage.NewMemSSTIterator(content, false) + inMemorySSTs = append(inMemorySSTs, content) } + return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps) +} - raw := &sstReader{ - ctx: ctx, - sz: sizeStat(sz), - body: f, - openAt: func(offset int64) (ioctx.ReadCloserCtx, error) { - reader, _, err := e.ReadFileAt(ctx, basename, offset) - return reader, err - }, +// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage, +// optionally decrypting with the supplied parameters. +// +// Note: the order of SSTs matters if multiple SSTs contain the exact same +// Pebble key (that is, the same key/timestamp combination). In this case, the +// PebbleIterator will only surface the key in the first SST that contains it. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. +func ExternalSSTReader( + ctx context.Context, + storeFiles []StoreFile, + encryption *roachpb.FileEncryptionOptions, + iterOpts storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + + if !remoteSSTs.Get(&storeFiles[0].Store.Settings().SV) { + return newMemPebbleSSTReader(ctx, storeFiles, encryption, iterOpts) } + remoteCacheSize := remoteSSTSuffixCacheSize.Get(&storeFiles[0].Store.Settings().SV) + readers := make([]sstable.ReadableFile, 0, len(storeFiles)) - var reader sstable.ReadableFile = raw + for _, sf := range storeFiles { + // prevent capturing the loop variables by reference when defining openAt below. + basename := sf.FilePath + e := sf.Store - if encryption != nil { - r, err := decryptingReader(raw, encryption.Key) + f, sz, err := getFileWithRetry(ctx, basename, e) if err != nil { - f.Close(ctx) return nil, err } - reader = r - } else { - // We only explicitly buffer the suffix of the file when not decrypting as - // the decrypting reader has its own internal block buffer. - if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil { - f.Close(ctx) - return nil, err + + raw := &sstReader{ + ctx: ctx, + sz: sizeStat(sz), + body: f, + openAt: func(offset int64) (ioctx.ReadCloserCtx, error) { + reader, _, err := e.ReadFileAt(ctx, basename, offset) + return reader, err + }, } - } - iter, err := storage.NewSSTIterator(reader) - if err != nil { - reader.Close() - return nil, err - } + var reader sstable.ReadableFile - return iter, nil + if encryption != nil { + r, err := decryptingReader(raw, encryption.Key) + if err != nil { + f.Close(ctx) + return nil, err + } + reader = r + } else { + // We only explicitly buffer the suffix of the file when not decrypting as + // the decrypting reader has its own internal block buffer. 
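(The suffix-caching call continues below.) Note that basename and e at the top of this loop copy sf.FilePath and sf.Store before openAt is defined; openAt is a closure that outlives the loop iteration, so capturing the range variable directly is the bug described in the commit message. A standalone sketch of the pitfall, with toy names:

    // capturedPaths shows the pattern: before Go 1.22 the range variable is a
    // single variable reused on every iteration, so closures defined in the loop
    // all observe its final value unless it is copied first.
    func capturedPaths(paths []string) []func() string {
    	fns := make([]func() string, 0, len(paths))
    	for _, p := range paths {
    		p := p // the fix: shadow the range variable with a per-iteration copy
    		fns = append(fns, func() string { return p })
    	}
    	return fns
    }

Dropping the p := p line makes every returned function yield the last element of paths, which mirrors the capture-by-reference failure this patch fixes.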
+ if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil { + f.Close(ctx) + return nil, err + } + reader = raw + } + readers = append(readers, reader) + } + return storage.NewPebbleSSTIterator(readers, iterOpts) } type sstReader struct { diff --git a/pkg/ccl/storageccl/external_sst_reader_test.go b/pkg/ccl/storageccl/external_sst_reader_test.go index 16956d818787..df0fd2ac2aab 100644 --- a/pkg/ccl/storageccl/external_sst_reader_test.go +++ b/pkg/ccl/storageccl/external_sst_reader_test.go @@ -10,10 +10,24 @@ package storageccl import ( "bytes" + "context" + "strconv" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/cloud" + _ "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" ) @@ -72,3 +86,74 @@ func TestSSTReaderCache(t *testing.T) { _, _ = raw.ReadAt(discard, 10) require.Equal(t, expectedOpenCalls, openCalls) } + +// TestNewExternalSSTReader ensures that ExternalSSTReader properly reads and +// iterates through SSTs stored in different external storage base directories. +func TestNewExternalSSTReader(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + dir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + args := base.TestServerArgs{ExternalIODir: dir} + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args}) + defer tc.Stopper().Stop(ctx) + clusterSettings := tc.Server(0).ClusterSettings() + + const localFoo = "nodelocal://1/foo" + + subdirs := []string{"a", "b", "c"} + fileStores := make([]StoreFile, len(subdirs)) + + for i, subdir := range subdirs { + + // Create a store rooted in the file's subdir + store, err := cloud.ExternalStorageFromURI( + ctx, + localFoo+subdir+"/", + base.ExternalIODirConfig{}, + clusterSettings, + blobs.TestBlobServiceClient(dir), + username.RootUserName(), + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + require.NoError(t, err) + + fileStores[i].Store = store + + // Create an SST and write it to external storage. Ensure that the SSTs do + // not span the same keyspace. 
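The rest of the test (below) writes one SST per subdirectory and then drains the combined iterator with the usual SeekGE / Valid / Next loop. Factored into a reference helper, that loop looks like the following (a sketch, not part of the patch):

    package storageccl

    import (
    	"testing"

    	"github.com/cockroachdb/cockroach/pkg/keys"
    	"github.com/cockroachdb/cockroach/pkg/storage"
    	"github.com/stretchr/testify/require"
    )

    // drainIter exhausts an iterator returned by ExternalSSTReader and returns
    // the number of point keys it surfaced, failing the test on iteration errors.
    func drainIter(t *testing.T, iter storage.SimpleMVCCIterator) int {
    	t.Helper()
    	var count int
    	for iter.SeekGE(storage.MVCCKey{Key: keys.LocalMax}); ; iter.Next() {
    		ok, err := iter.Valid()
    		require.NoError(t, err)
    		if !ok {
    			break
    		}
    		count++
    	}
    	return count
    }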
+ numKeys := 10000 + kvs := make(storageutils.KVs, numKeys) + for j := 0; j < numKeys; j++ { + // Ensure each key is unique + kvs[j] = storageutils.PointKV(subdir+"a"+strconv.Itoa(i), numKeys-j, "1") + } + + fileName := subdir + "DistinctFileName.sst" + fileStores[i].FilePath = fileName + + sst, _, _ := storageutils.MakeSST(t, clusterSettings, kvs) + + w, err := store.Writer(ctx, fileName) + require.NoError(t, err) + _, err = w.Write(sst) + require.NoError(t, err) + w.Close() + } + + var iterOpts = storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + iter, err := ExternalSSTReader(ctx, fileStores, nil, iterOpts) + require.NoError(t, err) + for iter.SeekGE(storage.MVCCKey{Key: keys.LocalMax}); ; iter.Next() { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + } +} diff --git a/pkg/storage/multi_iterator.go b/pkg/storage/multi_iterator.go index 5a4d248f12fa..0f4c2f00561a 100644 --- a/pkg/storage/multi_iterator.go +++ b/pkg/storage/multi_iterator.go @@ -20,6 +20,8 @@ import ( const invalidIdxSentinel = -1 // multiIterator multiplexes iteration over a number of SimpleMVCCIterators. +// +// TODO (msbutler): remove the multiIterator and replace all uses with PebbleSSTIterator type multiIterator struct { iters []SimpleMVCCIterator // The index into `iters` of the iterator currently being pointed at.
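The TODO above captures the follow-up work. For reference, a before/after sketch of the two in-memory combinations, using only constructors that appear in this diff (the function names are illustrative, not part of the patch):

    package example // illustrative sketch

    import "github.com/cockroachdb/cockroach/pkg/storage"

    // combineWithMultiIterator is the pre-patch pattern: one MemSSTIterator per
    // SST, multiplexed by multiIterator. The caller keeps ownership of every
    // per-SST iterator and must close each of them.
    func combineWithMultiIterator(
    	ssts [][]byte,
    ) (storage.SimpleMVCCIterator, []storage.SimpleMVCCIterator, error) {
    	iters := make([]storage.SimpleMVCCIterator, 0, len(ssts))
    	for _, data := range ssts {
    		iter, err := storage.NewMemSSTIterator(data, false /* verify */)
    		if err != nil {
    			for _, it := range iters {
    				it.Close()
    			}
    			return nil, nil, err
    		}
    		iters = append(iters, iter)
    	}
    	return storage.MakeMultiIterator(iters), iters, nil
    }

    // combineWithPebble is the replacement: a single Pebble-backed iterator over
    // all of the SSTs, with range key support configured through IterOptions.
    func combineWithPebble(
    	ssts [][]byte, opts storage.IterOptions,
    ) (storage.SimpleMVCCIterator, error) {
    	return storage.NewPebbleMultiMemSSTIterator(ssts, false /* verify */, opts)
    }

The first pattern leaves the per-SST iterator bookkeeping with the caller, which is exactly what the restore data processor sheds in this patch.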