Skip to content

Commit

Permalink
objstorage,sstable: add read-before for reader creation and iter inde…
Browse files Browse the repository at this point in the history
…x/filter blocks

We consider two use cases for read-before when using a
objstorageprovider.remoteReadable, given the high latency (and cost) of
each read operation:

- When a sstable.Reader is opened, it needs to read the footer, metaindex
  block and meta properties block. It starts by reading the footer which is
  at the end of the table and then proceeds to read the other two. Instead
  of doing 3 tiny reads, we would like to do one read.

- When a single-level or two-level iterator is opened, it reads the
  (top-level) index block first. When the iterator is used, it will
  typically follow this by reading the filter block (since SeeKPrefixGE is
  common in CockroachDB). For a two-level iterator it will also read the
  lower-level index blocks which are after the filter block and before the
  top-level index block. It would be ideal if reading the top-level index
  block read enough to include the filter block. And for two-level
  iterators this would also include the lower-level index blocks.

In both use-cases we want the first read from the remoteReadable to do a
larger read, and read bytes earlier than the requested read, hence
"read-before". Subsequent reads from the remoteReadable can use the usual
readahead logic (for the second use-case above, this can help with
sequential reads of the lower-level index blocks when the read-before was
insufficient to satisfy such reads).

Since remoteReadHandle already has a buffer for read-ahead, we utilize
it for this read-before buffering pattern too.

While here, we bump up the read-ahead size for compactions to 8MB, given
the lower concurrency (and thereby higher tolerance to memory usage).

Informs cockroachdb#2328
  • Loading branch information
sumeerbhola committed May 14, 2024
1 parent a1142af commit fc743dd
Show file tree
Hide file tree
Showing 19 changed files with 452 additions and 91 deletions.
33 changes: 31 additions & 2 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,38 @@ type Readable interface {
// The ReadHandle must be closed before the Readable is closed.
//
// Multiple separate ReadHandles can be used.
NewReadHandle(ctx context.Context) ReadHandle
NewReadHandle(ctx context.Context, readBeforeSize ReadBeforeSize) ReadHandle
}

// ReadBeforeSize specifies whether the first read should read additional
// bytes before the offset, and how big the overall read should be. This is
// just a suggestion that the callee can ignore (and does ignore in
// fileReadable).
//
// When 0, the first read will only read what it is asked to read, say n
// bytes. When it is a value b > 0, if b > n, then the read will be padded by
// an additional b-n bytes to the left, resulting in an overall read size of
// b. This behavior is akin to what the read-ahead implementation does -- when
// the n bytes are not buffered, and there is read-ahead of b > n, the read
// length is b bytes.
type ReadBeforeSize int64

const (
// NoReadBefore specifies no read-before.
NoReadBefore ReadBeforeSize = 0
// ReadBeforeForNewReader is used for a new Reader reading the footer,
// metaindex, properties. 32KB is unnecessarily large, but it is still small
// when considering remote object storage.
ReadBeforeForNewReader = 32 * 1024
// ReadBeforeForIndexAndFilter is used for an iterator reading the top-level
// index, filter and second-level index blocks.
//
// Consider a 128MB sstable with 32KB blocks, so 4K blocks. Say keys are
// ~100 bytes, then the size of the index blocks is ~400KB. 512KB is a bit
// bigger, and not too large to be a memory concern.
ReadBeforeForIndexAndFilter = 512 * 1024
)

