From 7a4841781cbe335d89521887894fda966523b73a Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 6 Sep 2022 11:29:41 -0400 Subject: [PATCH] db: enforce batch point visibility in the merging iterator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, visibility of batch point keys was enforced within the batch iterator. This commit moves this visibility enforcement into the merging iterator, alongside the visibility enforcement of committed keys. Span keys (eg, range deletions and range keys) are still subject to visibility filtering when the batch iterator is constructed, because it's necessary to process all these spans to fragment them appropriately. This change addresses the pathological behavior observed in cockroachdb/cockroach#87277: filtering keys within leaf iterators can force the leaf iterators to iterate through more keys than necessary. If many mutations are made to a batch since the iterator was opened, the batch iterator may need to iterate over many entries before finding a visible key. This iteration may extend well beyond the next or previous key in the merging iterator. Using the SeekLT benchmark from cockroachdb/cockroach#87373, relative to master: ``` name old time/op new time/op delta SeekLTNext/useBatch=false-10 185ms ± 1% 185ms ± 2% ~ (p=0.841 n=5+5) SeekLTNext/useBatch=true-10 26.1s ± 1% 0.2s ± 2% -99.34% (p=0.008 n=5+5) ``` And relative to 22.1: ``` name old time/op new time/op delta SeekLTNext/useBatch=false-10 182ms ± 6% 185ms ± 2% ~ (p=0.690 n=5+5) SeekLTNext/useBatch=true-10 2.71s ± 7% 0.17s ± 2% -93.68% (p=0.008 n=5+5) ``` --- batch.go | 33 ++++++++++++++++++++++++++------ compaction.go | 2 +- db.go | 4 +++- get_iter.go | 4 ++-- internal/base/internal.go | 24 ++++++++++++++++++----- internal/keyspan/merging_iter.go | 6 +++++- iterator.go | 10 ++++++++-- level_checker.go | 2 +- merging_iter.go | 28 ++++++++++++++------------- 9 files changed, 81 insertions(+), 32 deletions(-) diff --git a/batch.go b/batch.go index 0753218e09..1124ea9cb6 100644 --- a/batch.go +++ b/batch.go @@ -855,16 +855,37 @@ func (b *Batch) NewIter(o *IterOptions) *Iterator { // contents of the batch. func (b *Batch) newInternalIter(o *IterOptions) *batchIter { iter := &batchIter{} - b.initInternalIter(o, iter, b.nextSeqNum()) + b.initInternalIter(o, iter) return iter } -func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter, batchSnapshot uint64) { +func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) { *iter = batchIter{ - cmp: b.cmp, - batch: b, - iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()), - snapshot: batchSnapshot, + cmp: b.cmp, + batch: b, + iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()), + // NB: We explicitly do not propagate the batch snapshot to the point + // key iterator. Filtering point keys within the batch iterator can + // cause pathological behavior where a batch iterator advances + // significantly farther than necessary filtering many batch keys that + // are not visible at the batch sequence number. Instead, the merging + // iterator enforces bounds. + // + // For example, consider an engine that contains the committed keys + // 'bar' and 'bax', with no keys between them. Consider a batch + // containing keys 1,000 keys within the range [a,z]. All of the + // batch keys were added to the batch after the iterator was + // constructed, so they are not visible to the iterator. A call to + // SeekGE('bax') would seek the LSM iterators and discover the key + // 'bax'. It would also seek the batch iterator, landing on the key + // 'baz' but discover it that it's not visible. The batch iterator would + // next through the rest of the batch's keys, only to discover there are + // no visible keys greater than or equal to 'bax'. + // + // Filtering these batch points within the merging iterator ensures that + // the batch iterator never needs to iterate beyond 'baz', because it + // already found a smaller, visible key 'bax'. + snapshot: base.InternalKeySeqNumMax, } } diff --git a/compaction.go b/compaction.go index d470e89fcc..124ffb8183 100644 --- a/compaction.go +++ b/compaction.go @@ -342,7 +342,7 @@ func rangeKeyCompactionTransform( usedLen := 0 for i >= 0 { start := j - for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i]) { + for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) { // Include j in current partition. j++ } diff --git a/db.go b/db.go index c72d0b6033..1bf2883e31 100644 --- a/db.go +++ b/db.go @@ -1088,7 +1088,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { rangeDelIter: newErrorKeyspanIter(ErrNotIndexed), }) } else { - i.batch.initInternalIter(&i.opts, &i.batchPointIter, i.batchSeqNum) + i.batch.initInternalIter(&i.opts, &i.batchPointIter) i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum) // Only include the batch's rangedel iterator if it's non-empty. // This requires some subtle logic in the case a rangedel is later @@ -1148,9 +1148,11 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) { } buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...) buf.merging.snapshot = i.seqNum + buf.merging.batchSnapshot = i.batchSeqNum buf.merging.elideRangeTombstones = true buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState i.pointIter = &buf.merging + i.merging = &buf.merging } // NewBatch returns a new empty write-only batch. Any reads on the batch will diff --git a/get_iter.go b/get_iter.go index c56aad35c9..ed9b1e1662 100644 --- a/get_iter.go +++ b/get_iter.go @@ -101,7 +101,7 @@ func (g *getIter) Next() (*InternalKey, []byte) { return nil, nil } if g.equal(g.key, key.UserKey) { - if !key.Visible(g.snapshot) { + if !key.Visible(g.snapshot, base.InternalKeySeqNumMax) { g.iterKey, g.iterValue = g.iter.Next() continue } @@ -125,7 +125,7 @@ func (g *getIter) Next() (*InternalKey, []byte) { return nil, nil } g.iter = g.batch.newInternalIter(nil) - g.rangeDelIter = g.batch.newRangeDelIter(nil, g.batch.nextSeqNum()) + g.rangeDelIter = g.batch.newRangeDelIter(nil, base.InternalKeySeqNumMax) g.iterKey, g.iterValue = g.iter.SeekGE(g.key, base.SeekGEFlagsNone) g.batch = nil continue diff --git a/internal/base/internal.go b/internal/base/internal.go index 6953de9710..2144596b04 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -333,14 +333,28 @@ func (k InternalKey) SeqNum() uint64 { // Visible returns true if the key is visible at the specified snapshot // sequence number. -func (k InternalKey) Visible(snapshot uint64) bool { - return Visible(k.SeqNum(), snapshot) +func (k InternalKey) Visible(snapshot, batchSnapshot uint64) bool { + return Visible(k.SeqNum(), snapshot, batchSnapshot) } // Visible returns true if a key with the provided sequence number is visible at -// the specified snapshot sequence number. -func Visible(seqNum uint64, snapshot uint64) bool { - return seqNum < snapshot || (seqNum&InternalKeySeqNumBatch) != 0 +// the specified snapshot sequence numbers. +func Visible(seqNum uint64, snapshot, batchSnapshot uint64) bool { + // There are two snapshot sequence numbers, one for committed keys and one + // for batch keys. If a seqNum is less than `snapshot`, then seqNum + // corresponds to a committed key that is visible. If seqNum has its batch + // bit set, then seqNum corresponds to an uncommitted batch key. Its + // visible if its snapshot is less than batchSnapshot. + // + // There's one complication. The maximal sequence number + // (`InternalKeySeqNumMax`) is used across Pebble for exclusive sentinel + // keys and other purposes. The maximal sequence number has its batch bit + // set, but it can never be < `batchSnapshot`, since there is no expressible + // larger snapshot. We dictate that the maximal sequence number is always + // visible. + return seqNum < snapshot || + ((seqNum&InternalKeySeqNumBatch) != 0 && seqNum < batchSnapshot) || + seqNum == InternalKeySeqNumMax } // SetKind sets the kind component of the key. diff --git a/internal/keyspan/merging_iter.go b/internal/keyspan/merging_iter.go index bcc63f4130..996de03f8b 100644 --- a/internal/keyspan/merging_iter.go +++ b/internal/keyspan/merging_iter.go @@ -52,7 +52,11 @@ func visibleTransform(snapshot uint64) Transformer { dst.Start, dst.End = s.Start, s.End dst.Keys = dst.Keys[:0] for _, k := range s.Keys { - if base.Visible(k.SeqNum(), snapshot) { + // NB: The InternalKeySeqNumMax value is used for the batch snapshot + // because a batch's visible span keys are filtered when they're + // fragmented. There's no requirement to enforce visibility at + // iteration time. + if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) { dst.Keys = append(dst.Keys, k) } } diff --git a/iterator.go b/iterator.go index ca5f69d6b5..414b7f38cf 100644 --- a/iterator.go +++ b/iterator.go @@ -208,6 +208,11 @@ type Iterator struct { batchPointIter batchIter batchRangeDelIter keyspan.Iter batchRangeKeyIter keyspan.Iter + // merging is a pointer to this iterator's point merging iterator. It + // appears here because key visibility is handled by the merging iterator. + // During SetOptions on an iterator over an indexed batch, this field is + // used to update the merging iterator's batch snapshot. + merging *mergingIter // Keeping the bools here after all the 8 byte aligned fields shrinks the // sizeof this struct by 24 bytes. @@ -1988,11 +1993,13 @@ func (i *Iterator) SetOptions(o *IterOptions) { nextBatchSeqNum := (uint64(len(i.batch.data)) | base.InternalKeySeqNumBatch) if nextBatchSeqNum != i.batchSeqNum { i.batchSeqNum = nextBatchSeqNum + if i.merging != nil { + i.merging.batchSnapshot = nextBatchSeqNum + } if i.pointIter != nil { if i.batch.countRangeDels == 0 { // No range deletions exist in the batch. We only need to // update the batchIter's snapshot. - i.batchPointIter.snapshot = nextBatchSeqNum i.invalidate() } else if i.batchRangeDelIter.Count() == 0 { // When we constructed this iterator, there were no @@ -2012,7 +2019,6 @@ func (i *Iterator) SetOptions(o *IterOptions) { // count of fragmented range deletions, NOT the number of // range deletions written to the batch // [i.batch.countRangeDels]. - i.batchPointIter.snapshot = nextBatchSeqNum i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, nextBatchSeqNum) i.invalidate() } diff --git a/level_checker.go b/level_checker.go index 7c944e40f7..18847d2e0a 100644 --- a/level_checker.go +++ b/level_checker.go @@ -129,7 +129,7 @@ func (m *simpleMergingIter) step() bool { item := &m.heap.items[0] l := &m.levels[item.index] // Sentinels are not relevant for this point checking. - if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot) { + if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) { m.numPoints++ keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0 if !keyChanged { diff --git a/merging_iter.go b/merging_iter.go index fecb71ca6d..54fdc52fbb 100644 --- a/merging_iter.go +++ b/merging_iter.go @@ -230,17 +230,18 @@ type levelIterBoundaryContext struct { // scenarios and have each step display the current state (i.e. the current // heap and range-del iterator positioning). type mergingIter struct { - logger Logger - split Split - dir int - snapshot uint64 - levels []mergingIterLevel - heap mergingIterHeap - err error - prefix []byte - lower []byte - upper []byte - stats *InternalIteratorStats + logger Logger + split Split + dir int + snapshot uint64 + batchSnapshot uint64 + levels []mergingIterLevel + heap mergingIterHeap + err error + prefix []byte + lower []byte + upper []byte + stats *InternalIteratorStats combinedIterState *combinedIterState @@ -293,6 +294,7 @@ func (m *mergingIter) init( m.upper = opts.UpperBound } m.snapshot = InternalKeySeqNumMax + m.batchSnapshot = InternalKeySeqNumMax m.levels = levels m.heap.cmp = cmp m.split = split @@ -707,7 +709,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) { reseeked = true continue } - if item.key.Visible(m.snapshot) && + if item.key.Visible(m.snapshot, m.batchSnapshot) && (!m.levels[item.index].isIgnorableBoundaryKey) && (item.key.Kind() != InternalKeyKindRangeDelete || !m.elideRangeTombstones) { return &item.key, item.value @@ -861,7 +863,7 @@ func (m *mergingIter) findPrevEntry() (*InternalKey, []byte) { m.stats.PointsCoveredByRangeTombstones++ continue } - if item.key.Visible(m.snapshot) && + if item.key.Visible(m.snapshot, m.batchSnapshot) && (!m.levels[item.index].isIgnorableBoundaryKey) && (item.key.Kind() != InternalKeyKindRangeDelete || !m.elideRangeTombstones) { return &item.key, item.value