Skip to content

Commit

Permalink
db: fix bug with restricted checkpoints
Browse files Browse the repository at this point in the history
When a checkpoint is restricted to a set of spans, we append a record
to the checkpoint's manifest removing all the excluded ssts.

We append the record after copying the raw data from the existing
manifest. This is not quite correct: the `record` library works by
breaking up records into chunks and packing those chunks into 32KB
blocks. Chunks cannot straddle a 32KB boundary, but this invariant is
violated by our append method. In practice, the violation only occurs
if the record we are appending is big and/or we are very unlucky and
the existing manifest ends close to a 32KB boundary.

To fix this: instead of doing a raw data copy of the existing
manifest, we copy at the record level (using a record reader and a
record writer). Then we can add a new record using the same writer.

Informs cockroachdb/cockroach#100935
  • Loading branch information
RaduBerinde committed Apr 18, 2023
1 parent 101876a commit d9fd066
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 8 deletions.
36 changes: 28 additions & 8 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,29 @@ func (d *DB) writeCheckpointManifest(
}
defer dst.Close()

if _, err := io.Copy(dst, &io.LimitedReader{R: src, N: manifestSize}); err != nil {
return err
// Copy all existing records. We need to copy at the record level in case we
// need to append another record with the excluded files (we cannot simply
// append a record after a raw data copy; see
// https://github.com/cockroachdb/cockroach/issues/100935).
r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum.FileNum())
w := record.NewWriter(dst)
for {
rr, err := r.Next()
if err != nil {
// We stop at an invalid record for consistency with versionSet.load.
if err == io.EOF || record.IsInvalidRecord(err) {
break
}
return err
}

rw, err := w.Next()
if err != nil {
return err
}
if _, err := io.Copy(rw, rr); err != nil {
return err
}
}

if len(excludedFiles) > 0 {
Expand All @@ -382,18 +403,17 @@ func (d *DB) writeCheckpointManifest(
RemovedBackingTables: removeBackingTables,
}

rw := record.NewWriter(dst)
w, err := rw.Next()
rw, err := w.Next()
if err != nil {
return err
}
if err := ve.Encode(w); err != nil {
return err
}
if err := rw.Close(); err != nil {
if err := ve.Encode(rw); err != nil {
return err
}
}
if err := w.Close(); err != nil {
return err
}
return dst.Sync()
}(); err != nil {
return err
Expand Down
47 changes: 47 additions & 0 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,50 @@ func TestCheckpointFlushWAL(t *testing.T) {
require.NoError(t, d.Close())
}
}

// TestCheckpointManyFiles creates a store with many small sstables (one per
// key, via a flush after every Set), takes a checkpoint restricted to a span
// covering the first 100 keys, and verifies that the checkpoint opens cleanly
// and contains exactly those keys.
func TestCheckpointManyFiles(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping because of short flag")
	}
	const checkpointPath = "checkpoint"
	opts := &Options{
		FS:                          vfs.NewMem(),
		FormatMajorVersion:          FormatNewest,
		DisableAutomaticCompactions: true,
	}

	d, err := Open("", opts)
	require.NoError(t, err)
	defer d.Close()

	// Flushing after each Set produces one tiny sstable per key, so the
	// manifest grows large enough to exercise multi-block records.
	for i := 0; i < 4000; i++ {
		require.NoError(t, d.Set([]byte(fmt.Sprintf("key%06d", i)), nil, nil))
		require.NoError(t, d.Flush())
	}
	// Restrict the checkpoint to the half-open span [key000000, key000100).
	spans := []CheckpointSpan{{
		Start: []byte(fmt.Sprintf("key%06d", 0)),
		End:   []byte(fmt.Sprintf("key%06d", 100)),
	}}
	require.NoError(t, d.Checkpoint(checkpointPath, WithRestrictToSpans(spans)))

	// Open the checkpoint and count every key reachable via iteration.
	{
		cd, err := Open(checkpointPath, opts)
		require.NoError(t, err)
		it := cd.NewIter(nil)
		require.True(t, it.First())
		require.NoError(t, it.Error())
		count := 1
		for it.Next() {
			count++
		}
		require.NoError(t, it.Error())
		require.NoError(t, it.Close())
		require.NoError(t, cd.Close())
		require.Equal(t, 100, count)
	}
}
1 change: 1 addition & 0 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (r *Reader) nextChunk(wantFirst bool) error {
r.begin = r.end + headerSize
r.end = r.begin + int(length)
if r.end > r.n {
// The chunk straddles a 32KB boundary (or the end of file).
if r.recovering {
r.recover()
continue
Expand Down

0 comments on commit d9fd066

Please sign in to comment.