Skip to content

Commit

Permalink
objstorage,sstable: add read-before for reader creation and iter inde…
Browse files Browse the repository at this point in the history
…x/filter blocks

We consider two use cases for read-before when using an
objstorageprovider.remoteReadable, given the high latency (and cost) of
each read operation:

- When a sstable.Reader is opened, it needs to read the footer, metaindex
  block and meta properties block. It starts by reading the footer which is
  at the end of the table and then proceeds to read the other two. Instead
  of doing 3 tiny reads, we would like to do one read.

- When a single-level or two-level iterator is opened, it reads the
  (top-level) index block first. When the iterator is used, it will
  typically follow this by reading the filter block (since SeekPrefixGE is
  common in CockroachDB). For a two-level iterator it will also read the
  lower-level index blocks which are after the filter block and before the
  top-level index block. It would be ideal if reading the top-level index
  block read enough to include the filter block. And for two-level
  iterators this would also include the lower-level index blocks.

In both use-cases we want the first read from the remoteReadable to do a
larger read, and read bytes earlier than the requested read, hence
"read-before". Subsequent reads from the remoteReadable can use the usual
readahead logic (for the second use-case above, this can help with
sequential reads of the lower-level index blocks when the read-before was
insufficient to satisfy such reads).

Since remoteReadHandle already has a buffer for read-ahead, we utilize
it for this read-before buffering pattern too.

While here, we bump up the read-ahead size for compactions to 8MB, given
the lower concurrency (and thereby higher tolerance to memory usage).

Informs cockroachdb#2328
  • Loading branch information
sumeerbhola committed May 12, 2024
1 parent 2a2e3e1 commit ba1b9d3
Show file tree
Hide file tree
Showing 17 changed files with 286 additions and 86 deletions.
26 changes: 24 additions & 2 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,31 @@ type Readable interface {
// The ReadHandle must be closed before the Readable is closed.
//
// Multiple separate ReadHandles can be used.
NewReadHandle(ctx context.Context) ReadHandle
NewReadHandle(ctx context.Context, readBeforeSize ReadBeforeSize) ReadHandle
}

// ReadBeforeSize specifies whether the first read should read additional
// bytes before the offset, and how big the overall read should be. This is
// just a suggestion that the callee can ignore (and does ignore in
// fileReadable).
//
// The zero value (NoReadBefore) requests no read-before.
type ReadBeforeSize int64

// All constants below are explicitly typed as ReadBeforeSize. Without the
// explicit type, only the first constant in the group would carry it and the
// others would be untyped integer constants that default to int when used in
// an inferred context (e.g. `x := ReadBeforeForNewReader`).
const (
	// NoReadBefore specifies no read-before.
	NoReadBefore ReadBeforeSize = 0
	// ReadBeforeForNewReader is used for a new Reader reading the footer,
	// metaindex, properties. 32KB is unnecessarily large, but it is still small
	// when considering remote object storage.
	ReadBeforeForNewReader ReadBeforeSize = 32 << 10
	// ReadBeforeForIndexAndFilter is used for an iterator reading the top-level
	// index, filter and second-level index blocks.
	//
	// Consider a 128MB sstable with 32KB blocks, so 4K blocks. Say keys are
	// ~100 bytes, then the size of the index blocks is ~400KB. 512KB is a bit
	// bigger, and not too large to be a memory concern.
	ReadBeforeForIndexAndFilter ReadBeforeSize = 512 << 10
)

