From 9c76309baeca639d7c73843db6d6dedcc2e72d05 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 3 Jan 2021 04:55:15 +0000 Subject: [PATCH] cloudimpl: cache suffix in remote file sst wrapper Reading SSTs starts with multiple tiny reads in offsets near the end of the file. If we can read that whole region once and fulfill those reads from a cached buffer, we can avoid repeated RPCs. Release note: none. --- pkg/ccl/storageccl/import.go | 62 +++++++++++++++++++++++++++++++ pkg/ccl/storageccl/import_test.go | 59 +++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go index 126270feedb7..bd48cdebb5ec 100644 --- a/pkg/ccl/storageccl/import.go +++ b/pkg/ccl/storageccl/import.go @@ -93,6 +93,12 @@ var remoteSSTs = settings.RegisterBoolSetting( true, ) +var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting( + "kv.bulk_ingest.stream_external_ssts.suffix_cache_size", + "size of suffix of remote SSTs to download and cache before reading from remote stream", + 64<<10, +) + // commandMetadataEstimate is an estimate of how much metadata Raft will add to // an AddSSTable command. It is intentionally a vast overestimate to avoid // embedding intricate knowledge of the Raft encoding scheme here. @@ -333,6 +339,7 @@ func ExternalSSTReader( } var reader sstable.ReadableFile = raw + if encryption != nil { r, err := decryptingReader(raw, encryption.Key) if err != nil { @@ -340,6 +347,13 @@ func ExternalSSTReader( return nil, err } reader = r + } else { + // We only explicitly buffer the suffix of the file when not decrypting as + // the decrypting reader has its own internal block buffer. + if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil { + f.Close() + return nil, err + } } iter, err := storage.NewSSTIterator(reader) @@ -359,6 +373,15 @@ type sstReader struct { pos int64 readPos int64 // readPos is used to transform Read() to ReadAt(readPos). 
+ + // This wrapper's primary purpose is reading SSTs which often perform many + tiny reads in a cluster of offsets near the end of the file. If we can read + the whole region once and fulfill those from a cache, we can avoid repeated + RPCs. + cache struct { + pos int64 + buf []byte + } } // Close implements io.Closer. @@ -381,11 +404,49 @@ func (r *sstReader) Read(p []byte) (int, error) { return n, err } +// readAndCacheSuffix caches the `size` suffix of the file (which could be the +whole file) for use by later ReadAt calls to avoid making additional RPCs. +func (r *sstReader) readAndCacheSuffix(size int64) error { + if size == 0 { + return nil + } + r.cache.buf = nil + r.cache.pos = int64(r.sz) - size + if r.cache.pos <= 0 { + r.cache.pos = 0 + } + reader, err := r.openAt(r.cache.pos) + if err != nil { + return err + } + defer reader.Close() + read, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + r.cache.buf = read + return nil +} + // ReadAt implements io.ReaderAt by opening a Reader at an offset before reading // from it. Note: contrary to io.ReaderAt, ReadAt does *not* support parallel // calls. func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) { + var read int + if offset >= r.cache.pos && offset < r.cache.pos+int64(len(r.cache.buf)) { + read += copy(p, r.cache.buf[offset-r.cache.pos:]) + if read == len(p) { + return read, nil + } + // Advance offset to the end of what the cache read. + offset += int64(read) + } + + if offset == int64(r.sz) { + return read, io.EOF + } + + // Position the underlying reader at offset if needed. 
if r.pos != offset { if err := r.Close(); err != nil { return 0, err @@ -397,6 +458,7 @@ func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) { r.pos = offset r.body = b } + var err error for n := 0; read < len(p); n, err = r.body.Read(p[read:]) { read += n diff --git a/pkg/ccl/storageccl/import_test.go b/pkg/ccl/storageccl/import_test.go index 84a7285e00e7..9b412a42da22 100644 --- a/pkg/ccl/storageccl/import_test.go +++ b/pkg/ccl/storageccl/import_test.go @@ -9,8 +9,10 @@ package storageccl import ( + "bytes" "context" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -37,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/pebble/vfs" + "github.com/stretchr/testify/require" ) func TestMaxImportBatchSize(t *testing.T) { @@ -395,3 +398,59 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) { }) } } + +func TestSSTReaderCache(t *testing.T) { + defer leaktest.AfterTest(t)() + + var openCalls, expectedOpenCalls int + const sz, suffix = 100, 10 + raw := &sstReader{ + sz: sizeStat(sz), + body: ioutil.NopCloser(bytes.NewReader(nil)), + openAt: func(offset int64) (io.ReadCloser, error) { + openCalls++ + return ioutil.NopCloser(bytes.NewReader(make([]byte, sz-int(offset)))), nil + }, + } + + require.Equal(t, 0, openCalls) + _ = raw.readAndCacheSuffix(suffix) + expectedOpenCalls++ + + discard := make([]byte, 5) + + // Reading in the suffix doesn't make another call. + _, _ = raw.ReadAt(discard, 90) + require.Equal(t, expectedOpenCalls, openCalls) + + // Reading in the suffix again doesn't make another call. + _, _ = raw.ReadAt(discard, 95) + require.Equal(t, expectedOpenCalls, openCalls) + + // Reading outside the suffix makes a new call. + _, _ = raw.ReadAt(discard, 85) + expectedOpenCalls++ + require.Equal(t, expectedOpenCalls, openCalls) + + // Reading at same offset, outside the suffix, does make a new call to rewind. 
+ _, _ = raw.ReadAt(discard, 85) + expectedOpenCalls++ + require.Equal(t, expectedOpenCalls, openCalls) + + // Read at new pos makes a new call. + _, _ = raw.ReadAt(discard, 0) + expectedOpenCalls++ + require.Equal(t, expectedOpenCalls, openCalls) + + // Read at cur pos (where last read stopped) does not reposition. + _, _ = raw.ReadAt(discard, 5) + require.Equal(t, expectedOpenCalls, openCalls) + + // Read in suffix between non-suffix reads does not make a call. + _, _ = raw.ReadAt(discard, 92) + require.Equal(t, expectedOpenCalls, openCalls) + + // Read at where prior non-suffix read finished does not make a new call. + _, _ = raw.ReadAt(discard, 10) + require.Equal(t, expectedOpenCalls, openCalls) +}