From d34ce8407d7596fe911b1c69f1ef61ff7b15e55f Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Wed, 26 Apr 2023 11:09:55 -0400 Subject: [PATCH] db: synchronize ingests with flushes of later memtables Pebble's consistency relies on maintaining the sequence number invariant: for two internal keys k#s1 and k#s2 with the same user key and s1 < s2, k#s2 must be in a higher level of the LSM. Previously, it was possible for a sequence number inversion to occur during a concurrent ingest and batch application writing to the same key. If the ingest acquired its sequence number before the batch, but the batch application completed first AND the containing memtable flushed first, the batch's higher sequenced key could exist below the older conflicting key. Informs #2196. --- ingest.go | 56 +++++++++---- ingest_test.go | 135 ++++++++++++++++++++++++++++++ internal/manifest/version_edit.go | 45 ++++++++++ 3 files changed, 222 insertions(+), 14 deletions(-) diff --git a/ingest.go b/ingest.go index 9d07d6811e..6f14dad493 100644 --- a/ingest.go +++ b/ingest.go @@ -644,18 +644,18 @@ func ingestTargetLevel( // // The steps for ingestion are: // -// 1. Allocate file numbers for every sstable being ingested. -// 2. Load the metadata for all sstables being ingest. -// 3. Sort the sstables by smallest key, verifying non overlap. -// 4. Hard link (or copy) the sstables into the DB directory. -// 5. Allocate a sequence number to use for all of the entries in the -// sstables. This is the step where overlap with memtables is -// determined. If there is overlap, we remember the most recent memtable -// that overlaps. -// 6. Update the sequence number in the ingested sstables. -// 7. Wait for the most recent memtable that overlaps to flush (if any). -// 8. Add the ingested sstables to the version (DB.ingestApply). -// 9. Publish the ingestion sequence number. +// 1. Allocate file numbers for every sstable being ingested. +// 2. Load the metadata for all sstables being ingest. +// 3. Sort the sstables by smallest key, verifying non overlap. +// 4. Hard link (or copy) the sstables into the DB directory. +// 5. Allocate a sequence number to use for all of the entries in the +// sstables. This is the step where overlap with memtables is +// determined. If there is overlap, we remember the most recent memtable +// that overlaps. +// 6. Update the sequence number in the ingested sstables. +// 7. Wait for the most recent memtable that overlaps to flush (if any). +// 8. Add the ingested sstables to the version (DB.ingestApply). +// 9. Publish the ingestion sequence number. // // Note that if the mutable memtable overlaps with ingestion, a flush of the // memtable is forced equivalent to DB.Flush. Additionally, subsequent @@ -750,11 +750,23 @@ func (d *DB) ingest( } var mem *flushableEntry + var mut *memTable prepare := func() { // Note that d.commit.mu is held by commitPipeline when calling prepare. d.mu.Lock() defer d.mu.Unlock() + defer func() { + // New writes with higher sequence numbers may be concurrently + // committed. We must ensure they don't flush before this ingest + // completes. To do that, we ref the mutable memtable as a writer, + // preventing its flushing (and the flushing of all subsequent + // flushables in the queue). Once we've acquired the manifest lock + // to add the ingested sstables to the LSM, we can unref as we're + // guaranteed that the flush won't edit the LSM before this ingest. + mut = d.mu.mem.mutable + mut.writerRef() + }() // Check to see if any files overlap with any of the memtables. The queue // is ordered from oldest to newest with the mutable memtable being the @@ -778,6 +790,9 @@ func (d *DB) ingest( apply := func(seqNum uint64) { if err != nil { // An error occurred during prepare. + if mut != nil { + mut.writerUnref() + } return } @@ -788,6 +803,9 @@ func (d *DB) ingest( if err = ingestUpdateSeqNum( d.cmp, d.opts.Comparer.FormatKey, seqNum, meta, ); err != nil { + if mut != nil { + mut.writerUnref() + } return } @@ -799,7 +817,7 @@ func (d *DB) ingest( // Assign the sstables to the correct level in the LSM and apply the // version edit. - ve, err = d.ingestApply(jobID, meta, targetLevelFunc) + ve, err = d.ingestApply(jobID, meta, targetLevelFunc, mut) } d.commit.AllocateSeqNum(len(meta), prepare, apply) @@ -854,7 +872,7 @@ type ingestTargetLevelFunc func( ) (int, error) func (d *DB) ingestApply( - jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc, + jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc, mut *memTable, ) (*versionEdit, error) { d.mu.Lock() defer d.mu.Unlock() @@ -871,6 +889,16 @@ func (d *DB) ingestApply( // logAndApply unconditionally releases the manifest lock, but any earlier // returns must unlock the manifest. d.mu.versions.logLock() + + if mut != nil { + // Unref the mutable memtable to allows its flush to proceed. Now that we've + // acquired the manifest lock, we can be certain that if the mutable + // memtable has received more recent conflicting writes, the flush won't + // beat us to applying to the manifest resulting in sequence number + // inversion. + mut.writerUnref() + } + current := d.mu.versions.currentVersion() baseLevel := d.mu.versions.picker.getBaseLevel() iterOps := IterOptions{logger: d.opts.Logger} diff --git a/ingest_test.go b/ingest_test.go index f9e13fb688..b96a44022e 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -7,12 +7,15 @@ package pebble import ( "bytes" "fmt" + "io" "io/ioutil" + "math" "os" "sort" "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -22,8 +25,10 @@ import ( "github.com/cockroachdb/pebble/internal/datadriven" "github.com/cockroachdb/pebble/internal/errorfs" "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/rangekey" "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/kr/pretty" @@ -1227,6 +1232,136 @@ func TestIngestMemtablePendingOverlap(t *testing.T) { require.NoError(t, d.Close()) } +// TestIngestMemtableOverlapRace is a regression test for the race described in +// #2196. If an ingest that checks for overlap with the mutable memtable and +// finds no overlap, it must not allow overlapping keys with later sequence +// numbers to be applied to the memtable and the memtable to be flushed before +// the ingest completes. +// +// This test operates by committing the same key concurrently: +// - 1 goroutine repeatedly ingests the same sstable writing the key `foo` +// - n goroutines repeatedly apply batches writing the key `foo` and trigger +// flushes. +// +// After a while, the database is closed and the manifest is verified. Version +// edits should contain new files with monotonically increasing sequence +// numbers, since every flush and every ingest conflicts with one another. +func TestIngestMemtableOverlapRace(t *testing.T) { + mem := vfs.NewMem() + d, err := Open("", &Options{ + FS: mem, + // Disable automatic compactions to keep the manifest clean; only + // flushes and ingests. + DisableAutomaticCompactions: true, + // Disable the WAL to speed up batch commits. + DisableWAL: true, + EventListener: MakeLoggingEventListener(DefaultLogger), + // We're endlessly appending to L0 without clearing it, so set a maximal + // stop writes threshold. + L0StopWritesThreshold: math.MaxInt, + // Accumulating more than 1 immutable memtable doesn't help us exercise + // the bug, since the committed keys need to be flushed promptly. + MemTableStopWritesThreshold: 2, + }) + require.NoError(t, err) + + // Prepare a sstable `ext` deleting foo. + f, err := mem.Create("ext") + require.NoError(t, err) + w := sstable.NewWriter(f, sstable.WriterOptions{}) + require.NoError(t, w.Delete([]byte("foo"))) + require.NoError(t, w.Close()) + + var done uint32 + const numSetters = 2 + var wg sync.WaitGroup + wg.Add(numSetters + 1) + + untilDone := func(fn func()) { + defer wg.Done() + for atomic.LoadUint32(&done) == 0 { + fn() + } + } + + // Ingest in the background. + totalIngests := 0 + go untilDone(func() { + filename := fmt.Sprintf("ext%d", totalIngests) + require.NoError(t, mem.Link("ext", filename)) + require.NoError(t, d.Ingest([]string{filename})) + totalIngests++ + }) + + // Apply batches and trigger flushes in the background. + wo := &WriteOptions{Sync: false} + var localCommits [numSetters]int + for i := 0; i < numSetters; i++ { + i := i + v := []byte(fmt.Sprintf("v%d", i+1)) + go untilDone(func() { + // Commit a batch setting foo=vN. + b := d.NewBatch() + require.NoError(t, b.Set([]byte("foo"), v, nil)) + require.NoError(t, b.Commit(wo)) + localCommits[i]++ + d.AsyncFlush() + }) + } + time.Sleep(100 * time.Millisecond) + atomic.StoreUint32(&done, 1) + wg.Wait() + + var totalCommits int + for i := 0; i < numSetters; i++ { + totalCommits += localCommits[i] + } + m := d.Metrics() + tot := m.Total() + t.Logf("Committed %d batches.", totalCommits) + t.Logf("Flushed %d times.", m.Flush.Count) + t.Logf("Ingested %d sstables.", tot.TablesIngested) + require.NoError(t, d.CheckLevels(nil)) + require.NoError(t, d.Close()) + + // Replay the manifest. Every flush and ingest is a separate version edit. + // Since they all write the same key and compactions are disabled, sequence + // numbers of new files should be monotonically increasing. + // + // This check is necessary because most of these sstables are ingested into + // L0. The L0 sublevels construction will order them by LargestSeqNum, even + // if they're added to L0 out-of-order. The CheckLevels call at the end of + // the test may find that the sublevels are all appropriately ordered, but + // the manifest may reveal they were added to the LSM out-of-order. + dbDesc, err := Peek("", mem) + require.NoError(t, err) + require.True(t, dbDesc.Exists) + f, err = mem.Open(dbDesc.ManifestFilename) + require.NoError(t, err) + defer f.Close() + rr := record.NewReader(f, 0 /* logNum */) + var largest *fileMetadata + for { + r, err := rr.Next() + if err == io.EOF || err == record.ErrInvalidChunk { + break + } + require.NoError(t, err) + var ve manifest.VersionEdit + require.NoError(t, ve.Decode(r)) + t.Log(ve.String()) + for _, f := range ve.NewFiles { + if largest != nil { + require.Equal(t, 0, f.Level) + if largest.LargestSeqNum > f.Meta.LargestSeqNum { + t.Fatalf("previous largest file %s has sequence number > next file %s", largest, f.Meta) + } + } + largest = f.Meta + } + } +} + type ingestCrashFS struct { vfs.FS } diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index 75509ab171..52903db36a 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -8,8 +8,11 @@ import ( "bufio" "bytes" "encoding/binary" + "fmt" "io" + "sort" "sync/atomic" + "time" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -364,6 +367,48 @@ func (v *VersionEdit) Decode(r io.Reader) error { return nil } +// String implements fmt.Stringer for a VersionEdit. +func (v *VersionEdit) String() string { + var buf bytes.Buffer + if v.ComparerName != "" { + fmt.Fprintf(&buf, " comparer: %s", v.ComparerName) + } + if v.MinUnflushedLogNum != 0 { + fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum) + } + if v.ObsoletePrevLogNum != 0 { + fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum) + } + if v.NextFileNum != 0 { + fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum) + } + if v.LastSeqNum != 0 { + fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum) + } + entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles)) + for df := range v.DeletedFiles { + entries = append(entries, df) + } + sort.Slice(entries, func(i, j int) bool { + if entries[i].Level != entries[j].Level { + return entries[i].Level < entries[j].Level + } + return entries[i].FileNum < entries[j].FileNum + }) + for _, df := range entries { + fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum) + } + for _, nf := range v.NewFiles { + fmt.Fprintf(&buf, " added: L%d %s", nf.Level, nf.Meta.String()) + if nf.Meta.CreationTime != 0 { + fmt.Fprintf(&buf, " (%s)", + time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339)) + } + fmt.Fprintln(&buf) + } + return buf.String() +} + // Encode encodes an edit to the specified writer. func (v *VersionEdit) Encode(w io.Writer) error { e := versionEditEncoder{new(bytes.Buffer)}