// ReadHandle is used to perform reads that are related and might benefit from
// optimizations like read-ahead.
type ReadHandle interface {
Expand Down Expand Up @@ -347,7 +369,7 @@ type RemoteObjectToAttach struct {

// Copy copies the specified range from the input to the output.
func Copy(ctx context.Context, in Readable, out Writable, offset, length uint64) error {
r := in.NewReadHandle(ctx)
r := in.NewReadHandle(ctx, NoReadBefore)
r.SetupForCompaction()
buf := make([]byte, 256<<10)
end := offset + length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ func (r *readable) Size() int64 {
}

// NewReadHandle is part of the objstorage.Readable interface.
func (r *readable) NewReadHandle(ctx context.Context) objstorage.ReadHandle {
func (r *readable) NewReadHandle(
ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
// It's safe to get the tracer from the generator without the mutex since it never changes.
t := r.mu.g.t
return &readHandle{
rh: r.r.NewReadHandle(ctx),
rh: r.r.NewReadHandle(ctx, readBeforeSize),
fileNum: r.fileNum,
handleID: t.handleID.Add(1),
g: makeEventGenerator(ctx, t),
Expand Down
2 changes: 1 addition & 1 deletion objstorage/objstorageprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestProvider(t *testing.T) {
if err != nil {
return err.Error()
}
rh := r.NewReadHandle(ctx)
rh := r.NewReadHandle(ctx, objstorage.NoReadBefore)
if forCompaction {
rh.SetupForCompaction()
}
Expand Down
148 changes: 124 additions & 24 deletions objstorage/objstorageprovider/remote_readable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package objstorageprovider
import (
"context"
"io"
"sync"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage"
Expand All @@ -24,8 +25,15 @@ func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Rea

const remoteMaxReadaheadSize = 1024 * 1024 /* 1MB */

// Number of concurrent compactions is bounded and significantly lower than
// the number of concurrent queries, and compactions consume reads from a few
// levels, so there is no risk of high memory usage due to a higher readahead
// size. So set this higher than remoteMaxReadaheadSize.
const remoteReadaheadSizeForCompaction = 8 * 1024 * 1024 /* 8MB */

// remoteReadable is a very simple implementation of Readable on top of the
// ReadCloser returned by remote.Storage.CreateObject.
// remote.ObjectReader returned by remote.Storage.ReadObject. It is stateless
// and can be called concurrently.
type remoteReadable struct {
objReader remote.ObjectReader
size int64
Expand Down Expand Up @@ -75,17 +83,67 @@ func (r *remoteReadable) Size() int64 {
return r.size
}

func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
// TODO(radu): use a pool.
rh := &remoteReadHandle{readable: r}
rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
func (r *remoteReadable) NewReadHandle(
ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
rh := remoteReadHandlePool.Get().(*remoteReadHandle)
*rh = remoteReadHandle{readable: r, readBeforeSize: readBeforeSize}
rh.readAheadState = makeReadaheadState(remoteMaxReadaheadSize)
return rh
}

// TODO(sumeer): add test for remoteReadHandle.

// remoteReadHandle supports doing larger reads, and buffering the additional
// data, to serve future reads. It is not thread-safe. There are two kinds of
// larger reads (a) read-ahead (for sequential data reads), (b) read-before,
// for non-data reads.
//
// For both (a) and (b), the goal is to reduce the number of reads since
// remote read latency and cost can be high. We have to balance this with
// buffers consuming too much memory, since there can be a large number of
// iterators holding remoteReadHandles open for every level.
//
// For (b) we have two use-cases:
//
// - When a sstable.Reader is opened, it needs to read the footer, metaindex
// block and meta properties block. It starts by reading the footer which is
// at the end of the table and then proceeds to read the other two. Instead
// of doing 3 tiny reads, we would like to do one read.
//
// - When a single-level or two-level iterator is opened, it reads the
// (top-level) index block first. When the iterator is used, it will
// typically follow this by reading the filter block (since SeekPrefixGE is
// common in CockroachDB). For a two-level iterator it will also read the
// lower-level index blocks which are after the filter block and before the
// top-level index block. It would be ideal if reading the top-level index
// block read enough to include the filter block. And for two-level
// iterators this would also include the lower-level index blocks.
//
// In both use-cases we want the first read from the remoteReadable to do a
// larger read, and read bytes earlier than the requested read, hence
// "read-before". Subsequent reads from the remoteReadable can use the usual
// readahead logic (for the second use-case above, this can help with
// sequential reads of the lower-level index blocks when the read-before was
// insufficient to satisfy such reads). In the first use-case, the block cache
// is not used. In the second use-case, the block cache is used, and if the
// first read, which reads the top-level index, has a cache hit, we do not do
// any read-before, since we assume that with some locality in the workload
// the other reads will also have a cache hit (it is also messier code to try
// to preserve some read-before).
//
// Note that both use-cases can often occur near each other if there is enough
// locality of access, in which case table cache and block cache misses are
// mainly happening for new sstables created by compactions -- in this case a
// user-facing read will cause a table cache miss and a new sstable.Reader to
// be created, followed by an iterator creation. We don't currently combine
// the reads across the Reader and the iterator creation, since the code
// structure is not simple enough, but we could consider that in the future.
type remoteReadHandle struct {
readable *remoteReadable
readahead struct {
state readaheadState
readable *remoteReadable
readBeforeSize objstorage.ReadBeforeSize
readAheadState readaheadState
buffered struct {
data []byte
offset int64
}
Expand All @@ -94,14 +152,53 @@ type remoteReadHandle struct {

var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)

// remoteReadHandlePool recycles remoteReadHandle structs so that opening a
// read handle does not have to allocate a fresh one each time. Handles are
// taken from the pool in remoteReadable.NewReadHandle and handed back in
// remoteReadHandle.Close.
var remoteReadHandlePool = sync.Pool{
	New: func() interface{} { return new(remoteReadHandle) },
}

// ReadAt is part of the objstorage.ReadHandle interface.
func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
readBeforeSize := int(r.readBeforeSize)
if readBeforeSize > 0 {
// Only first read uses read-before.
r.readBeforeSize = 0
if readBeforeSize <= len(p) || offset == 0 {
readBeforeSize = 0
}
}
readaheadSize := r.maybeReadahead(offset, len(p))

// Check if we already have the data from a previous read-ahead.
if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
n := copy(p, r.readahead.data[offset-r.readahead.offset:])
// Prefer read-before to read-ahead since only first call does read-before.
// Also, since this is the first call, the buffer must be empty.
if readBeforeSize > 0 {
r.buffered.offset = offset - int64(readBeforeSize-len(p))
if r.buffered.offset < 0 {
readBeforeSize += int(r.buffered.offset)
r.buffered.offset = 0
}
// TODO(radu): we need to somehow account for this memory.
if cap(r.buffered.data) >= readBeforeSize {
r.buffered.data = r.buffered.data[:readBeforeSize]
} else {
r.buffered.data = make([]byte, readBeforeSize)
}
if err := r.readable.readInternal(
ctx, r.buffered.data, r.buffered.offset, r.forCompaction); err != nil {
// Make sure we don't treat the data as valid next time.
r.buffered.data = r.buffered.data[:0]
return err
}
copy(p, r.buffered.data[int(offset-r.buffered.offset):])
return nil
}
// Check if we already have the data from a previous read-ahead/read-before.
if rhSize := int64(len(r.buffered.data)); rhSize > 0 {
// We only consider the case where we have a prefix of the needed data. We
// could enhance this to utilize a suffix of the needed data.
if r.buffered.offset <= offset && r.buffered.offset+rhSize > offset {
n := copy(p, r.buffered.data[offset-r.buffered.offset:])
if n == len(p) {
// All data was available.
return nil
Expand All @@ -123,20 +220,20 @@ func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e
return io.EOF
}
}
r.readahead.offset = offset
r.buffered.offset = offset
// TODO(radu): we need to somehow account for this memory.
if cap(r.readahead.data) >= readaheadSize {
r.readahead.data = r.readahead.data[:readaheadSize]
if cap(r.buffered.data) >= readaheadSize {
r.buffered.data = r.buffered.data[:readaheadSize]
} else {
r.readahead.data = make([]byte, readaheadSize)
r.buffered.data = make([]byte, readaheadSize)
}

if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
if err := r.readable.readInternal(ctx, r.buffered.data, offset, r.forCompaction); err != nil {
// Make sure we don't treat the data as valid next time.
r.readahead.data = r.readahead.data[:0]
r.buffered.data = r.buffered.data[:0]
return err
}
copy(p, r.readahead.data)
copy(p, r.buffered.data)
return nil
}

Expand All @@ -145,15 +242,15 @@ func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e

func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
if r.forCompaction {
return remoteMaxReadaheadSize
return remoteReadaheadSizeForCompaction
}
return int(r.readahead.state.maybeReadahead(offset, int64(len)))
return int(r.readAheadState.maybeReadahead(offset, int64(len)))
}

// Close is part of the objstorage.ReadHandle interface.
func (r *remoteReadHandle) Close() error {
r.readable = nil
r.readahead.data = nil
*r = remoteReadHandle{}
remoteReadHandlePool.Put(r)
return nil
}

Expand All @@ -165,6 +262,9 @@ func (r *remoteReadHandle) SetupForCompaction() {
// RecordCacheHit is part of the objstorage.ReadHandle interface.
func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
if !r.forCompaction {
r.readahead.state.recordCacheHit(offset, size)
r.readAheadState.recordCacheHit(offset, size)
}
if r.readBeforeSize > 0 {
r.readBeforeSize = 0
}
}
23 changes: 22 additions & 1 deletion objstorage/objstorageprovider/testdata/provider/shared_readahead
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ read 1 for-compaction
<remote> size of object "61a6-1-000001.sst.ref.1.000001": 0
<remote> create reader for object "61a6-1-000001.sst": 2000000 bytes
size: 2000000
<remote> read object "61a6-1-000001.sst" at 0 (length 1048576)
<remote> read object "61a6-1-000001.sst" at 0 (length 2000000)
0 1000: ok (salt 1)
1000 15000: ok (salt 1)
16000 30000: ok (salt 1)
Expand All @@ -85,3 +85,24 @@ size: 2000000
180000 10000: ok (salt 1)
210000 30000: ok (salt 1)
<remote> close reader for "61a6-1-000001.sst"

# When reading for a compaction, we should be doing 8MB reads from the start.
create 2 shared 2 15000000
----
<remote> create object "a629-1-000002.sst"
<remote> close writer for "a629-1-000002.sst" after 15000000 bytes
<remote> create object "a629-1-000002.sst.ref.1.000002"
<remote> close writer for "a629-1-000002.sst.ref.1.000002" after 0 bytes

read 2 for-compaction
0 100000
9000000 3000000
----
<remote> size of object "a629-1-000002.sst.ref.1.000002": 0
<remote> create reader for object "a629-1-000002.sst": 15000000 bytes
size: 15000000
<remote> read object "a629-1-000002.sst" at 0 (length 8388608)
0 100000: ok (salt 2)
<remote> read object "a629-1-000002.sst" at 9000000 (length 6000000)
9000000 3000000: ok (salt 2)
<remote> close reader for "a629-1-000002.sst"
11 changes: 8 additions & 3 deletions objstorage/objstorageprovider/vfs_readable.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (r *fileReadable) Size() int64 {
}

// NewReadHandle is part of the objstorage.Readable interface.
func (r *fileReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
func (r *fileReadable) NewReadHandle(
ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
rh := readHandlePool.Get().(*vfsReadHandle)
rh.r = r
rh.rs = makeReadaheadState(fileMaxReadaheadSize)
Expand Down Expand Up @@ -205,12 +207,15 @@ func (rh *PreallocatedReadHandle) Close() error {
// (currently this happens if we are reading from a local file).
// The returned handle still needs to be closed.
func UsePreallocatedReadHandle(
ctx context.Context, readable objstorage.Readable, rh *PreallocatedReadHandle,
ctx context.Context,
readable objstorage.Readable,
readBeforeSize objstorage.ReadBeforeSize,
rh *PreallocatedReadHandle,
) objstorage.ReadHandle {
if r, ok := readable.(*fileReadable); ok {
// See fileReadable.NewReadHandle.
rh.vfsReadHandle = vfsReadHandle{r: r}
return rh
}
return readable.NewReadHandle(ctx)
return readable.NewReadHandle(ctx, readBeforeSize)
}
2 changes: 1 addition & 1 deletion objstorage/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ func (f *MemObj) Size() int64 {
}

// NewReadHandle is part of the Readable interface.
func (f *MemObj) NewReadHandle(ctx context.Context) ReadHandle {
func (f *MemObj) NewReadHandle(ctx context.Context, readBeforeSize ReadBeforeSize) ReadHandle {
return &NoopReadHandle{readable: f}
}
4 changes: 2 additions & 2 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ func TestBlockProperties(t *testing.T) {

// Enumerate point key data blocks encoded into the index.
if f != nil {
indexH, err := r.readIndex(context.Background(), nil, nil)
indexH, err := r.readIndex(context.Background(), nil, nil, nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1347,7 +1347,7 @@ func runBlockPropertiesBuildCmd(td *datadriven.TestData) (r *Reader, out string)
}

func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
bh, err := r.readIndex(context.Background(), nil, nil)
bh, err := r.readIndex(context.Background(), nil, nil, nil)
if err != nil {
return err.Error()
}
Expand Down
Loading

0 comments on commit ba1b9d3

Please sign in to comment.