Skip to content

Commit

Permalink
sstable: fix improper buffer reuse in copyDataBlocks
Browse files Browse the repository at this point in the history
Previously, in the colblk implementation of copyDataBlocks,
we were reusing a buffer that could under some cases be passed
directly to the write queue and would get written to sstable
while later blocks are being read into the same buffer.

This change also improves tests around CopySpan() to better test
cache hit/miss cases.

Fixes cockroachdb/cockroach#131332.
  • Loading branch information
itsbilal committed Nov 25, 2024
1 parent 9efb5b8 commit f5057c9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 23 deletions.
14 changes: 7 additions & 7 deletions sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ func shouldFlushWithoutLatestKV(
func (w *RawColumnWriter) copyDataBlocks(
ctx context.Context, blocks []indexEntry, rh objstorage.ReadHandle,
) error {
buf := make([]byte, 0, 256<<10)
const readSizeTarget = 256 << 10
readAndFlushBlocks := func(firstBlockIdx, lastBlockIdx int) error {
if firstBlockIdx > lastBlockIdx {
panic("pebble: readAndFlushBlocks called with invalid block range")
Expand All @@ -1099,9 +1099,9 @@ func (w *RawColumnWriter) copyDataBlocks(
// blocks in one request.
lastBH := blocks[lastBlockIdx].bh
blocksToReadLen := lastBH.Offset + lastBH.Length + block.TrailerLen - blocks[firstBlockIdx].bh.Offset
if blocksToReadLen > uint64(cap(buf)) {
buf = make([]byte, 0, blocksToReadLen)
}
// We need to create a new buffer for each read, as w.enqueuePhysicalBlock passes
// a pointer to the buffer to the write queue.
buf := make([]byte, 0, blocksToReadLen)
if err := rh.ReadAt(ctx, buf[:blocksToReadLen], int64(blocks[firstBlockIdx].bh.Offset)); err != nil {
return err
}
Expand All @@ -1116,8 +1116,8 @@ func (w *RawColumnWriter) copyDataBlocks(
}
return nil
}
// Iterate through blocks until we have enough to fill cap(buf). When we have more than
// one block in blocksToRead and adding the next block would exceed the buffer capacity,
// Iterate through blocks until we have enough to fill readSizeTarget. When we have more than
// one block in blocksToRead and adding the next block would exceed the target buffer capacity,
// we read and flush existing blocks in blocksToRead. This allows us to read as many
// blocks in one IO request as possible, while still utilizing the write queue in this
// writer.
Expand All @@ -1129,7 +1129,7 @@ func (w *RawColumnWriter) copyDataBlocks(
start := i
// Note the i++ in the initializing condition; this means we will always flush at least
// one block.
for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(cap(buf)); i++ {
for i++; i < len(blocks) && (blocks[i].bh.Length+blocks[i].bh.Offset+block.TrailerLen-blocks[start].bh.Offset) <= uint64(readSizeTarget); i++ {
}
// i points to one index past the last block we want to read.
if err := readAndFlushBlocks(start, i-1); err != nil {
Expand Down
30 changes: 26 additions & 4 deletions sstable/copier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (

func TestCopySpan(t *testing.T) {
fs := vfs.NewMem()
blockCache := cache.New(1 << 20 /* 1 MB */)
blockCache := cache.New(2 << 20 /* 1 MB */)
cacheID := cache.ID(1)
fileNameToNum := make(map[string]base.FileNum)
nextFileNum := base.FileNum(1)
defer blockCache.Unref()

keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
Expand All @@ -35,6 +38,8 @@ func TestCopySpan(t *testing.T) {
if err != nil {
return err.Error()
}
fileNameToNum[d.CmdArgs[0].Key] = nextFileNum
nextFileNum++
tableFormat := TableFormatMax
blockSize := 1
var indexBlockSize int
Expand Down Expand Up @@ -92,15 +97,28 @@ func TestCopySpan(t *testing.T) {
if err != nil {
return err.Error()
}
r, err := NewReader(context.TODO(), readable, ReaderOptions{
var start, end []byte
for _, arg := range d.CmdArgs[1:] {
switch arg.Key {
case "start":
start = []byte(arg.FirstVal(t))
case "end":
end = []byte(arg.FirstVal(t))
}
}
rOpts := ReaderOptions{
Comparer: testkeys.Comparer,
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
})
}
rOpts.internal.CacheOpts.Cache = blockCache
rOpts.internal.CacheOpts.CacheID = cacheID
rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[d.CmdArgs[0].Key])
r, err := NewReader(context.TODO(), readable, rOpts)
defer r.Close()
if err != nil {
return err.Error()
}
iter, err := r.NewIter(block.NoTransforms, nil, nil)
iter, err := r.NewIter(block.NoTransforms, start, end)
if err != nil {
return err.Error()
}
Expand All @@ -126,6 +144,8 @@ func TestCopySpan(t *testing.T) {
return err.Error()
}
writable := objstorageprovider.NewFileWritable(output)
fileNameToNum[outputFile] = nextFileNum
nextFileNum++

f, err := fs.Open(inputFile)
if err != nil {
Expand All @@ -140,6 +160,8 @@ func TestCopySpan(t *testing.T) {
KeySchemas: KeySchemas{keySchema.Name: &keySchema},
}
rOpts.internal.CacheOpts.Cache = blockCache
rOpts.internal.CacheOpts.CacheID = cacheID
rOpts.internal.CacheOpts.FileNum = base.DiskFileNum(fileNameToNum[inputFile])
r, err := NewReader(context.TODO(), readable, rOpts)
if err != nil {
return err.Error()
Expand Down
14 changes: 2 additions & 12 deletions sstable/testdata/copy_span
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@ c.SET.4:baz
d.SET.5:foobar
----

iter test3
iter test3 start=c
----
a#0,SET: foo
b#0,SET: bar
c#0,SET: baz
d#0,SET: foobar

Expand Down Expand Up @@ -134,18 +132,10 @@ i.SET.5:foo
j.SET.5:foo
----

iter test32
iter test32 start=c end=e
----
a#0,SET: foo
b#0,SET: bar
c#0,SET: baz
d#0,SET: foobar
e#0,SET: foo
f#0,SET: foo
g#0,SET: foo
h#0,SET: foo
i#0,SET: foo
j#0,SET: foo

copy-span test32 test33 b.SET.10 cc.SET.0
----
Expand Down

0 comments on commit f5057c9

Please sign in to comment.