// ReadHandle is used to perform reads that are related and might benefit from
// optimizations like read-ahead.
type ReadHandle interface {
Expand Down Expand Up @@ -347,7 +376,7 @@ type RemoteObjectToAttach struct {

// Copy copies the specified range from the input to the output.
func Copy(ctx context.Context, in Readable, out Writable, offset, length uint64) error {
r := in.NewReadHandle(ctx)
r := in.NewReadHandle(ctx, NoReadBefore)
r.SetupForCompaction()
buf := make([]byte, 256<<10)
end := offset + length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ func (r *readable) Size() int64 {
}

// NewReadHandle is part of the objstorage.Readable interface.
func (r *readable) NewReadHandle(ctx context.Context) objstorage.ReadHandle {
func (r *readable) NewReadHandle(
ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
// It's safe to get the tracer from the generator without the mutex since it never changes.
t := r.mu.g.t
return &readHandle{
rh: r.r.NewReadHandle(ctx),
rh: r.r.NewReadHandle(ctx, readBeforeSize),
fileNum: r.fileNum,
handleID: t.handleID.Add(1),
g: makeEventGenerator(ctx, t),
Expand Down
2 changes: 1 addition & 1 deletion objstorage/objstorageprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestProvider(t *testing.T) {
if err != nil {
return err.Error()
}
rh := r.NewReadHandle(ctx)
rh := r.NewReadHandle(ctx, objstorage.NoReadBefore)
if forCompaction {
rh.SetupForCompaction()
}
Expand Down
159 changes: 130 additions & 29 deletions objstorage/objstorageprovider/remote_readable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package objstorageprovider
import (
"context"
"io"
"sync"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage"
Expand All @@ -24,8 +25,15 @@ func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Rea

const remoteMaxReadaheadSize = 1024 * 1024 /* 1MB */

// Number of concurrent compactions is bounded and significantly lower than
// the number of concurrent queries, and compactions consume reads from a few
// levels, so there is no risk of high memory usage due to a higher readahead
// size. So set this higher than remoteMaxReadaheadSize
const remoteReadaheadSizeForCompaction = 8 * 1024 * 1024 /* 8MB */

// remoteReadable is a very simple implementation of Readable on top of the
// ReadCloser returned by remote.Storage.CreateObject.
// remote.ObjectReader returned by remote.Storage.ReadObject. It is stateless
// and can be called concurrently.
type remoteReadable struct {
objReader remote.ObjectReader
size int64
Expand Down Expand Up @@ -75,17 +83,73 @@ func (r *remoteReadable) Size() int64 {
return r.size
}

func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
// TODO(radu): use a pool.
rh := &remoteReadHandle{readable: r}
rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
// TODO(sumeer): both readBeforeSize and ReadHandle.SetupForCompaction are
// initial configuration of a ReadHandle. So they should both be passed as
// Options to NewReadHandle. But currently the latter is a separate method.
// This is because of how the sstable.Reader calls setupForCompaction on the
// iterators after they are constructed. Consider fixing this oddity.

func (r *remoteReadable) NewReadHandle(
ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
) objstorage.ReadHandle {
rh := remoteReadHandlePool.Get().(*remoteReadHandle)
*rh = remoteReadHandle{readable: r, readBeforeSize: readBeforeSize}
rh.readAheadState = makeReadaheadState(remoteMaxReadaheadSize)
return rh
}

// TODO(sumeer): add test for remoteReadHandle.

// remoteReadHandle supports doing larger reads, and buffering the additional
// data, to serve future reads. It is not thread-safe. There are two kinds of
// larger reads (a) read-ahead (for sequential data reads), (b) read-before,
// for non-data reads.
//
// For both (a) and (b), the goal is to reduce the number of reads since
// remote read latency and cost can be high. We have to balance this with
// buffers consuming too much memory, since there can be a large number of
// iterators holding remoteReadHandles open for every level.
//
// For (b) we have to two use-cases:
//
// - When a sstable.Reader is opened, it needs to read the footer, metaindex
// block and meta properties block. It starts by reading the footer which is
// at the end of the table and then proceeds to read the other two. Instead
// of doing 3 tiny reads, we would like to do one read.
//
// - When a single-level or two-level iterator is opened, it reads the
// (top-level) index block first. When the iterator is used, it will
// typically follow this by reading the filter block (since SeeKPrefixGE is
// common in CockroachDB). For a two-level iterator it will also read the
// lower-level index blocks which are after the filter block and before the
// top-level index block. It would be ideal if reading the top-level index
// block read enough to include the filter block. And for two-level
// iterators this would also include the lower-level index blocks.
//
// In both use-cases we want the first read from the remoteReadable to do a
// larger read, and read bytes earlier than the requested read, hence
// "read-before". Subsequent reads from the remoteReadable can use the usual
// readahead logic (for the second use-case above, this can help with
// sequential reads of the lower-level index blocks when the read-before was
// insufficient to satisfy such reads). In the first use-case, the block cache
// is not used. In the second use-case, the block cache is used, and if the
// first read, which reads the top-level index, has a cache hit, we do not do
// any read-before, since we assume that with some locality in the workload
// the other reads will also have a cache hit (it is also messier code to try
// to preserve some read-before).
//
// Note that both use-cases can often occur near each other if there is enough
// locality of access, in which case table cache and block cache misses are
// mainly happening for new sstables created by compactions -- in this case a
// user-facing read will cause a table cache miss and a new sstable.Reader to
// be created, followed by an iterator creation. We don't currently combine
// the reads across the Reader and the iterator creation, since the code
// structure is not simple enough, but we could consider that in the future.
type remoteReadHandle struct {
readable *remoteReadable
readahead struct {
state readaheadState
readable *remoteReadable
readBeforeSize objstorage.ReadBeforeSize
readAheadState readaheadState
buffered struct {
data []byte
offset int64
}
Expand All @@ -94,14 +158,41 @@ type remoteReadHandle struct {

var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)

var remoteReadHandlePool = sync.Pool{
New: func() interface{} {
return &remoteReadHandle{}
},
}

// ReadAt is part of the objstorage.ReadHandle interface.
func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
var extraBytesBefore int64
if r.readBeforeSize > 0 {
if int(r.readBeforeSize) > len(p) {
extraBytesBefore = min(int64(int(r.readBeforeSize) - len(p)), offset)
}
// Only first read uses read-before.
r.readBeforeSize = 0
}
readaheadSize := r.maybeReadahead(offset, len(p))

// Check if we already have the data from a previous read-ahead.
if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
n := copy(p, r.readahead.data[offset-r.readahead.offset:])
// Prefer read-before to read-ahead since only first call does read-before.
// Also, since this is the first call, the buffer must be empty.
if extraBytesBefore > 0 {
r.buffered.offset = offset - extraBytesBefore
err := r.readToBuffer(ctx, offset - extraBytesBefore, len(p) + int(extraBytesBefore))
if err != nil {
return err
}
copy(p, r.buffered.data[int(extraBytesBefore):])
return nil
}
// Check if we already have the data from a previous read-ahead/read-before.
if rhSize := int64(len(r.buffered.data)); rhSize > 0 {
// We only consider the case where we have a prefix of the needed data. We
// could enhance this to utilize a suffix of the needed data.
if r.buffered.offset <= offset && r.buffered.offset+rhSize > offset {
n := copy(p, r.buffered.data[offset-r.buffered.offset:])
if n == len(p) {
// All data was available.
return nil
Expand All @@ -123,20 +214,10 @@ func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e
return io.EOF
}
}
r.readahead.offset = offset
// TODO(radu): we need to somehow account for this memory.
if cap(r.readahead.data) >= readaheadSize {
r.readahead.data = r.readahead.data[:readaheadSize]
} else {
r.readahead.data = make([]byte, readaheadSize)
}

if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
// Make sure we don't treat the data as valid next time.
r.readahead.data = r.readahead.data[:0]
if err := r.readToBuffer(ctx, offset, readaheadSize); err != nil {
return err
}
copy(p, r.readahead.data)
copy(p, r.buffered.data)
return nil
}

Expand All @@ -145,15 +226,32 @@ func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e

func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
if r.forCompaction {
return remoteMaxReadaheadSize
return remoteReadaheadSizeForCompaction
}
return int(r.readahead.state.maybeReadahead(offset, int64(len)))
return int(r.readAheadState.maybeReadahead(offset, int64(len)))
}

func (r *remoteReadHandle) readToBuffer(ctx context.Context, offset int64, length int) error {
r.buffered.offset = offset
// TODO(radu): we need to somehow account for this memory.
if cap(r.buffered.data) >= length {
r.buffered.data = r.buffered.data[:length]
} else {
r.buffered.data = make([]byte, length)
}
if err := r.readable.readInternal(
ctx, r.buffered.data, r.buffered.offset, r.forCompaction); err != nil {
// Make sure we don't treat the data as valid next time.
r.buffered.data = r.buffered.data[:0]
return err
}
return nil
}

// Close is part of the objstorage.ReadHandle interface.
func (r *remoteReadHandle) Close() error {
r.readable = nil
r.readahead.data = nil
*r = remoteReadHandle{}
remoteReadHandlePool.Put(r)
return nil
}

Expand All @@ -165,6 +263,9 @@ func (r *remoteReadHandle) SetupForCompaction() {
// RecordCacheHit is part of the objstorage.ReadHandle interface.
func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
if !r.forCompaction {
r.readahead.state.recordCacheHit(offset, size)
r.readAheadState.recordCacheHit(offset, size)
}
if r.readBeforeSize > 0 {
r.readBeforeSize = 0
}
}
89 changes: 89 additions & 0 deletions objstorage/objstorageprovider/remote_readable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package objstorageprovider

import (
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/objstorage"
"github.com/stretchr/testify/require"
)

type testObjectReader struct {
b strings.Builder
}

func (r *testObjectReader) ReadAt(ctx context.Context, p []byte, offset int64) error {
fmt.Fprintf(&r.b, "ReadAt(len=%d, offset=%d)\n", len(p), offset)
return nil
}

func (r *testObjectReader) Close() error {
fmt.Fprintf(&r.b, "Close()\n")
return nil
}

func TestRemoteReadHandle(t *testing.T) {
var or testObjectReader
var rr *remoteReadable
var rh objstorage.ReadHandle
defer func() {
if rh != nil {
require.NoError(t, rh.Close())
}
if rr != nil {
require.NoError(t, rr.Close())
}
}()
datadriven.RunTest(t, "testdata/remote_read_handle", func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init-readable":
if rr != nil {
require.NoError(t, rr.Close())
}
var size int64
d.ScanArgs(t, "size", &size)
rr = &remoteReadable{
objReader: &or,
size: size,
}
return ""

case "new-read-handle":
if rh != nil {
require.NoError(t, rh.Close())
}
var readBeforeSize int
d.ScanArgs(t, "read-before-size", &readBeforeSize)
rh = rr.NewReadHandle(context.Background(), objstorage.ReadBeforeSize(readBeforeSize))
if d.HasArg("setup-for-compaction") {
rh.SetupForCompaction()
}
return ""

case "read":
var length int
d.ScanArgs(t, "len", &length)
b := make([]byte, length)
var offset int64
d.ScanArgs(t, "offset", &offset)
// TODO: verify what is returned. Using randomization.
err := rh.ReadAt(context.Background(), b, offset)
if err != nil {
fmt.Fprintf(&or.b, "err: %s\n", err.Error())
}
str := or.b.String()
or.b.Reset()
return str

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}
Loading

0 comments on commit fc743dd

Please sign in to comment.