From d9fd066bc39f421ba38d5e643281f9fd00c1f5c8 Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Tue, 18 Apr 2023 09:44:09 -0700
Subject: [PATCH] db: fix bug with restricted checkpoints

When a checkpoint is restricted to a set of spans, we append a record to
the checkpoint's manifest removing all the excluded ssts. We append the
record after copying the raw data from the existing manifest. This is not
quite ok: the `record` library works by breaking up records in chunks and
packing chunks into 32KB blocks. Chunks cannot straddle a 32KB boundary,
but this invariant is violated by our append method.

In practice, this only happens if the record we are appending is big
and/or we are very unlucky and the existing manifest is close to a 32KB
boundary.

To fix this: instead of doing a raw data copy of the existing manifest,
we copy at the record level (using a record reader and a record writer).
Then we can add a new record using the same writer.

Informs https://github.com/cockroachdb/cockroach/issues/100935
---
 checkpoint.go      | 36 +++++++++++++++++++++++++++--------
 checkpoint_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++
 record/record.go   |  1 +
 3 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/checkpoint.go b/checkpoint.go
index c5b9e825d4..594ddc2e5f 100644
--- a/checkpoint.go
+++ b/checkpoint.go
@@ -371,8 +371,29 @@ func (d *DB) writeCheckpointManifest(
 	}
 	defer dst.Close()
 
-	if _, err := io.Copy(dst, &io.LimitedReader{R: src, N: manifestSize}); err != nil {
-		return err
+	// Copy all existing records. We need to copy at the record level in case we
+	// need to append another record with the excluded files (we cannot simply
+	// append a record after a raw data copy; see
+	// https://github.com/cockroachdb/cockroach/issues/100935).
+	r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum.FileNum())
+	w := record.NewWriter(dst)
+	for {
+		rr, err := r.Next()
+		if err != nil {
+			// We stop at an invalid record for consistency with versionSet.load.
+			if err == io.EOF || record.IsInvalidRecord(err) {
+				break
+			}
+			return err
+		}
+
+		rw, err := w.Next()
+		if err != nil {
+			return err
+		}
+		if _, err := io.Copy(rw, rr); err != nil {
+			return err
+		}
 	}
 
 	if len(excludedFiles) > 0 {
@@ -382,18 +403,17 @@ func (d *DB) writeCheckpointManifest(
 			RemovedBackingTables: removeBackingTables,
 		}
 
-		rw := record.NewWriter(dst)
-		w, err := rw.Next()
+		rw, err := w.Next()
 		if err != nil {
 			return err
 		}
-		if err := ve.Encode(w); err != nil {
-			return err
-		}
-		if err := rw.Close(); err != nil {
+		if err := ve.Encode(rw); err != nil {
 			return err
 		}
 	}
+	if err := w.Close(); err != nil {
+		return err
+	}
 	return dst.Sync()
 }(); err != nil {
 	return err
diff --git a/checkpoint_test.go b/checkpoint_test.go
index a7f466b73c..5bb7ec37a0 100644
--- a/checkpoint_test.go
+++ b/checkpoint_test.go
@@ -287,3 +287,50 @@ func TestCheckpointFlushWAL(t *testing.T) {
 		require.NoError(t, d.Close())
 	}
 }
+
+func TestCheckpointManyFiles(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping because of short flag")
+	}
+	const checkpointPath = "checkpoint"
+	opts := &Options{
+		FS:                          vfs.NewMem(),
+		FormatMajorVersion:          FormatNewest,
+		DisableAutomaticCompactions: true,
+	}
+
+	d, err := Open("", opts)
+	require.NoError(t, err)
+	defer d.Close()
+
+	for i := 0; i < 4000; i++ {
+		err := d.Set([]byte(fmt.Sprintf("key%06d", i)), nil, nil)
+		require.NoError(t, err)
+		err = d.Flush()
+		require.NoError(t, err)
+	}
+	err = d.Checkpoint(checkpointPath, WithRestrictToSpans([]CheckpointSpan{
+		{
+			Start: []byte(fmt.Sprintf("key%06d", 0)),
+			End:   []byte(fmt.Sprintf("key%06d", 100)),
+		},
+	}))
+	require.NoError(t, err)
+
+	// Open the checkpoint and iterate through all the keys.
+	{
+		d, err := Open(checkpointPath, opts)
+		require.NoError(t, err)
+		iter := d.NewIter(nil)
+		require.True(t, iter.First())
+		require.NoError(t, iter.Error())
+		n := 1
+		for iter.Next() {
+			n++
+		}
+		require.NoError(t, iter.Error())
+		require.NoError(t, iter.Close())
+		require.NoError(t, d.Close())
+		require.Equal(t, 100, n)
+	}
+}
diff --git a/record/record.go b/record/record.go
index 8c9f58540c..9b42a4c510 100644
--- a/record/record.go
+++ b/record/record.go
@@ -248,6 +248,7 @@ func (r *Reader) nextChunk(wantFirst bool) error {
 		r.begin = r.end + headerSize
 		r.end = r.begin + int(length)
 		if r.end > r.n {
+			// The chunk straddles a 32KB boundary (or the end of file).
 			if r.recovering {
 				r.recover()
 				continue
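
For reference, below is a minimal sketch of the record-level copy-then-append
pattern the fix relies on, stripped of the checkpoint plumbing. The rewriteLog
helper, its signature, and the recutil package name are hypothetical (they do
not exist in Pebble); only the record.NewReader / record.NewWriter /
record.IsInvalidRecord calls it uses are real APIs, the same ones the patch
uses. The key point is that the copied records and the appended record all go
through the same record.Writer, so every chunk the writer emits respects the
32KB block framing.

// Package recutil is a hypothetical home for this illustration only.
package recutil

import (
	"io"

	"github.com/cockroachdb/pebble/record"
)

// rewriteLog copies every record from src into dst and then appends extra as
// one additional record. Because the append goes through the same
// record.Writer that rewrote the earlier records, the new record gets valid
// chunk headers and no chunk straddles a 32KB block boundary.
func rewriteLog(src io.Reader, dst io.Writer, extra []byte) error {
	r := record.NewReader(src, 0 /* logNum */)
	w := record.NewWriter(dst)
	for {
		rr, err := r.Next()
		if err == io.EOF || record.IsInvalidRecord(err) {
			// Stop at the end of the log or at the first invalid record,
			// mirroring how the manifest is normally replayed.
			break
		}
		if err != nil {
			return err
		}
		rw, err := w.Next()
		if err != nil {
			return err
		}
		if _, err := io.Copy(rw, rr); err != nil {
			return err
		}
	}
	// Append the new record through the same writer.
	rw, err := w.Next()
	if err != nil {
		return err
	}
	if _, err := rw.Write(extra); err != nil {
		return err
	}
	// Close flushes the final block to dst.
	return w.Close()
}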