diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index ac074f8e023c..3ae75d27d93e 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -49,6 +49,12 @@ const ( sstTenantsPrefix = "tenant/" ) +var iterOpts = storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, +} + // WriteBackupMetadataSST is responsible for constructing and writing the // `metadata.sst` to dest. This file contains the metadata corresponding to this // backup. @@ -618,7 +624,7 @@ func debugDumpFileSST( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: fileInfoPath}}, encOpts, iterOpts) if err != nil { return err } @@ -664,8 +670,7 @@ func DebugDumpMetadataSST( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: path}}, encOpts, iterOpts) if err != nil { return err } @@ -804,8 +809,7 @@ func NewBackupMetadata( } encOpts = &roachpb.FileEncryptionOptions{Key: key} } - - iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: exportStore, FilePath: sstFileName}}, encOpts, iterOpts) if err != nil { return nil, err } @@ -921,8 +925,8 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator { if err != nil { break } - - iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: b.store, + FilePath: path}}, encOpts, iterOpts) if err != nil { return 
FileIterator{err: err} } @@ -1232,7 +1236,8 @@ func makeBytesIter( encOpts = &roachpb.FileEncryptionOptions{Key: key} } - iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts) + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, + FilePath: path}}, encOpts, iterOpts) if err != nil { return bytesIter{iterError: err} } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 50d69fca0ac2..c7d902e86b5c 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -277,18 +277,13 @@ func (rd *restoreDataProcessor) openSSTs( ) error { ctxDone := ctx.Done() - // The sstables only contain MVCC data and no intents, so using an MVCC - // iterator is sufficient. - var iters []storage.SimpleMVCCIterator + // TODO(msbutler): use a map of external storage factories to avoid reopening the same dir + // in a given restore span entry. var dirs []cloud.ExternalStorage // If we bail early and haven't handed off responsibility of the dirs/iters to // the channel, close anything that we had open. defer func() { - for _, iter := range iters { - iter.Close() - } - for _, dir := range dirs { if err := dir.Close(); err != nil { log.Warningf(ctx, "close export storage failed %v", err) @@ -296,18 +291,13 @@ func (rd *restoreDataProcessor) openSSTs( } }() - // sendIters sends all of the currently accumulated iterators over the + // sendIter sends a multiplexed iterator covering the currently accumulated files over the // channel. 
- sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { - multiIter := storage.MakeMultiIterator(itersToSend) - readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime) + sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { + readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime) cleanup := func() { readAsOfIter.Close() - multiIter.Close() - for _, iter := range itersToSend { - iter.Close() - } for _, dir := range dirsToSend { if err := dir.Close(); err != nil { @@ -328,13 +318,13 @@ func (rd *restoreDataProcessor) openSSTs( return ctx.Err() } - iters = make([]storage.SimpleMVCCIterator, 0) dirs = make([]cloud.ExternalStorage, 0) return nil } log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + storeFiles := make([]storageccl.StoreFile, 0, len(EntryFiles{})) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) @@ -343,17 +333,21 @@ func (rd *restoreDataProcessor) openSSTs( return err } dirs = append(dirs, dir) - + storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path}) // TODO(pbardea): When memory monitoring is added, send the currently // accumulated iterators on the channel if we run into memory pressure. 
- iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption) - if err != nil { - return err - } - iters = append(iters, iter) } - - return sendIters(iters, dirs) + iterOpts := storage.IterOptions{ + RangeKeyMaskingBelow: rd.spec.RestoreTime, + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts) + if err != nil { + return err + } + return sendIter(iter, dirs) } func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 42459e3a5c32..a325cef37a8c 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -591,7 +591,14 @@ func makeIters( return nil, nil, errors.Wrapf(err, "making external storage") } - iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil) + var iterOpts = storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + iters[i], err = storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: dirStorage[i], FilePath: file.Path}}, nil, iterOpts) + if err != nil { return nil, nil, errors.Wrapf(err, "fetching sst reader") } diff --git a/pkg/ccl/storageccl/BUILD.bazel b/pkg/ccl/storageccl/BUILD.bazel index 729bebee11d7..38623ab96b43 100644 --- a/pkg/ccl/storageccl/BUILD.bazel +++ b/pkg/ccl/storageccl/BUILD.bazel @@ -35,14 +35,26 @@ go_test( ], embed = [":storageccl"], deps = [ + "//pkg/base", + "//pkg/blobs", + "//pkg/cloud", + "//pkg/cloud/nodelocal", + "//pkg/keys", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", + "//pkg/sql", + "//pkg/storage", + "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", +
"//pkg/util/encoding", "//pkg/util/humanizeutil", "//pkg/util/ioctx", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/randutil", "@com_github_stretchr_testify//require", ], diff --git a/pkg/ccl/storageccl/encryption.go b/pkg/ccl/storageccl/encryption.go index bb9c81355894..963abb72aaf6 100644 --- a/pkg/ccl/storageccl/encryption.go +++ b/pkg/ccl/storageccl/encryption.go @@ -37,7 +37,7 @@ import ( // encryptionPreamble is a constant string prepended in cleartext to ciphertexts // allowing them to be easily recognized by sight and allowing some basic sanity -// checks when trying to open them (e.g. error if incorrectly using encryption +// checks when trying to open them (e.g. error if incorrectly using encryption // on an unencrypted file of vice-versa). var encryptionPreamble = []byte("encrypt") @@ -52,7 +52,7 @@ const encryptionVersionIVPrefix = 1 // authticate against truncation at a chunk boundary. const encryptionVersionChunk = 2 -// encryptionChunkSizeV2 is the chunk-size used by v2, i.e. 64kb, which should +// encryptionChunkSizeV2 is the chunk-size used by v2, i.e. 64kb, which should // minimize overhead while still while still limiting the size of buffers and // allowing seeks to mid-file. var encryptionChunkSizeV2 = 64 << 10 // 64kb diff --git a/pkg/ccl/storageccl/external_sst_reader.go b/pkg/ccl/storageccl/external_sst_reader.go index 5d8e4f7dcc56..badc1ebac5f8 100644 --- a/pkg/ccl/storageccl/external_sst_reader.go +++ b/pkg/ccl/storageccl/external_sst_reader.go @@ -25,6 +25,8 @@ import ( "github.com/cockroachdb/pebble/sstable" ) +// remoteSSTs lets external SSTables get iterated directly in some cases, +// rather than being downloaded entirely first. 
var remoteSSTs = settings.RegisterBoolSetting( settings.TenantWritable, "kv.bulk_ingest.stream_external_ssts.enabled", @@ -39,32 +41,49 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting( 64<<10, ) -// ExternalSSTReader returns opens an SST in external storage, optionally -// decrypting with the supplied parameters, and returns iterator over it. -// -// ctx is captured and used throughout the life of the returned iterator, until -// the iterator's Close() method is called. -func ExternalSSTReader( - ctx context.Context, - e cloud.ExternalStorage, - basename string, - encryption *roachpb.FileEncryptionOptions, -) (storage.SimpleMVCCIterator, error) { +func getFileWithRetry( + ctx context.Context, basename string, e cloud.ExternalStorage, +) (ioctx.ReadCloserCtx, int64, error) { // Do an initial read of the file, from the beginning, to get the file size as - // this is used e.g. to read the trailer. + // this is used e.g. to read the trailer. var f ioctx.ReadCloserCtx var sz int64 - const maxAttempts = 3 if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { var err error f, sz, err = e.ReadFileAt(ctx, basename, 0) return err }); err != nil { - return nil, err + return nil, 0, err } + return f, sz, nil +} - if !remoteSSTs.Get(&e.Settings().SV) { +// StoreFile groups a file with its corresponding external storage handler. +type StoreFile struct { + Store cloud.ExternalStorage + FilePath string +} + +// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from +// external storage, optionally decrypting with the supplied parameters. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. 
+func newMemPebbleSSTReader( + ctx context.Context, + storeFiles []StoreFile, + encryption *roachpb.FileEncryptionOptions, + iterOps storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + + inMemorySSTs := make([][]byte, 0, len(storeFiles)) + + for _, sf := range storeFiles { + f, _, err := getFileWithRetry(ctx, sf.FilePath, sf.Store) + if err != nil { + return nil, err + } content, err := ioctx.ReadAll(ctx, f) f.Close(ctx) if err != nil { @@ -76,44 +95,74 @@ func ExternalSSTReader( return nil, err } } - return storage.NewMemSSTIterator(content, false) + inMemorySSTs = append(inMemorySSTs, content) } + return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps) +} - raw := &sstReader{ - ctx: ctx, - sz: sizeStat(sz), - body: f, - openAt: func(offset int64) (ioctx.ReadCloserCtx, error) { - reader, _, err := e.ReadFileAt(ctx, basename, offset) - return reader, err - }, +// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage, +// optionally decrypting with the supplied parameters. +// +// Note: the order of SSTs matters if multiple SSTs contain the exact same +// Pebble key (that is, the same key/timestamp combination). In this case, the +// PebbleIterator will only surface the key in the first SST that contains it. +// +// ctx is captured and used throughout the life of the returned iterator, until +// the iterator's Close() method is called. 
+func ExternalSSTReader( + ctx context.Context, + storeFiles []StoreFile, + encryption *roachpb.FileEncryptionOptions, + iterOpts storage.IterOptions, +) (storage.SimpleMVCCIterator, error) { + + if !remoteSSTs.Get(&storeFiles[0].Store.Settings().SV) { + return newMemPebbleSSTReader(ctx, storeFiles, encryption, iterOpts) } + remoteCacheSize := remoteSSTSuffixCacheSize.Get(&storeFiles[0].Store.Settings().SV) + readers := make([]sstable.ReadableFile, 0, len(storeFiles)) - var reader sstable.ReadableFile = raw + for _, sf := range storeFiles { + // prevent capturing the loop variables by reference when defining openAt below. + filePath := sf.FilePath + store := sf.Store - if encryption != nil { - r, err := decryptingReader(raw, encryption.Key) + f, sz, err := getFileWithRetry(ctx, filePath, store) if err != nil { - f.Close(ctx) return nil, err } - reader = r - } else { - // We only explicitly buffer the suffix of the file when not decrypting as - // the decrypting reader has its own internal block buffer. - if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil { - f.Close(ctx) - return nil, err + + raw := &sstReader{ + ctx: ctx, + sz: sizeStat(sz), + body: f, + openAt: func(offset int64) (ioctx.ReadCloserCtx, error) { + reader, _, err := store.ReadFileAt(ctx, filePath, offset) + return reader, err + }, } - } - iter, err := storage.NewSSTIterator(reader) - if err != nil { - reader.Close() - return nil, err - } + var reader sstable.ReadableFile - return iter, nil + if encryption != nil { + r, err := decryptingReader(raw, encryption.Key) + if err != nil { + f.Close(ctx) + return nil, err + } + reader = r + } else { + // We only explicitly buffer the suffix of the file when not decrypting as + // the decrypting reader has its own internal block buffer. 
+ if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil { + f.Close(ctx) + return nil, err + } + reader = raw + } + readers = append(readers, reader) + } + return storage.NewPebbleSSTIterator(readers, iterOpts) } type sstReader struct { diff --git a/pkg/ccl/storageccl/external_sst_reader_test.go b/pkg/ccl/storageccl/external_sst_reader_test.go index 16956d818787..80210375f1cd 100644 --- a/pkg/ccl/storageccl/external_sst_reader_test.go +++ b/pkg/ccl/storageccl/external_sst_reader_test.go @@ -10,10 +10,24 @@ package storageccl import ( "bytes" + "context" "testing" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/cloud" + _ "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" ) @@ -72,3 +86,85 @@ func TestSSTReaderCache(t *testing.T) { _, _ = raw.ReadAt(discard, 10) require.Equal(t, expectedOpenCalls, openCalls) } + +// TestNewExternalSSTReader ensures that ExternalSSTReader properly reads and +// iterates through semi-overlapping SSTs stored in different external storage +// base directories. 
The SSTs created have the following spans: +// +// t3 a500--------------------a10000 +// +// t2 a50--------------a1000 +// +// t1 a0----a100 +// +func TestNewExternalSSTReader(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tempDir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + args := base.TestServerArgs{ExternalIODir: tempDir} + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args}) + defer tc.Stopper().Stop(ctx) + clusterSettings := tc.Server(0).ClusterSettings() + + const localFoo = "nodelocal://1/foo" + + subdirs := []string{"a", "b", "c"} + fileStores := make([]StoreFile, len(subdirs)) + sstSize := []int{100, 1000, 1000} + for i, subdir := range subdirs { + + // Create a store rooted in the file's subdir + store, err := cloud.ExternalStorageFromURI( + ctx, + localFoo+subdir+"/", + base.ExternalIODirConfig{}, + clusterSettings, + blobs.TestBlobServiceClient(tempDir), + username.RootUserName(), + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + require.NoError(t, err) + fileStores[i].Store = store + + // Create the sst at timestamp i+1, and overlap it with the previous SST + ts := i + 1 + startKey := 0 + if i > 0 { + startKey = sstSize[i-1] / 2 + } + kvs := make(storageutils.KVs, 0, sstSize[i]) + + for j := startKey; j < sstSize[i]; j++ { + suffix := string(encoding.EncodeVarintAscending([]byte{}, int64(j))) + kvs = append(kvs, storageutils.PointKV("a"+suffix, ts, "1")) + } + + fileName := subdir + "DistinctFileName.sst" + fileStores[i].FilePath = fileName + + sst, _, _ := storageutils.MakeSST(t, clusterSettings, kvs) + + w, err := store.Writer(ctx, fileName) + require.NoError(t, err) + _, err = w.Write(sst) + require.NoError(t, err) + w.Close() + } + + var iterOpts = storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + iter, err := 
ExternalSSTReader(ctx, fileStores, nil, iterOpts) + require.NoError(t, err) + for iter.SeekGE(storage.MVCCKey{Key: keys.LocalMax}); ; iter.Next() { + ok, err := iter.Valid() + require.NoError(t, err) + if !ok { + break + } + } +} diff --git a/pkg/storage/multi_iterator.go b/pkg/storage/multi_iterator.go index 5a4d248f12fa..0f4c2f00561a 100644 --- a/pkg/storage/multi_iterator.go +++ b/pkg/storage/multi_iterator.go @@ -20,6 +20,8 @@ import ( const invalidIdxSentinel = -1 // multiIterator multiplexes iteration over a number of SimpleMVCCIterators. +// +// TODO (msbutler): remove the multiIterator and replace all uses with PebbleSSTIterator type multiIterator struct { iters []SimpleMVCCIterator // The index into `iters` of the iterator currently being pointed at.