storageccl: use the new PebbleIterator in ExternalSSTReader
This PR refactors all call sites of ExternalSSTReader() to support the new
PebbleIterator, which has built-in range key support. Most notably, this PR
replaces the multiIterator used in the restore data processor with the
PebbleSSTIterator.
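
For illustration, the call shape changes roughly as follows (a sketch only;
exact signatures are in the diff below):

    // Old: one iterator per file, merged later with a multiIterator.
    iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)

    // New: all files for a span are handed to a single Pebble-backed iterator,
    // with explicit iteration options.
    iterOpts := storage.IterOptions{
        KeyTypes:   storage.IterKeyTypePointsAndRanges,
        LowerBound: keys.LocalMax,
        UpperBound: keys.MaxKey,
    }
    iter, err = storageccl.ExternalSSTReader(ctx,
        []storageccl.StoreFile{{Store: store, FilePath: path}}, encOpts, iterOpts)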

This patch is part of a larger effort to teach backup and restore about MVCC
bulk operations. Next, the readAsOfIterator will need to learn how to deal
with range keys.

Informs #71155

This PR also addresses a bug introduced in #83984: loop variables in
ExternalSSTReader were captured by reference, leading to roachtest failures
(#84240, #84162).
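
The underlying Go pitfall is sketched below (a standalone illustration, not
code from this PR): in Go versions before 1.22, closures created in a loop
share the loop variable, so each iteration must re-declare it to capture the
current value, which is the fix applied in external_sst_reader.go.

    package main

    import "fmt"

    func main() {
        paths := []string{"a.sst", "b.sst", "c.sst"}

        var open []func() string
        for _, p := range paths {
            // Bug (pre-Go-1.22 semantics): every closure captures the same
            // loop variable p and so observes its final value.
            open = append(open, func() string { return p })
        }
        fmt.Println(open[0]()) // "c.sst"

        open = open[:0]
        for _, p := range paths {
            p := p // re-declare per iteration to capture the current value
            open = append(open, func() string { return p })
        }
        fmt.Println(open[0]()) // "a.sst"
    }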

Fixes: #84240, #84162, #84181

Release note: none
msbutler committed Jul 19, 2022
1 parent 6e9f5a9 commit 9b2bb35
Showing 8 changed files with 229 additions and 77 deletions.
21 changes: 13 additions & 8 deletions pkg/ccl/backupccl/backupinfo/backup_metadata.go
@@ -49,6 +49,12 @@ const (
sstTenantsPrefix = "tenant/"
)

var iterOpts = storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}

// WriteBackupMetadataSST is responsible for constructing and writing the
// `metadata.sst` to dest. This file contains the metadata corresponding to this
// backup.
@@ -618,7 +624,7 @@ func debugDumpFileSST(
}
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}
iter, err := storageccl.ExternalSSTReader(ctx, store, fileInfoPath, encOpts)
iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: fileInfoPath}}, encOpts, iterOpts)
if err != nil {
return err
}
@@ -664,8 +670,7 @@ func DebugDumpMetadataSST(
}
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)
iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: path}}, encOpts, iterOpts)
if err != nil {
return err
}
@@ -804,8 +809,7 @@ func NewBackupMetadata(
}
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, exportStore, sstFileName, encOpts)
iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: exportStore, FilePath: sstFileName}}, encOpts, iterOpts)
if err != nil {
return nil, err
}
@@ -921,8 +925,8 @@ func (b *BackupMetadata) FileIter(ctx context.Context) FileIterator {
if err != nil {
break
}

iter, err := storageccl.ExternalSSTReader(ctx, b.store, path, encOpts)
iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: b.store,
FilePath: path}}, encOpts, iterOpts)
if err != nil {
return FileIterator{err: err}
}
@@ -1232,7 +1236,8 @@ func makeBytesIter(
encOpts = &roachpb.FileEncryptionOptions{Key: key}
}

iter, err := storageccl.ExternalSSTReader(ctx, store, path, encOpts)
iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store,
FilePath: path}}, encOpts, iterOpts)
if err != nil {
return bytesIter{iterError: err}
}
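
Since every call site above now opens the SST with KeyTypes:
IterKeyTypePointsAndRanges, consumers can observe range keys as well as point
keys. A minimal sketch of what that looks like, assuming the HasPointAndRange
method on storage.SimpleMVCCIterator (illustrative only, not code from this
PR):

    // countKeys walks an iterator opened with IterKeyTypePointsAndRanges and
    // counts positions that carry a point key and positions covered by a
    // range key.
    func countKeys(iter storage.SimpleMVCCIterator, start storage.MVCCKey) (points, ranges int, _ error) {
        for iter.SeekGE(start); ; iter.Next() {
            if ok, err := iter.Valid(); err != nil || !ok {
                return points, ranges, err
            }
            hasPoint, hasRange := iter.HasPointAndRange()
            if hasPoint {
                points++
            }
            if hasRange {
                ranges++
            }
        }
    }
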
42 changes: 18 additions & 24 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -277,37 +277,27 @@ func (rd *restoreDataProcessor) openSSTs(
) error {
ctxDone := ctx.Done()

// The sstables only contain MVCC data and no intents, so using an MVCC
// iterator is sufficient.
var iters []storage.SimpleMVCCIterator
// TODO(msbutler): use a map of external storage factories to avoid reopening the same dir
// in a given restore span entry
var dirs []cloud.ExternalStorage

// If we bail early and haven't handed off responsibility of the dirs/iters to
// the channel, close anything that we had open.
defer func() {
for _, iter := range iters {
iter.Close()
}

for _, dir := range dirs {
if err := dir.Close(); err != nil {
log.Warningf(ctx, "close export storage failed %v", err)
}
}
}()

// sendIters sends all of the currently accumulated iterators over the
// sendIter sends a multiplexed iterator covering the currently accumulated files over the
// channel.
sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
multiIter := storage.MakeMultiIterator(itersToSend)
readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime)
sendIter := func(iter storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
readAsOfIter.Close()
multiIter.Close()
for _, iter := range itersToSend {
iter.Close()
}

for _, dir := range dirsToSend {
if err := dir.Close(); err != nil {
@@ -328,13 +318,13 @@ func (rd *restoreDataProcessor) openSSTs(
return ctx.Err()
}

iters = make([]storage.SimpleMVCCIterator, 0)
dirs = make([]cloud.ExternalStorage, 0)
return nil
}

log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)

storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files))
for _, file := range entry.Files {
log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key)

@@ -343,17 +333,21 @@ func (rd *restoreDataProcessor) openSSTs(
return err
}
dirs = append(dirs, dir)

storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path})
// TODO(pbardea): When memory monitoring is added, send the currently
// accumulated iterators on the channel if we run into memory pressure.
iter, err := storageccl.ExternalSSTReader(ctx, dir, file.Path, rd.spec.Encryption)
if err != nil {
return err
}
iters = append(iters, iter)
}

return sendIters(iters, dirs)
iterOpts := storage.IterOptions{
RangeKeyMaskingBelow: rd.spec.RestoreTime,
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}
iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts)
if err != nil {
return err
}
return sendIter(iter, dirs)
}

func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error {
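
Condensed, the new openSSTs flow above amounts to the following (cleanup,
logging, and error handling elided; openDir stands in for the existing
external-storage lookup, which this PR does not change):

    storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files))
    for _, file := range entry.Files {
        dir := openDir(file) // placeholder for the existing ExternalStorage lookup
        dirs = append(dirs, dir)
        storeFiles = append(storeFiles, storageccl.StoreFile{Store: dir, FilePath: file.Path})
    }
    iterOpts := storage.IterOptions{
        RangeKeyMaskingBelow: rd.spec.RestoreTime,
        KeyTypes:             storage.IterKeyTypePointsAndRanges,
        LowerBound:           keys.LocalMax,
        UpperBound:           keys.MaxKey,
    }
    // One multiplexed iterator replaces the per-file iterators that were
    // previously merged with MakeMultiIterator; sendIter wraps it in a
    // ReadAsOfIterator and hands it to the channel as before.
    iter, err := storageccl.ExternalSSTReader(ctx, storeFiles, rd.spec.Encryption, iterOpts)
    if err != nil {
        return err
    }
    return sendIter(iter, dirs)
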
9 changes: 8 additions & 1 deletion pkg/ccl/cliccl/debug_backup.go
@@ -591,7 +591,14 @@ func makeIters(
return nil, nil, errors.Wrapf(err, "making external storage")
}

iters[i], err = storageccl.ExternalSSTReader(ctx, dirStorage[i], file.Path, nil)
var iterOpts = storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: keys.LocalMax,
UpperBound: keys.MaxKey,
}

iters[i], err = storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: dirStorage[i], FilePath: file.Path}}, nil, iterOpts)
if err != nil {
return nil, nil, errors.Wrapf(err, "fetching sst reader")
}
11 changes: 11 additions & 0 deletions pkg/ccl/storageccl/BUILD.bazel
@@ -35,14 +35,25 @@ go_test(
],
embed = [":storageccl"],
deps = [
"//pkg/base",
"//pkg/blobs",
"//pkg/cloud",
"//pkg/cloud/nodelocal",
"//pkg/keys",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util/humanizeutil",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/encryption.go
@@ -37,7 +37,7 @@ import (

// encryptionPreamble is a constant string prepended in cleartext to ciphertexts
// allowing them to be easily recognized by sight and allowing some basic sanity
// checks when trying to open them (e.g. error if incorrectly using encryption
// checks when trying to open them (e.g. error if incorrectly using encryption
// on an unencrypted file or vice-versa).
var encryptionPreamble = []byte("encrypt")

@@ -52,7 +52,7 @@ const encryptionVersionIVPrefix = 1
// authenticate against truncation at a chunk boundary.
const encryptionVersionChunk = 2

// encryptionChunkSizeV2 is the chunk-size used by v2, i.e. 64kb, which should
// encryptionChunkSizeV2 is the chunk-size used by v2, i.e. 64kb, which should
// minimize overhead while still limiting the size of buffers and
// allowing seeks to mid-file.
var encryptionChunkSizeV2 = 64 << 10 // 64kb
132 changes: 90 additions & 42 deletions pkg/ccl/storageccl/external_sst_reader.go
@@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/pebble/sstable"
)

// remoteSSTs lets external SSTables be iterated directly in some cases,
// rather than being downloaded entirely first.
var remoteSSTs = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.bulk_ingest.stream_external_ssts.enabled",
@@ -39,32 +41,48 @@ var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
64<<10,
)

// ExternalSSTReader returns opens an SST in external storage, optionally
// decrypting with the supplied parameters, and returns iterator over it.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func ExternalSSTReader(
ctx context.Context,
e cloud.ExternalStorage,
basename string,
encryption *roachpb.FileEncryptionOptions,
) (storage.SimpleMVCCIterator, error) {
func getFileWithRetry(
ctx context.Context, basename string, e cloud.ExternalStorage,
) (ioctx.ReadCloserCtx, int64, error) {
// Do an initial read of the file, from the beginning, to get the file size as
// this is used e.g. to read the trailer.
// this is used e.g. to read the trailer.
var f ioctx.ReadCloserCtx
var sz int64

const maxAttempts = 3
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error {
var err error
f, sz, err = e.ReadFileAt(ctx, basename, 0)
return err
}); err != nil {
return nil, err
return nil, 0, err
}
return f, sz, nil
}

if !remoteSSTs.Get(&e.Settings().SV) {
type StoreFile struct {
Store cloud.ExternalStorage
FilePath string
}

// newMemPebbleSSTReader returns a PebbleSSTIterator for in-memory SSTs from
// external storage, optionally decrypting with the supplied parameters.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func newMemPebbleSSTReader(
ctx context.Context,
storeFiles []StoreFile,
encryption *roachpb.FileEncryptionOptions,
iterOps storage.IterOptions,
) (storage.SimpleMVCCIterator, error) {

inMemorySSTs := make([][]byte, 0, len(storeFiles))

for _, sf := range storeFiles {
f, _, err := getFileWithRetry(ctx, sf.FilePath, sf.Store)
if err != nil {
return nil, err
}
content, err := ioctx.ReadAll(ctx, f)
f.Close(ctx)
if err != nil {
@@ -76,44 +94,74 @@ func ExternalSSTReader(
return nil, err
}
}
return storage.NewMemSSTIterator(content, false)
inMemorySSTs = append(inMemorySSTs, content)
}
return storage.NewPebbleMultiMemSSTIterator(inMemorySSTs, false, iterOps)
}

raw := &sstReader{
ctx: ctx,
sz: sizeStat(sz),
body: f,
openAt: func(offset int64) (ioctx.ReadCloserCtx, error) {
reader, _, err := e.ReadFileAt(ctx, basename, offset)
return reader, err
},
// ExternalSSTReader returns a PebbleSSTIterator for the SSTs in external storage,
// optionally decrypting with the supplied parameters.
//
// Note: the order of SSTs matters if multiple SSTs contain the exact same
// Pebble key (that is, the same key/timestamp combination). In this case, the
// PebbleIterator will only surface the key in the first SST that contains it.
//
// ctx is captured and used throughout the life of the returned iterator, until
// the iterator's Close() method is called.
func ExternalSSTReader(
ctx context.Context,
storeFiles []StoreFile,
encryption *roachpb.FileEncryptionOptions,
iterOpts storage.IterOptions,
) (storage.SimpleMVCCIterator, error) {

if !remoteSSTs.Get(&storeFiles[0].Store.Settings().SV) {
return newMemPebbleSSTReader(ctx, storeFiles, encryption, iterOpts)
}
remoteCacheSize := remoteSSTSuffixCacheSize.Get(&storeFiles[0].Store.Settings().SV)
readers := make([]sstable.ReadableFile, 0, len(storeFiles))

var reader sstable.ReadableFile = raw
for _, sf := range storeFiles {
// prevent capturing the loop variables by reference when defining openAt below.
basename := sf.FilePath
e := sf.Store

if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
f, sz, err := getFileWithRetry(ctx, basename, e)
if err != nil {
f.Close(ctx)
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil {
f.Close(ctx)
return nil, err

raw := &sstReader{
ctx: ctx,
sz: sizeStat(sz),
body: f,
openAt: func(offset int64) (ioctx.ReadCloserCtx, error) {
reader, _, err := e.ReadFileAt(ctx, basename, offset)
return reader, err
},
}
}

iter, err := storage.NewSSTIterator(reader)
if err != nil {
reader.Close()
return nil, err
}
var reader sstable.ReadableFile

return iter, nil
if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
if err != nil {
f.Close(ctx)
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteCacheSize); err != nil {
f.Close(ctx)
return nil, err
}
reader = raw
}
readers = append(readers, reader)
}
return storage.NewPebbleSSTIterator(readers, iterOpts)
}

type sstReader struct {
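
A hedged usage sketch of the new entry point (the wrapper function and file
names below are hypothetical, not from this PR; imports elided):

    // openBackupSSTs opens two backup SSTs from the same external store and
    // iterates them as one logical SST. If both files contain the exact same
    // key/timestamp pair, only the version from files[0] is surfaced, per the
    // ordering note on ExternalSSTReader above.
    func openBackupSSTs(
        ctx context.Context, store cloud.ExternalStorage, enc *roachpb.FileEncryptionOptions,
    ) (storage.SimpleMVCCIterator, error) {
        files := []storageccl.StoreFile{
            {Store: store, FilePath: "data/1.sst"}, // hypothetical paths
            {Store: store, FilePath: "data/2.sst"},
        }
        opts := storage.IterOptions{
            KeyTypes:   storage.IterKeyTypePointsAndRanges,
            LowerBound: keys.LocalMax,
            UpperBound: keys.MaxKey,
        }
        return storageccl.ExternalSSTReader(ctx, files, enc, opts)
    }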