Skip to content

Commit

Permalink
crl-release-23.1: db: fix bug with restricted checkpoints
Browse files Browse the repository at this point in the history
Backport of #2460 for 23.1.x

When a checkpoint is restricted to a set of spans, we append a record
to the checkpoint's manifest removing all the excluded ssts.

We append the record after copying the raw data from the existing
manifest. This is not quite correct: the `record` library works by
breaking records up into chunks and packing the chunks into 32KB blocks.
A chunk cannot straddle a 32KB block boundary, but our append method can
violate this invariant. In practice, this only happens if the record we
are appending is big and/or we are very unlucky and the existing manifest
ends close to a 32KB boundary.

To fix this: instead of doing a raw data copy of the existing
manifest, we copy at the record level (using a record reader and a
record writer). Then we can add a new record using the same writer.

Informs cockroachdb/cockroach#100935
  • Loading branch information
RaduBerinde committed Apr 20, 2023
1 parent fb56da9 commit 0a342ec
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 8 deletions.
35 changes: 27 additions & 8 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,27 +342,46 @@ func (d *DB) writeCheckpointManifest(
}
defer dst.Close()

if _, err := io.Copy(dst, &io.LimitedReader{R: src, N: manifestSize}); err != nil {
return err
// Copy all existing records. We need to copy at the record level in case we
// need to append another record with the excluded files (we cannot simply
// append a record after a raw data copy; see
// https://github.com/cockroachdb/cockroach/issues/100935).
r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
w := record.NewWriter(dst)
for {
rr, err := r.Next()
if err != nil {
if err == io.EOF {
break
}
return err
}

rw, err := w.Next()
if err != nil {
return err
}
if _, err := io.Copy(rw, rr); err != nil {
return err
}
}

if len(excludedFiles) > 0 {
// Write out an additional VersionEdit that deletes the excluded SST files.
ve := versionEdit{
DeletedFiles: excludedFiles,
}
rw := record.NewWriter(dst)
w, err := rw.Next()
rw, err := w.Next()
if err != nil {
return err
}
if err := ve.Encode(w); err != nil {
return err
}
if err := rw.Close(); err != nil {
if err := ve.Encode(rw); err != nil {
return err
}
}
if err := w.Close(); err != nil {
return err
}
return dst.Sync()
}(); err != nil {
return err
Expand Down
60 changes: 60 additions & 0 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -284,3 +285,62 @@ func TestCheckpointFlushWAL(t *testing.T) {
require.NoError(t, d.Close())
}
}

// TestCheckpointManyFiles creates a store with several hundred flushed
// sstables and takes a checkpoint restricted to a small key span, so that the
// record appended to the checkpoint manifest (listing the excluded files) is
// large. It then opens the checkpoint and verifies that exactly the expected
// number of keys is visible.
//
// The test is skipped in short mode.
func TestCheckpointManyFiles(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping because of short flag")
	}
	const checkpointPath = "checkpoint"
	opts := &Options{
		FS:                          vfs.NewMem(),
		FormatMajorVersion:          FormatNewest,
		DisableAutomaticCompactions: true,
	}
	// Disable compression to speed up the test.
	opts.EnsureDefaults()
	for i := range opts.Levels {
		opts.Levels[i].Compression = NoCompression
	}

	d, err := Open("", opts)
	require.NoError(t, err)
	// Surface close errors instead of silently dropping them.
	defer func() { require.NoError(t, d.Close()) }()

	mkKey := func(x int) []byte {
		return []byte(fmt.Sprintf("key%06d", x))
	}
	// We want to test the case where the appended record with the excluded files
	// makes the manifest cross 32KB. This will happen for a range of values
	// around 450.
	numFiles := 400 + rand.Intn(100)
	// Report the randomly chosen value so that a failure can be reproduced.
	t.Logf("writing and flushing %d keys", numFiles)
	for i := 0; i < numFiles; i++ {
		// Each key is flushed on its own, so every Set is followed by a Flush.
		require.NoError(t, d.Set(mkKey(i), nil, nil))
		require.NoError(t, d.Flush())
	}
	err = d.Checkpoint(checkpointPath, WithRestrictToSpans([]CheckpointSpan{
		{
			Start: mkKey(0),
			End:   mkKey(10),
		},
	}))
	require.NoError(t, err)

	// Open the checkpoint and iterate through all the keys. The checkpoint is
	// opened with the same options (and therefore the same in-memory
	// filesystem) as the original store.
	checkpointDB, err := Open(checkpointPath, opts)
	require.NoError(t, err)
	iter := checkpointDB.NewIter(nil)
	numKeys := 0
	for valid := iter.First(); valid; valid = iter.Next() {
		numKeys++
	}
	require.NoError(t, iter.Error())
	require.NoError(t, iter.Close())
	require.NoError(t, checkpointDB.Close())
	// Only the keys inside the restricted span are expected to be visible.
	require.Equal(t, 10, numKeys)
}
1 change: 1 addition & 0 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (r *Reader) nextChunk(wantFirst bool) error {
r.begin = r.end + headerSize
r.end = r.begin + int(length)
if r.end > r.n {
// The chunk straddles a 32KB boundary (or the end of file).
if r.recovering {
r.recover()
continue
Expand Down

0 comments on commit 0a342ec

Please sign in to comment.