Skip to content

Commit

Permalink
cloudimpl: cache suffix in remote file sst wrapper
Browse files Browse the repository at this point in the history
Reading SSTs starts with multiple tiny reads in offsets near the end of the file.
If we can read that whole region once and fulfill those reads from a cached buffer,
we can avoid repeated RPCs.

Release note: none.
  • Loading branch information
dt committed Jan 28, 2021
1 parent c298abe commit 9c76309
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
62 changes: 62 additions & 0 deletions pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ var remoteSSTs = settings.RegisterBoolSetting(
true,
)

// remoteSSTSuffixCacheSize controls how large a suffix of a remote SST is
// downloaded and cached up-front by readAndCacheSuffix. Reading an SST begins
// with multiple tiny reads at offsets near the end of the file, so serving
// those from one cached buffer avoids a separate RPC per read. Set to 0 to
// disable the suffix cache.
var remoteSSTSuffixCacheSize = settings.RegisterByteSizeSetting(
"kv.bulk_ingest.stream_external_ssts.suffix_cache_size",
"size of suffix of remote SSTs to download and cache before reading from remote stream",
64<<10,
)

// commandMetadataEstimate is an estimate of how much metadata Raft will add to
// an AddSSTable command. It is intentionally a vast overestimate to avoid
// embedding intricate knowledge of the Raft encoding scheme here.
Expand Down Expand Up @@ -333,13 +339,21 @@ func ExternalSSTReader(
}

var reader sstable.ReadableFile = raw

if encryption != nil {
r, err := decryptingReader(raw, encryption.Key)
if err != nil {
f.Close()
return nil, err
}
reader = r
} else {
// We only explicitly buffer the suffix of the file when not decrypting as
// the decrypting reader has its own internal block buffer.
if err := raw.readAndCacheSuffix(remoteSSTSuffixCacheSize.Get(&e.Settings().SV)); err != nil {
f.Close()
return nil, err
}
}

iter, err := storage.NewSSTIterator(reader)
Expand All @@ -359,6 +373,15 @@ type sstReader struct {
pos int64

readPos int64 // readPos is used to transform Read() to ReadAt(readPos).

// This wrapper's primary purpose is reading SSTs which often perform many
// tiny reads in a cluster of offsets near the end of the file. If we can read
// the whole region once and fullfil those from a cache, we can avoid repeated
// RPCs.
cache struct {
pos int64
buf []byte
}
}

// Close implements io.Closer.
Expand All @@ -381,11 +404,49 @@ func (r *sstReader) Read(p []byte) (int, error) {
return n, err
}

// readAndCacheSuffix downloads and caches the trailing `size` bytes of the
// file (which could be the whole file, if it is smaller than `size`) so that
// subsequent ReadAt calls landing in that region can be served from memory
// instead of issuing additional RPCs. A size of 0 disables caching entirely.
func (r *sstReader) readAndCacheSuffix(size int64) error {
	if size == 0 {
		return nil
	}
	// Drop any previously cached bytes before fetching fresh ones, so a
	// failure below cannot leave a stale buffer paired with a new position.
	r.cache.buf = nil
	start := int64(r.sz) - size
	if start <= 0 {
		start = 0
	}
	r.cache.pos = start
	body, err := r.openAt(start)
	if err != nil {
		return err
	}
	defer body.Close()
	suffix, err := ioutil.ReadAll(body)
	if err != nil {
		return err
	}
	r.cache.buf = suffix
	return nil
}

// ReadAt implements io.ReaderAt by opening a Reader at an offset before reading
// from it. Note: contrary to io.ReaderAt, ReadAt does *not* support parallel
// calls.
func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
var read int
if offset >= r.cache.pos && offset < r.cache.pos+int64(len(r.cache.buf)) {
read += copy(p, r.cache.buf[offset-r.cache.pos:])
if read == len(p) {
return read, nil
}
// Advance offset to end of what cache read.
offset += int64(read)
}

if offset == int64(r.sz) {
return read, io.EOF
}

// Position the underlying reader at offset if needed.
if r.pos != offset {
if err := r.Close(); err != nil {
return 0, err
Expand All @@ -397,6 +458,7 @@ func (r *sstReader) ReadAt(p []byte, offset int64) (int, error) {
r.pos = offset
r.body = b
}

var err error
for n := 0; read < len(p); n, err = r.body.Read(p[read:]) {
read += n
Expand Down
59 changes: 59 additions & 0 deletions pkg/ccl/storageccl/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package storageccl

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)

func TestMaxImportBatchSize(t *testing.T) {
Expand Down Expand Up @@ -395,3 +398,59 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
})
}
}

// TestSSTReaderCache verifies that reads falling inside the cached suffix of
// an sstReader are served from memory, while reads outside it (or ones that
// require rewinding the underlying stream) open a new remote stream.
func TestSSTReaderCache(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const fileSize, suffixSize = 100, 10

	var gotOpens, wantOpens int
	raw := &sstReader{
		sz:   sizeStat(fileSize),
		body: ioutil.NopCloser(bytes.NewReader(nil)),
		openAt: func(offset int64) (io.ReadCloser, error) {
			gotOpens++
			return ioutil.NopCloser(bytes.NewReader(make([]byte, fileSize-int(offset)))), nil
		},
	}

	require.Equal(t, 0, gotOpens)
	_ = raw.readAndCacheSuffix(suffixSize)
	wantOpens++

	scratch := make([]byte, 5)

	// A read landing in the cached suffix is served without opening a stream.
	_, _ = raw.ReadAt(scratch, 90)
	require.Equal(t, wantOpens, gotOpens)

	// So is a second read within the suffix.
	_, _ = raw.ReadAt(scratch, 95)
	require.Equal(t, wantOpens, gotOpens)

	// A read before the suffix must open a stream.
	_, _ = raw.ReadAt(scratch, 85)
	wantOpens++
	require.Equal(t, wantOpens, gotOpens)

	// Re-reading the same pre-suffix offset rewinds, i.e. re-opens.
	_, _ = raw.ReadAt(scratch, 85)
	wantOpens++
	require.Equal(t, wantOpens, gotOpens)

	// Jumping to a new, uncached position re-opens as well.
	_, _ = raw.ReadAt(scratch, 0)
	wantOpens++
	require.Equal(t, wantOpens, gotOpens)

	// Continuing from where the last read stopped needs no repositioning.
	_, _ = raw.ReadAt(scratch, 5)
	require.Equal(t, wantOpens, gotOpens)

	// A suffix read interleaved between non-suffix reads is still cache-served.
	_, _ = raw.ReadAt(scratch, 92)
	require.Equal(t, wantOpens, gotOpens)

	// Resuming at the position where the prior non-suffix read finished does
	// not open a new stream either.
	_, _ = raw.ReadAt(scratch, 10)
	require.Equal(t, wantOpens, gotOpens)
}

0 comments on commit 9c76309

Please sign in to comment.