db: enforce batch point visibility in the merging iterator
Previously, visibility of batch point keys was enforced within the batch
iterator. This commit moves this visibility enforcement into the merging
iterator, alongside the visibility enforcement of committed keys. Span keys
(e.g., range deletions and range keys) are still subject to visibility filtering
when the batch iterator is constructed, because it's necessary to process all
these spans to fragment them appropriately.

This change addresses the pathological behavior observed in
cockroachdb/cockroach#87277: filtering keys within leaf iterators can force the
leaf iterators to iterate through more keys than necessary. If many mutations
have been made to a batch since the iterator was opened, the batch iterator may need
to iterate over many entries before finding a visible key. This iteration may
extend well beyond the next or previous key in the merging iterator.
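
The following is a minimal, self-contained sketch of that access pattern, not taken from the linked issues: it assumes an in-memory store via `pebble/vfs`, the `Batch.NewIter` signature at this commit, and illustrative key names and counts modeled on the 'bar'/'bax' example in the batch.go comment below.
```go
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Committed state: just the keys 'bar' and 'bax'.
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	for _, k := range []string{"bar", "bax"} {
		if err := db.Set([]byte(k), nil, pebble.Sync); err != nil {
			log.Fatal(err)
		}
	}

	// Open an iterator over an indexed batch. Its view of the batch is fixed
	// at the batch's current sequence number.
	b := db.NewIndexedBatch()
	defer b.Close()
	it := b.NewIter(nil)
	defer it.Close()

	// Mutations applied after the iterator was opened are not visible to it.
	for i := 0; i < 1000; i++ {
		_ = b.Set([]byte(fmt.Sprintf("baz-%04d", i)), nil, nil)
	}

	// SeekGE must return the committed key 'bax'. Previously the batch's leaf
	// iterator would first step through all 1,000 invisible batch keys looking
	// for a visible one >= 'bax'; with this change the merging iterator simply
	// returns the visible committed key and leaves the batch iterator alone.
	if it.SeekGE([]byte("bax")) {
		fmt.Printf("found %s\n", it.Key())
	}
}
```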

Using the SeekLT benchmark from cockroachdb/cockroach#87373, relative to master:
```
name                          old time/op  new time/op  delta
SeekLTNext/useBatch=false-10   185ms ± 1%   185ms ± 2%     ~     (p=0.841 n=5+5)
SeekLTNext/useBatch=true-10    26.1s ± 1%    0.2s ± 2%  -99.34%  (p=0.008 n=5+5)
```

And relative to 22.1:
```
name                          old time/op  new time/op  delta
SeekLTNext/useBatch=false-10   182ms ± 6%   185ms ± 2%     ~     (p=0.690 n=5+5)
SeekLTNext/useBatch=true-10    2.71s ± 7%   0.17s ± 2%  -93.68%  (p=0.008 n=5+5)
```
jbowens committed Sep 7, 2022
1 parent ade651d commit 675ac87
Showing 9 changed files with 86 additions and 32 deletions.
33 changes: 27 additions & 6 deletions batch.go
@@ -855,16 +855,37 @@ func (b *Batch) NewIter(o *IterOptions) *Iterator {
// contents of the batch.
func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
iter := &batchIter{}
-b.initInternalIter(o, iter, b.nextSeqNum())
+b.initInternalIter(o, iter)
return iter
}

-func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter, batchSnapshot uint64) {
+func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
*iter = batchIter{
-cmp: b.cmp,
-batch: b,
-iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
-snapshot: batchSnapshot,
+cmp: b.cmp,
+batch: b,
+iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
+// NB: We explicitly do not propagate the batch snapshot to the point
+// key iterator. Filtering point keys within the batch iterator can
+// cause pathological behavior where a batch iterator advances
+// significantly farther than necessary, filtering many batch keys that
+// are not visible at the batch sequence number. Instead, the merging
+// iterator enforces visibility.
+//
+// For example, consider an engine that contains the committed keys
+// 'bar' and 'bax', with no keys between them. Consider a batch
+// containing 1,000 keys within the range [a,z]. All of the
+// batch keys were added to the batch after the iterator was
+// constructed, so they are not visible to the iterator. A call to
+// SeekGE('bax') would seek the LSM iterators and discover the key
+// 'bax'. It would also seek the batch iterator, landing on the key
+// 'baz' but discovering that it's not visible. The batch iterator would
+// next through the rest of the batch's keys, only to discover there are
+// no visible keys greater than or equal to 'bax'.
+//
+// Filtering these batch points within the merging iterator ensures that
+// the batch iterator never needs to iterate beyond 'baz', because the
+// merging iterator has already found the smaller, visible key 'bax'.
+snapshot: base.InternalKeySeqNumMax,
}
}

2 changes: 1 addition & 1 deletion compaction.go
@@ -342,7 +342,7 @@ func rangeKeyCompactionTransform(
usedLen := 0
for i >= 0 {
start := j
-for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i]) {
+for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) {
// Include j in current partition.
j++
}
4 changes: 3 additions & 1 deletion db.go
@@ -1088,7 +1088,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
rangeDelIter: newErrorKeyspanIter(ErrNotIndexed),
})
} else {
-i.batch.initInternalIter(&i.opts, &i.batchPointIter, i.batchSeqNum)
+i.batch.initInternalIter(&i.opts, &i.batchPointIter)
i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
// Only include the batch's rangedel iterator if it's non-empty.
// This requires some subtle logic in the case a rangedel is later
@@ -1148,9 +1148,11 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
}
buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
buf.merging.snapshot = i.seqNum
+buf.merging.batchSnapshot = i.batchSeqNum
buf.merging.elideRangeTombstones = true
buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
i.pointIter = &buf.merging
+i.merging = &buf.merging
}

// NewBatch returns a new empty write-only batch. Any reads on the batch will
9 changes: 7 additions & 2 deletions get_iter.go
@@ -101,7 +101,7 @@ func (g *getIter) Next() (*InternalKey, []byte) {
return nil, nil
}
if g.equal(g.key, key.UserKey) {
-if !key.Visible(g.snapshot) {
+if !key.Visible(g.snapshot, base.InternalKeySeqNumMax) {
g.iterKey, g.iterValue = g.iter.Next()
continue
}
@@ -125,7 +125,12 @@ func (g *getIter) Next() (*InternalKey, []byte) {
return nil, nil
}
g.iter = g.batch.newInternalIter(nil)
-g.rangeDelIter = g.batch.newRangeDelIter(nil, g.batch.nextSeqNum())
+g.rangeDelIter = g.batch.newRangeDelIter(
+nil,
+// Get always reads the entirety of the batch's history, so no
+// batch keys should be filtered.
+base.InternalKeySeqNumMax,
+)
g.iterKey, g.iterValue = g.iter.SeekGE(g.key, base.SeekGEFlagsNone)
g.batch = nil
continue
24 changes: 19 additions & 5 deletions internal/base/internal.go
@@ -333,14 +333,28 @@ func (k InternalKey) SeqNum() uint64 {

// Visible returns true if the key is visible at the specified snapshot
// sequence number.
-func (k InternalKey) Visible(snapshot uint64) bool {
-return Visible(k.SeqNum(), snapshot)
+func (k InternalKey) Visible(snapshot, batchSnapshot uint64) bool {
+return Visible(k.SeqNum(), snapshot, batchSnapshot)
}

// Visible returns true if a key with the provided sequence number is visible at
-// the specified snapshot sequence number.
-func Visible(seqNum uint64, snapshot uint64) bool {
-return seqNum < snapshot || (seqNum&InternalKeySeqNumBatch) != 0
+// the specified snapshot sequence numbers.
+func Visible(seqNum uint64, snapshot, batchSnapshot uint64) bool {
+// There are two snapshot sequence numbers, one for committed keys and one
+// for batch keys. If a seqNum is less than `snapshot`, then seqNum
+// corresponds to a committed key that is visible. If seqNum has its batch
+// bit set, then seqNum corresponds to an uncommitted batch key. It's
+// visible if seqNum is less than batchSnapshot.
+//
+// There's one complication. The maximal sequence number
+// (`InternalKeySeqNumMax`) is used across Pebble for exclusive sentinel
+// keys and other purposes. The maximal sequence number has its batch bit
+// set, but it can never be < `batchSnapshot`, since there is no expressible
+// larger snapshot. We dictate that the maximal sequence number is always
+// visible.
+return seqNum < snapshot ||
+((seqNum&InternalKeySeqNumBatch) != 0 && seqNum < batchSnapshot) ||
+seqNum == InternalKeySeqNumMax
}

// SetKind sets the kind component of the key.
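
As a quick sanity check of the rule above, here is a standalone sketch that restates it with assumed constant values (batch bit 55, maximal sequence number 2^56-1); see internal/base for Pebble's actual constants, which are not imported here.
```go
package main

import "fmt"

// Assumed values mirroring Pebble's constants; not imported from the package.
const (
	seqNumBatch = uint64(1) << 55
	seqNumMax   = uint64(1)<<56 - 1
)

// visible restates the new base.Visible above: committed keys are compared
// against snapshot, batch keys against batchSnapshot, and the maximal
// sequence number (used for exclusive sentinels) is always visible.
func visible(seqNum, snapshot, batchSnapshot uint64) bool {
	return seqNum < snapshot ||
		((seqNum&seqNumBatch) != 0 && seqNum < batchSnapshot) ||
		seqNum == seqNumMax
}

func main() {
	const snapshot = 10
	batchSnapshot := seqNumBatch | 5

	fmt.Println(visible(9, snapshot, batchSnapshot))             // true: committed key below the snapshot
	fmt.Println(visible(10, snapshot, batchSnapshot))            // false: committed key at/after the snapshot
	fmt.Println(visible(seqNumBatch|4, snapshot, batchSnapshot)) // true: batch key below the batch snapshot
	fmt.Println(visible(seqNumBatch|7, snapshot, batchSnapshot)) // false: batch key written after the iterator
	fmt.Println(visible(seqNumMax, snapshot, batchSnapshot))     // true: the sentinel is always visible
}
```
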
6 changes: 5 additions & 1 deletion internal/keyspan/merging_iter.go
@@ -52,7 +52,11 @@ func visibleTransform(snapshot uint64) Transformer {
dst.Start, dst.End = s.Start, s.End
dst.Keys = dst.Keys[:0]
for _, k := range s.Keys {
-if base.Visible(k.SeqNum(), snapshot) {
+// NB: The InternalKeySeqNumMax value is used for the batch snapshot
+// because a batch's visible span keys are filtered when they're
+// fragmented. There's no requirement to enforce visibility at
+// iteration time.
+if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) {
dst.Keys = append(dst.Keys, k)
}
}
10 changes: 8 additions & 2 deletions iterator.go
@@ -208,6 +208,11 @@ type Iterator struct {
batchPointIter batchIter
batchRangeDelIter keyspan.Iter
batchRangeKeyIter keyspan.Iter
+// merging is a pointer to this iterator's point merging iterator. It
+// appears here because key visibility is handled by the merging iterator.
+// During SetOptions on an iterator over an indexed batch, this field is
+// used to update the merging iterator's batch snapshot.
+merging *mergingIter

// Keeping the bools here after all the 8 byte aligned fields shrinks the
// sizeof this struct by 24 bytes.
@@ -1988,11 +1993,13 @@ func (i *Iterator) SetOptions(o *IterOptions) {
nextBatchSeqNum := (uint64(len(i.batch.data)) | base.InternalKeySeqNumBatch)
if nextBatchSeqNum != i.batchSeqNum {
i.batchSeqNum = nextBatchSeqNum
+if i.merging != nil {
+i.merging.batchSnapshot = nextBatchSeqNum
+}
if i.pointIter != nil {
if i.batch.countRangeDels == 0 {
// No range deletions exist in the batch. We only need to
// update the batchIter's snapshot.
-i.batchPointIter.snapshot = nextBatchSeqNum
i.invalidate()
} else if i.batchRangeDelIter.Count() == 0 {
// When we constructed this iterator, there were no
@@ -2012,7 +2019,6 @@ func (i *Iterator) SetOptions(o *IterOptions) {
// count of fragmented range deletions, NOT the number of
// range deletions written to the batch
// [i.batch.countRangeDels].
-i.batchPointIter.snapshot = nextBatchSeqNum
i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, nextBatchSeqNum)
i.invalidate()
}
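
A hedged usage sketch of this path: writes applied to an indexed batch after an iterator is opened only become visible once SetOptions refreshes the iterator, which with this commit also updates the merging iterator's batch snapshot. The in-memory setup is illustrative and assumes SetOptions refreshes the batch view as the hunk above suggests.
```go
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	b := db.NewIndexedBatch()
	defer b.Close()
	it := b.NewIter(nil)
	defer it.Close()

	// A write applied after the iterator was opened.
	_ = b.Set([]byte("new-key"), nil, nil)

	// Not yet visible: the iterator's batch snapshot predates the write.
	fmt.Println(it.SeekGE([]byte("new-key"))) // false

	// SetOptions recomputes the batch sequence number and, with this commit,
	// propagates it to the merging iterator via its batchSnapshot field.
	it.SetOptions(&pebble.IterOptions{UpperBound: []byte("zzz")})

	// Now visible.
	fmt.Println(it.SeekGE([]byte("new-key")) && string(it.Key()) == "new-key") // true
}
```
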
2 changes: 1 addition & 1 deletion level_checker.go
@@ -129,7 +129,7 @@ func (m *simpleMergingIter) step() bool {
item := &m.heap.items[0]
l := &m.levels[item.index]
// Sentinels are not relevant for this point checking.
-if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot) {
+if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
m.numPoints++
keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
if !keyChanged {
28 changes: 15 additions & 13 deletions merging_iter.go
@@ -230,17 +230,18 @@ type levelIterBoundaryContext struct {
// scenarios and have each step display the current state (i.e. the current
// heap and range-del iterator positioning).
type mergingIter struct {
-logger Logger
-split Split
-dir int
-snapshot uint64
-levels []mergingIterLevel
-heap mergingIterHeap
-err error
-prefix []byte
-lower []byte
-upper []byte
-stats *InternalIteratorStats
+logger Logger
+split Split
+dir int
+snapshot uint64
+batchSnapshot uint64
+levels []mergingIterLevel
+heap mergingIterHeap
+err error
+prefix []byte
+lower []byte
+upper []byte
+stats *InternalIteratorStats

combinedIterState *combinedIterState

@@ -293,6 +294,7 @@ func (m *mergingIter) init(
m.upper = opts.UpperBound
}
m.snapshot = InternalKeySeqNumMax
+m.batchSnapshot = InternalKeySeqNumMax
m.levels = levels
m.heap.cmp = cmp
m.split = split
@@ -707,7 +709,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) {
reseeked = true
continue
}
-if item.key.Visible(m.snapshot) &&
+if item.key.Visible(m.snapshot, m.batchSnapshot) &&
(!m.levels[item.index].isIgnorableBoundaryKey) &&
(item.key.Kind() != InternalKeyKindRangeDelete || !m.elideRangeTombstones) {
return &item.key, item.value
@@ -861,7 +863,7 @@ func (m *mergingIter) findPrevEntry() (*InternalKey, []byte) {
m.stats.PointsCoveredByRangeTombstones++
continue
}
-if item.key.Visible(m.snapshot) &&
+if item.key.Visible(m.snapshot, m.batchSnapshot) &&
(!m.levels[item.index].isIgnorableBoundaryKey) &&
(item.key.Kind() != InternalKeyKindRangeDelete || !m.elideRangeTombstones) {
return &item.key, item.value
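
To make the control flow above concrete, here is a toy, single-level sketch of the skip-invisible loop that findNextEntry and findPrevEntry implement; it is not Pebble code. The heap of levels, range tombstones, and boundary-key handling are omitted, and the batch bit is modeled as a plain bool.
```go
package main

import "fmt"

// entry is a toy stand-in for the internal key at the merging iterator's
// heap top.
type entry struct {
	userKey string
	seqNum  uint64
	isBatch bool // stands in for the batch bit on the sequence number
}

// firstVisible mirrors the shape of findNextEntry above: walk forward in key
// order, skipping entries that are not visible at (snapshot, batchSnapshot).
func firstVisible(entries []entry, snapshot, batchSnapshot uint64) (entry, bool) {
	for _, e := range entries {
		visible := (!e.isBatch && e.seqNum < snapshot) ||
			(e.isBatch && e.seqNum < batchSnapshot)
		if visible {
			return e, true
		}
		// Not visible: skip it and continue with the next-smallest key.
	}
	return entry{}, false
}

func main() {
	// Sorted by user key: an invisible batch key followed by a committed key.
	entries := []entry{
		{userKey: "bas-0001", seqNum: 7, isBatch: true}, // written after the iterator was opened
		{userKey: "bax", seqNum: 4, isBatch: false},     // committed
	}
	// snapshot=10 admits the committed key; batchSnapshot=5 excludes seqNum 7.
	if e, ok := firstVisible(entries, 10, 5); ok {
		fmt.Println(e.userKey) // bax
	}
}
```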