Skip to content

Commit

Permalink
db: synchronize ingests with flushes of later memtables
Browse files Browse the repository at this point in the history
Pebble's consistency relies on maintaining the sequence number invariant: for
two internal keys k#s1 and k#s2 with the same user key and s1 < s2, k#s2 must
be in a higher level of the LSM.

Previously, it was possible for a sequence number inversion to occur during a
concurrent ingest and batch application writing to the same key. If the ingest
acquired its sequence number before the batch, but the batch application
completed first AND the containing memtable flushed first, the batch's higher
sequenced key could exist below the older conflicting key.

Informs #2196.
  • Loading branch information
jbowens committed Apr 28, 2023
1 parent bd56caf commit d34ce84
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 14 deletions.
56 changes: 42 additions & 14 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,18 +644,18 @@ func ingestTargetLevel(
//
// The steps for ingestion are:
//
// 1. Allocate file numbers for every sstable being ingested.
// 2. Load the metadata for all sstables being ingest.
// 3. Sort the sstables by smallest key, verifying non overlap.
// 4. Hard link (or copy) the sstables into the DB directory.
// 5. Allocate a sequence number to use for all of the entries in the
// sstables. This is the step where overlap with memtables is
// determined. If there is overlap, we remember the most recent memtable
// that overlaps.
// 6. Update the sequence number in the ingested sstables.
// 7. Wait for the most recent memtable that overlaps to flush (if any).
// 8. Add the ingested sstables to the version (DB.ingestApply).
// 9. Publish the ingestion sequence number.
// 1. Allocate file numbers for every sstable being ingested.
// 2. Load the metadata for all sstables being ingest.
// 3. Sort the sstables by smallest key, verifying non overlap.
// 4. Hard link (or copy) the sstables into the DB directory.
// 5. Allocate a sequence number to use for all of the entries in the
// sstables. This is the step where overlap with memtables is
// determined. If there is overlap, we remember the most recent memtable
// that overlaps.
// 6. Update the sequence number in the ingested sstables.
// 7. Wait for the most recent memtable that overlaps to flush (if any).
// 8. Add the ingested sstables to the version (DB.ingestApply).
// 9. Publish the ingestion sequence number.
//
// Note that if the mutable memtable overlaps with ingestion, a flush of the
// memtable is forced equivalent to DB.Flush. Additionally, subsequent
Expand Down Expand Up @@ -750,11 +750,23 @@ func (d *DB) ingest(
}

var mem *flushableEntry
var mut *memTable
prepare := func() {
// Note that d.commit.mu is held by commitPipeline when calling prepare.

d.mu.Lock()
defer d.mu.Unlock()
defer func() {
// New writes with higher sequence numbers may be concurrently
// committed. We must ensure they don't flush before this ingest
// completes. To do that, we ref the mutable memtable as a writer,
// preventing its flushing (and the flushing of all subsequent
// flushables in the queue). Once we've acquired the manifest lock
// to add the ingested sstables to the LSM, we can unref as we're
// guaranteed that the flush won't edit the LSM before this ingest.
mut = d.mu.mem.mutable
mut.writerRef()
}()

// Check to see if any files overlap with any of the memtables. The queue
// is ordered from oldest to newest with the mutable memtable being the
Expand All @@ -778,6 +790,9 @@ func (d *DB) ingest(
apply := func(seqNum uint64) {
if err != nil {
// An error occurred during prepare.
if mut != nil {
mut.writerUnref()
}
return
}

Expand All @@ -788,6 +803,9 @@ func (d *DB) ingest(
if err = ingestUpdateSeqNum(
d.cmp, d.opts.Comparer.FormatKey, seqNum, meta,
); err != nil {
if mut != nil {
mut.writerUnref()
}
return
}

Expand All @@ -799,7 +817,7 @@ func (d *DB) ingest(

// Assign the sstables to the correct level in the LSM and apply the
// version edit.
ve, err = d.ingestApply(jobID, meta, targetLevelFunc)
ve, err = d.ingestApply(jobID, meta, targetLevelFunc, mut)
}

d.commit.AllocateSeqNum(len(meta), prepare, apply)
Expand Down Expand Up @@ -854,7 +872,7 @@ type ingestTargetLevelFunc func(
) (int, error)

func (d *DB) ingestApply(
jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc,
jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc, mut *memTable,
) (*versionEdit, error) {
d.mu.Lock()
defer d.mu.Unlock()
Expand All @@ -871,6 +889,16 @@ func (d *DB) ingestApply(
// logAndApply unconditionally releases the manifest lock, but any earlier
// returns must unlock the manifest.
d.mu.versions.logLock()

if mut != nil {
// Unref the mutable memtable to allows its flush to proceed. Now that we've
// acquired the manifest lock, we can be certain that if the mutable
// memtable has received more recent conflicting writes, the flush won't
// beat us to applying to the manifest resulting in sequence number
// inversion.
mut.writerUnref()
}

current := d.mu.versions.currentVersion()
baseLevel := d.mu.versions.picker.getBaseLevel()
iterOps := IterOptions{logger: d.opts.Logger}
Expand Down
135 changes: 135 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ package pebble
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -22,8 +25,10 @@ import (
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/errorfs"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/kr/pretty"
Expand Down Expand Up @@ -1227,6 +1232,136 @@ func TestIngestMemtablePendingOverlap(t *testing.T) {
require.NoError(t, d.Close())
}

// TestIngestMemtableOverlapRace is a regression test for the race described in
// #2196. If an ingest that checks for overlap with the mutable memtable and
// finds no overlap, it must not allow overlapping keys with later sequence
// numbers to be applied to the memtable and the memtable to be flushed before
// the ingest completes.
//
// This test operates by committing the same key concurrently:
// - 1 goroutine repeatedly ingests the same sstable writing the key `foo`
// - n goroutines repeatedly apply batches writing the key `foo` and trigger
// flushes.
//
// After a while, the database is closed and the manifest is verified. Version
// edits should contain new files with monotonically increasing sequence
// numbers, since every flush and every ingest conflicts with one another.
func TestIngestMemtableOverlapRace(t *testing.T) {
mem := vfs.NewMem()
d, err := Open("", &Options{
FS: mem,
// Disable automatic compactions to keep the manifest clean; only
// flushes and ingests.
DisableAutomaticCompactions: true,
// Disable the WAL to speed up batch commits.
DisableWAL: true,
EventListener: MakeLoggingEventListener(DefaultLogger),
// We're endlessly appending to L0 without clearing it, so set a maximal
// stop writes threshold.
L0StopWritesThreshold: math.MaxInt,
// Accumulating more than 1 immutable memtable doesn't help us exercise
// the bug, since the committed keys need to be flushed promptly.
MemTableStopWritesThreshold: 2,
})
require.NoError(t, err)

// Prepare a sstable `ext` deleting foo.
f, err := mem.Create("ext")
require.NoError(t, err)
w := sstable.NewWriter(f, sstable.WriterOptions{})
require.NoError(t, w.Delete([]byte("foo")))
require.NoError(t, w.Close())

var done uint32
const numSetters = 2
var wg sync.WaitGroup
wg.Add(numSetters + 1)

untilDone := func(fn func()) {
defer wg.Done()
for atomic.LoadUint32(&done) == 0 {
fn()
}
}

// Ingest in the background.
totalIngests := 0
go untilDone(func() {
filename := fmt.Sprintf("ext%d", totalIngests)
require.NoError(t, mem.Link("ext", filename))
require.NoError(t, d.Ingest([]string{filename}))
totalIngests++
})

// Apply batches and trigger flushes in the background.
wo := &WriteOptions{Sync: false}
var localCommits [numSetters]int
for i := 0; i < numSetters; i++ {
i := i
v := []byte(fmt.Sprintf("v%d", i+1))
go untilDone(func() {
// Commit a batch setting foo=vN.
b := d.NewBatch()
require.NoError(t, b.Set([]byte("foo"), v, nil))
require.NoError(t, b.Commit(wo))
localCommits[i]++
d.AsyncFlush()
})
}
time.Sleep(100 * time.Millisecond)
atomic.StoreUint32(&done, 1)
wg.Wait()

var totalCommits int
for i := 0; i < numSetters; i++ {
totalCommits += localCommits[i]
}
m := d.Metrics()
tot := m.Total()
t.Logf("Committed %d batches.", totalCommits)
t.Logf("Flushed %d times.", m.Flush.Count)
t.Logf("Ingested %d sstables.", tot.TablesIngested)
require.NoError(t, d.CheckLevels(nil))
require.NoError(t, d.Close())

// Replay the manifest. Every flush and ingest is a separate version edit.
// Since they all write the same key and compactions are disabled, sequence
// numbers of new files should be monotonically increasing.
//
// This check is necessary because most of these sstables are ingested into
// L0. The L0 sublevels construction will order them by LargestSeqNum, even
// if they're added to L0 out-of-order. The CheckLevels call at the end of
// the test may find that the sublevels are all appropriately ordered, but
// the manifest may reveal they were added to the LSM out-of-order.
dbDesc, err := Peek("", mem)
require.NoError(t, err)
require.True(t, dbDesc.Exists)
f, err = mem.Open(dbDesc.ManifestFilename)
require.NoError(t, err)
defer f.Close()
rr := record.NewReader(f, 0 /* logNum */)
var largest *fileMetadata
for {
r, err := rr.Next()
if err == io.EOF || err == record.ErrInvalidChunk {
break
}
require.NoError(t, err)
var ve manifest.VersionEdit
require.NoError(t, ve.Decode(r))
t.Log(ve.String())
for _, f := range ve.NewFiles {
if largest != nil {
require.Equal(t, 0, f.Level)
if largest.LargestSeqNum > f.Meta.LargestSeqNum {
t.Fatalf("previous largest file %s has sequence number > next file %s", largest, f.Meta)
}
}
largest = f.Meta
}
}
}

type ingestCrashFS struct {
vfs.FS
}
Expand Down
45 changes: 45 additions & 0 deletions internal/manifest/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"sort"
"sync/atomic"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -364,6 +367,48 @@ func (v *VersionEdit) Decode(r io.Reader) error {
return nil
}

// String implements fmt.Stringer for a VersionEdit.
func (v *VersionEdit) String() string {
var buf bytes.Buffer
if v.ComparerName != "" {
fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
}
if v.MinUnflushedLogNum != 0 {
fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
}
if v.ObsoletePrevLogNum != 0 {
fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
}
if v.NextFileNum != 0 {
fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
}
if v.LastSeqNum != 0 {
fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
}
entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
for df := range v.DeletedFiles {
entries = append(entries, df)
}
sort.Slice(entries, func(i, j int) bool {
if entries[i].Level != entries[j].Level {
return entries[i].Level < entries[j].Level
}
return entries[i].FileNum < entries[j].FileNum
})
for _, df := range entries {
fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum)
}
for _, nf := range v.NewFiles {
fmt.Fprintf(&buf, " added: L%d %s", nf.Level, nf.Meta.String())
if nf.Meta.CreationTime != 0 {
fmt.Fprintf(&buf, " (%s)",
time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
}
fmt.Fprintln(&buf)
}
return buf.String()
}

// Encode encodes an edit to the specified writer.
func (v *VersionEdit) Encode(w io.Writer) error {
e := versionEditEncoder{new(bytes.Buffer)}
Expand Down

0 comments on commit d34ce84

Please sign in to comment.