diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go
index 126270feedb7..bd48cdebb5ec 100644
--- a/pkg/ccl/storageccl/import.go
+++ b/pkg/ccl/storageccl/import.go
@@ -93,6 +93,12 @@ var remoteSSTs = settings.RegisterBoolSetting(
 	true,
 )
 
+var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
+	"kv.bulk_ingest.stream_external_ssts.suffix_cache_size",
+	"size of suffix of remote SSTs to download and cache before reading from remote stream",
+	64<<10,
+)
+
 // commandMetadataEstimate is an estimate of how much metadata Raft will add to
 // an AddSSTable command. It is intentionally a vast overestimate to avoid
 // embedding intricate knowledge of the Raft encoding scheme here.
@@ -333,6 +339,7 @@ func ExternalSSTReader(
 	}
 
 	var reader sstable.ReadableFile = raw
+
 	if encryption != nil {
 		r, err := decryptingReader(raw, encryption.Key)
 		if err != nil {
@@ -340,6 +347,13 @@ func ExternalSSTReader(
 			return nil, err
 		}
 		reader = r
+	} else {
+		// We only explicitly buffer the suffix of the file when not decrypting, as
+		// the decrypting reader has its own internal block buffer.
+		if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil {
+			f.Close()
+			return nil, err
+		}
 	}
 
 	iter, err := storage.NewSSTIterator(reader)
@@ -359,6 +373,15 @@ type sstReader struct {
 	pos int64
 
 	readPos int64 // readPos is used to transform Read() to ReadAt(readPos).
+
+	// This wrapper's primary purpose is reading SSTs, which often involves many
+	// tiny reads clustered at offsets near the end of the file. If we can read
+	// that whole region once and fulfill those reads from a cache, we avoid
+	// repeated RPCs.
+	cache struct {
+		pos int64
+		buf []byte
+	}
 }
 
 // Close implements io.Closer.
@@ -381,11 +404,49 @@ func (r *sstReader) Read(p []byte) (int, error) {
 	return n, err
 }
 
+// readAndCacheSuffix caches the `size` suffix of the file (which could be the
+// whole file) for use by later ReadAt calls to avoid making additional RPCs.
+func (r *sstReader) readAndCacheSuffix(size int64) error {
+	if size == 0 {
+		return nil
+	}
+	r.cache.buf = nil
+	r.cache.pos = int64(r.sz) - size
+	if r.cache.pos <= 0 {
+		r.cache.pos = 0
+	}
+	reader, err := r.openAt(r.cache.pos)
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+	read, err := ioutil.ReadAll(reader)
+	if err != nil {
+		return err
+	}
+	r.cache.buf = read
+	return nil
+}
+
 // ReadAt implements io.ReaderAt by opening a Reader at an offset before reading
 // from it. Note: contrary to io.ReaderAt, ReadAt does *not* support parallel
 // calls.
 func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
 	var read int
+	if offset >= r.cache.pos && offset < r.cache.pos+int64(len(r.cache.buf)) {
+		read += copy(p, r.cache.buf[offset-r.cache.pos:])
+		if read == len(p) {
+			return read, nil
+		}
+		// Advance offset to the end of what the cache read.
+		offset += int64(read)
+	}
+
+	if offset == int64(r.sz) {
+		return read, io.EOF
+	}
+
+	// Position the underlying reader at offset if needed.
 	if r.pos != offset {
 		if err := r.Close(); err != nil {
 			return 0, err
@@ -397,6 +458,7 @@ func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
 		r.pos = offset
 		r.body = b
 	}
+
 	var err error
 	for n := 0; read < len(p); n, err = r.body.Read(p[read:]) {
 		read += n
diff --git a/pkg/ccl/storageccl/import_test.go b/pkg/ccl/storageccl/import_test.go
index 84a7285e00e7..9b412a42da22 100644
--- a/pkg/ccl/storageccl/import_test.go
+++ b/pkg/ccl/storageccl/import_test.go
@@ -9,8 +9,10 @@
 package storageccl
 
 import (
+	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -37,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/pebble/vfs"
+	"github.com/stretchr/testify/require"
 )
 
 func TestMaxImportBatchSize(t *testing.T) {
@@ -395,3 +398,59 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
 		})
 	}
 }
+
+func TestSSTReaderCache(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var openCalls, expectedOpenCalls int
+	const sz, suffix = 100, 10
+	raw := &sstReader{
+		sz:   sizeStat(sz),
+		body: ioutil.NopCloser(bytes.NewReader(nil)),
+		openAt: func(offset int64) (io.ReadCloser, error) {
+			openCalls++
+			return ioutil.NopCloser(bytes.NewReader(make([]byte, sz-int(offset)))), nil
+		},
+	}
+
+	require.Equal(t, 0, openCalls)
+	_ = raw.readAndCacheSuffix(suffix)
+	expectedOpenCalls++
+
+	discard := make([]byte, 5)
+
+	// Reading in the suffix doesn't make another call.
+	_, _ = raw.ReadAt(discard, 90)
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading in the suffix again doesn't make another call.
+	_, _ = raw.ReadAt(discard, 95)
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading outside the suffix makes a new call.
+	_, _ = raw.ReadAt(discard, 85)
+	expectedOpenCalls++
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading at the same offset, outside the suffix, makes a new call to rewind.
+	_, _ = raw.ReadAt(discard, 85)
+	expectedOpenCalls++
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading at a new position makes a new call.
+	_, _ = raw.ReadAt(discard, 0)
+	expectedOpenCalls++
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading at the current position (where the last read stopped) does not reposition.
+	_, _ = raw.ReadAt(discard, 5)
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading in the suffix between non-suffix reads does not make a call.
+	_, _ = raw.ReadAt(discard, 92)
+	require.Equal(t, expectedOpenCalls, openCalls)
+
+	// Reading where the prior non-suffix read finished does not make a new call.
+	_, _ = raw.ReadAt(discard, 10)
+	require.Equal(t, expectedOpenCalls, openCalls)
+}
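
For context on the pattern the sstReader changes implement, the following is a minimal, standalone sketch in Go; names such as suffixCachedReader and cacheSuffix are hypothetical and not part of this patch. It shows the same idea: download the tail of the remote object once, then serve the many small footer/index reads an SST reader issues from that buffer, falling back to a ranged open only when a read misses or overruns the cache. Unlike the patch, it omits reuse of an already-positioned stream and end-of-file handling.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// suffixCachedReader is a simplified, hypothetical sketch of the suffix-cache
// pattern: keep the last N bytes of a remote object in memory so that small
// reads near the end of the file avoid extra remote round trips.
type suffixCachedReader struct {
	openAt func(offset int64) (io.ReadCloser, error) // e.g. a ranged GET
	size   int64

	cachePos int64
	cacheBuf []byte
}

// cacheSuffix downloads and retains the trailing n bytes of the object.
func (r *suffixCachedReader) cacheSuffix(n int64) error {
	pos := r.size - n
	if pos < 0 {
		pos = 0
	}
	rc, err := r.openAt(pos)
	if err != nil {
		return err
	}
	defer rc.Close()
	buf, err := io.ReadAll(rc)
	if err != nil {
		return err
	}
	r.cachePos, r.cacheBuf = pos, buf
	return nil
}

// ReadAt serves reads from the cached suffix when possible and falls back to
// opening the remote object at the requested offset otherwise.
func (r *suffixCachedReader) ReadAt(p []byte, off int64) (int, error) {
	var n int
	if off >= r.cachePos && off < r.cachePos+int64(len(r.cacheBuf)) {
		n = copy(p, r.cacheBuf[off-r.cachePos:])
		if n == len(p) {
			return n, nil
		}
		// Cache ran out; read the remainder remotely.
		p, off = p[n:], off+int64(n)
	}
	rc, err := r.openAt(off)
	if err != nil {
		return n, err
	}
	defer rc.Close()
	m, err := io.ReadFull(rc, p)
	return n + m, err
}

func main() {
	data := []byte("0123456789abcdefghij") // 20-byte "remote" object
	r := &suffixCachedReader{
		size: int64(len(data)),
		openAt: func(off int64) (io.ReadCloser, error) {
			fmt.Println("remote open at offset", off)
			return io.NopCloser(bytes.NewReader(data[off:])), nil
		},
	}
	if err := r.cacheSuffix(8); err != nil { // one remote open for the 8-byte suffix
		panic(err)
	}
	buf := make([]byte, 4)
	if _, err := r.ReadAt(buf, 14); err != nil { // served entirely from the cache
		panic(err)
	}
	fmt.Printf("read %q without another remote open\n", buf)
}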