Skip to content

Commit

Permalink
db: synchronize ingests with flushes of later memtables
Browse files Browse the repository at this point in the history
Pebble's consistency relies on maintaining the sequence number invariant: for
two internal keys k#s1 and k#s2 with the same user key and s1 < s2, k#s2 must
be in a higher level of the LSM.

Previously, it was possible for a sequence number inversion to occur during a
concurrent ingest and batch application writing to the same key. If the ingest
acquired its sequence number before the batch, but the batch application
completed first AND the containing memtable flushed first, the batch's higher
sequenced key could exist below the older conflicting key.

Informs #2196.
  • Loading branch information
jbowens committed May 8, 2023
1 parent 11c9b6d commit c03eda9
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 3 deletions.
44 changes: 41 additions & 3 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ func (d *DB) ingest(
// approximate ingest-into-L0 stats when using flushable ingests.
metaFlushableOverlaps := make([]bool, len(meta))
var mem *flushableEntry
var mut *memTable
// asFlushable indicates whether the sstable was ingested as a flushable.
var asFlushable bool
prepare := func(seqNum uint64) {
Expand Down Expand Up @@ -891,8 +892,20 @@ func (d *DB) ingest(
d.opts.Logger.Infof("ingest error reading flushable for log %s: %s", m.logNum, err)
}
}

if mem == nil {
// No overlap with any of the queued flushables.
// No overlap with any of the queued flushables, so no need to queue
// after them.

// New writes with higher sequence numbers may be concurrently
// committed. We must ensure they don't flush before this ingest
// completes. To do that, we ref the mutable memtable as a writer,
// preventing its flushing (and the flushing of all subsequent
// flushables in the queue). Once we've acquired the manifest lock
// to add the ingested sstables to the LSM, we can unref as we're
// guaranteed that the flush won't edit the LSM before this ingest.
mut = d.mu.mem.mutable
mut.writerRef()
return
}
// The ingestion overlaps with some entry in the flushable queue.
Expand All @@ -904,6 +917,15 @@ func (d *DB) ingest(
if mem.flushable == d.mu.mem.mutable {
err = d.makeRoomForWrite(nil)
}
// New writes with higher sequence numbers may be concurrently
// committed. We must ensure they don't flush before this ingest
// completes. To do that, we ref the mutable memtable as a writer,
// preventing its flushing (and the flushing of all subsequent
// flushables in the queue). Once we've acquired the manifest lock
// to add the ingested sstables to the LSM, we can unref as we're
// guaranteed that the flush won't edit the LSM before this ingest.
mut = d.mu.mem.mutable
mut.writerRef()
mem.flushForced = true
d.maybeScheduleFlush()
return
Expand All @@ -918,6 +940,9 @@ func (d *DB) ingest(
apply := func(seqNum uint64) {
if err != nil || asFlushable {
// An error occurred during prepare.
if mut != nil {
mut.writerUnref()
}
return
}

Expand All @@ -928,6 +953,9 @@ func (d *DB) ingest(
if err = ingestUpdateSeqNum(
d.cmp, d.opts.Comparer.FormatKey, seqNum, meta,
); err != nil {
if mut != nil {
mut.writerUnref()
}
return
}

Expand All @@ -939,7 +967,7 @@ func (d *DB) ingest(

// Assign the sstables to the correct level in the LSM and apply the
// version edit.
ve, err = d.ingestApply(jobID, meta, targetLevelFunc)
ve, err = d.ingestApply(jobID, meta, targetLevelFunc, mut)
}

d.commit.AllocateSeqNum(len(meta), prepare, apply)
Expand Down Expand Up @@ -1022,7 +1050,7 @@ type ingestTargetLevelFunc func(
) (int, error)

func (d *DB) ingestApply(
jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc,
jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc, mut *memTable,
) (*versionEdit, error) {
d.mu.Lock()
defer d.mu.Unlock()
Expand All @@ -1039,6 +1067,16 @@ func (d *DB) ingestApply(
// logAndApply unconditionally releases the manifest lock, but any earlier
// returns must unlock the manifest.
d.mu.versions.logLock()

if mut != nil {
// Unref the mutable memtable to allow its flush to proceed. Now that we've
// acquired the manifest lock, we can be certain that if the mutable
// memtable has received more recent conflicting writes, the flush won't
// beat us to applying to the manifest resulting in sequence number
// inversion.
mut.writerUnref()
}

current := d.mu.versions.currentVersion()
baseLevel := d.mu.versions.picker.getBaseLevel()
iterOps := IterOptions{logger: d.opts.Logger}
Expand Down
135 changes: 135 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"bytes"
"fmt"
"io"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -22,9 +24,11 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/errorfs"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/kr/pretty"
Expand Down Expand Up @@ -1416,6 +1420,137 @@ func TestIngestMemtablePendingOverlap(t *testing.T) {
require.NoError(t, d.Close())
}

// TestIngestMemtableOverlapRace is a regression test for the race described in
// #2196. If an ingest checks for overlap with the mutable memtable and finds
// none, it must not allow overlapping keys with later sequence numbers to be
// applied to the memtable and the memtable to be flushed before the ingest
// completes.
//
// This test operates by committing the same key concurrently:
//   - 1 goroutine repeatedly ingests the same sstable writing the key `foo`
//   - n goroutines repeatedly apply batches writing the key `foo` and trigger
//     flushes.
//
// After a while, the database is closed and the manifest is verified. Version
// edits should contain new files with monotonically increasing sequence
// numbers, since every flush and every ingest conflicts with one another.
func TestIngestMemtableOverlapRace(t *testing.T) {
	mem := vfs.NewMem()
	el := MakeLoggingEventListener(DefaultLogger)
	d, err := Open("", &Options{
		FS: mem,
		// Disable automatic compactions to keep the manifest clean; only
		// flushes and ingests.
		DisableAutomaticCompactions: true,
		// Disable the WAL to speed up batch commits.
		DisableWAL:    true,
		EventListener: &el,
		// We're endlessly appending to L0 without clearing it, so set a maximal
		// stop writes threshold.
		L0StopWritesThreshold: math.MaxInt,
		// Accumulating more than 1 immutable memtable doesn't help us exercise
		// the bug, since the committed keys need to be flushed promptly.
		MemTableStopWritesThreshold: 2,
	})
	require.NoError(t, err)

	// Prepare a sstable `ext` deleting foo.
	f, err := mem.Create("ext")
	require.NoError(t, err)
	w := sstable.NewWriter(objstorage.NewFileWritable(f), sstable.WriterOptions{})
	require.NoError(t, w.Delete([]byte("foo")))
	require.NoError(t, w.Close())

	var done atomic.Bool
	const numSetters = 2
	var wg sync.WaitGroup
	wg.Add(numSetters + 1)

	// untilDone repeatedly invokes fn until the test signals completion or fn
	// returns an error.
	//
	// Failures are reported with t.Error rather than the require helpers:
	// require calls t.FailNow, which must only be called from the goroutine
	// running the test function, whereas t.Error may be called from any
	// goroutine. On failure the done flag is set so the remaining workers stop
	// promptly instead of running out the clock.
	untilDone := func(fn func() error) {
		defer wg.Done()
		for !done.Load() {
			if err := fn(); err != nil {
				t.Error(err)
				done.Store(true)
				return
			}
		}
	}

	// Ingest in the background. totalIngests is only touched by this single
	// goroutine, so it needs no synchronization.
	totalIngests := 0
	go untilDone(func() error {
		filename := fmt.Sprintf("ext%d", totalIngests)
		if err := mem.Link("ext", filename); err != nil {
			return err
		}
		if err := d.Ingest([]string{filename}); err != nil {
			return err
		}
		totalIngests++
		return nil
	})

	// Apply batches and trigger flushes in the background. Each setter owns
	// its own slot of localCommits; the slots are only read after wg.Wait.
	wo := &WriteOptions{Sync: false}
	var localCommits [numSetters]int
	for i := 0; i < numSetters; i++ {
		i := i
		v := []byte(fmt.Sprintf("v%d", i+1))
		go untilDone(func() error {
			// Commit a batch setting foo=vN.
			b := d.NewBatch()
			if err := b.Set([]byte("foo"), v, nil); err != nil {
				return err
			}
			if err := b.Commit(wo); err != nil {
				return err
			}
			localCommits[i]++
			d.AsyncFlush()
			return nil
		})
	}
	time.Sleep(100 * time.Millisecond)
	done.Store(true)
	wg.Wait()

	var totalCommits int
	for i := 0; i < numSetters; i++ {
		totalCommits += localCommits[i]
	}
	m := d.Metrics()
	tot := m.Total()
	t.Logf("Committed %d batches.", totalCommits)
	t.Logf("Flushed %d times.", m.Flush.Count)
	t.Logf("Ingested %d sstables.", tot.TablesIngested)
	require.NoError(t, d.CheckLevels(nil))
	require.NoError(t, d.Close())

	// Replay the manifest. Every flush and ingest is a separate version edit.
	// Since they all write the same key and compactions are disabled, sequence
	// numbers of new files should be monotonically increasing.
	//
	// This check is necessary because most of these sstables are ingested into
	// L0. The L0 sublevels construction will order them by LargestSeqNum, even
	// if they're added to L0 out-of-order. The CheckLevels call at the end of
	// the test may find that the sublevels are all appropriately ordered, but
	// the manifest may reveal they were added to the LSM out-of-order.
	dbDesc, err := Peek("", mem)
	require.NoError(t, err)
	require.True(t, dbDesc.Exists)
	f, err = mem.Open(dbDesc.ManifestFilename)
	require.NoError(t, err)
	defer f.Close()
	rr := record.NewReader(f, 0 /* logNum */)
	var largest *fileMetadata
	for {
		r, err := rr.Next()
		if err == io.EOF || err == record.ErrInvalidChunk {
			break
		}
		require.NoError(t, err)
		var ve manifest.VersionEdit
		require.NoError(t, ve.Decode(r))
		t.Log(ve.String())
		for _, nf := range ve.NewFiles {
			// The level and ordering checks are skipped for the very first
			// file seen, which has no predecessor to compare against.
			// NOTE(review): presumably the first file may land below L0
			// because the LSM is still empty at that point — confirm.
			if largest != nil {
				require.Equal(t, 0, nf.Level)
				if largest.LargestSeqNum > nf.Meta.LargestSeqNum {
					t.Fatalf("previous largest file %s has sequence number > next file %s", largest, nf.Meta)
				}
			}
			largest = nf.Meta
		}
	}
}

type ingestCrashFS struct {
vfs.FS
}
Expand Down
45 changes: 45 additions & 0 deletions internal/manifest/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"sort"
"sync/atomic"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -364,6 +367,48 @@ func (v *VersionEdit) Decode(r io.Reader) error {
return nil
}

// String implements fmt.Stringer for a VersionEdit.
//
// The result is a multi-line, human-readable description of the edit. Each
// populated scalar field (comparer name, log numbers, next file number, last
// sequence number) is written on its own newline-terminated line; fields
// holding their zero value are omitted. Deleted files follow, sorted by level
// and then by file number so that the output is deterministic despite
// DeletedFiles being a map. Added files are written last, in slice order, with
// their creation time appended in RFC 3339 (UTC) form when it is non-zero.
func (v *VersionEdit) String() string {
	var buf bytes.Buffer
	if v.ComparerName != "" {
		// Terminate the line like every other field; without the newline the
		// next field would be appended to the comparer line.
		fmt.Fprintf(&buf, " comparer: %s\n", v.ComparerName)
	}
	if v.MinUnflushedLogNum != 0 {
		fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
	}
	if v.ObsoletePrevLogNum != 0 {
		fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
	}
	if v.NextFileNum != 0 {
		fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
	}
	if v.LastSeqNum != 0 {
		fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
	}

	// Map iteration order is random, so collect and sort the deleted entries
	// before printing them.
	entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
	for df := range v.DeletedFiles {
		entries = append(entries, df)
	}
	sort.Slice(entries, func(i, j int) bool {
		if entries[i].Level != entries[j].Level {
			return entries[i].Level < entries[j].Level
		}
		return entries[i].FileNum < entries[j].FileNum
	})
	for _, df := range entries {
		fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum)
	}

	for _, nf := range v.NewFiles {
		fmt.Fprintf(&buf, " added: L%d %s", nf.Level, nf.Meta.String())
		if nf.Meta.CreationTime != 0 {
			fmt.Fprintf(&buf, " (%s)",
				time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
		}
		fmt.Fprintln(&buf)
	}
	return buf.String()
}

// Encode encodes an edit to the specified writer.
func (v *VersionEdit) Encode(w io.Writer) error {
e := versionEditEncoder{new(bytes.Buffer)}
Expand Down

0 comments on commit c03eda9

Please sign in to comment.