diff --git a/batch.go b/batch.go
index 0753218e09..1124ea9cb6 100644
--- a/batch.go
+++ b/batch.go
@@ -855,16 +855,37 @@ func (b *Batch) NewIter(o *IterOptions) *Iterator {
 // contents of the batch.
 func (b *Batch) newInternalIter(o *IterOptions) *batchIter {
 	iter := &batchIter{}
-	b.initInternalIter(o, iter, b.nextSeqNum())
+	b.initInternalIter(o, iter)
 	return iter
 }
 
-func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter, batchSnapshot uint64) {
+func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
 	*iter = batchIter{
-		cmp:      b.cmp,
-		batch:    b,
-		iter:     b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
-		snapshot: batchSnapshot,
+		cmp:   b.cmp,
+		batch: b,
+		iter:  b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
+		// NB: We explicitly do not propagate the batch snapshot to the point
+		// key iterator. Filtering point keys within the batch iterator can
+		// cause pathological behavior where a batch iterator advances
+		// significantly farther than necessary, filtering many batch keys
+		// that are not visible at the batch sequence number. Instead, the
+		// merging iterator enforces visibility.
+		//
+		// For example, consider an engine that contains the committed keys
+		// 'bar' and 'bax', with no keys between them. Consider a batch
+		// containing 1,000 keys within the range [a,z]. All of the batch
+		// keys were added to the batch after the iterator was constructed,
+		// so they are not visible to the iterator. A call to SeekGE('bax')
+		// would seek the LSM iterators and discover the key 'bax'. It would
+		// also seek the batch iterator, landing on the key 'baz' only to
+		// discover that it's not visible. The batch iterator would then
+		// next through the rest of the batch's keys, only to discover there
+		// are no visible keys greater than or equal to 'bax'.
+		//
+		// Filtering these batch points within the merging iterator ensures
+		// that the batch iterator never needs to iterate beyond 'baz', because
+		// the merging iterator already found the smaller, visible key 'bax'.
+		snapshot: base.InternalKeySeqNumMax,
 	}
 }
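To illustrate the visibility semantics the comment above describes, here is a minimal sketch against Pebble's public API. It assumes an in-memory store via vfs.NewMem; the key names mirror the comment's example and the 1,000 generated batch keys are otherwise arbitrary.

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// An ephemeral, in-memory store keeps the sketch self-contained.
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Commit 'bar' and 'bax'; committed keys are visible to any iterator
	// constructed afterwards.
	_ = db.Set([]byte("bar"), nil, pebble.Sync)
	_ = db.Set([]byte("bax"), nil, pebble.Sync)

	b := db.NewIndexedBatch()
	it := b.NewIter(nil) // the batch sequence number is captured here

	// Keys written to the batch after iterator construction sit above the
	// captured batch snapshot and must not surface during iteration.
	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("%c%03d", 'a'+byte(i%26), i)
		_ = b.Set([]byte(key), nil, nil)
	}

	// With visibility enforced by the merging iterator, SeekGE('bax')
	// returns the committed key 'bax' without forcing the batch iterator
	// to scan past every invisible batch key.
	if it.SeekGE([]byte("bax")) {
		fmt.Printf("%s\n", it.Key()) // prints: bax
	}
	_ = it.Close()
	_ = b.Close()
}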
diff --git a/compaction.go b/compaction.go
index d470e89fcc..124ffb8183 100644
--- a/compaction.go
+++ b/compaction.go
@@ -342,7 +342,7 @@ func rangeKeyCompactionTransform(
 	usedLen := 0
 	for i >= 0 {
 		start := j
-		for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i]) {
+		for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) {
 			// Include j in current partition.
 			j++
 		}
diff --git a/db.go b/db.go
index c72d0b6033..1bf2883e31 100644
--- a/db.go
+++ b/db.go
@@ -1088,7 +1088,7 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
 			rangeDelIter: newErrorKeyspanIter(ErrNotIndexed),
 		})
 	} else {
-		i.batch.initInternalIter(&i.opts, &i.batchPointIter, i.batchSeqNum)
+		i.batch.initInternalIter(&i.opts, &i.batchPointIter)
 		i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
 		// Only include the batch's rangedel iterator if it's non-empty.
 		// This requires some subtle logic in the case a rangedel is later
@@ -1148,9 +1148,11 @@ func (i *Iterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
 	}
 	buf.merging.init(&i.opts, &i.stats.InternalStats, i.comparer.Compare, i.comparer.Split, mlevels...)
 	buf.merging.snapshot = i.seqNum
+	buf.merging.batchSnapshot = i.batchSeqNum
 	buf.merging.elideRangeTombstones = true
 	buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
 	i.pointIter = &buf.merging
+	i.merging = &buf.merging
 }
 
 // NewBatch returns a new empty write-only batch. Any reads on the batch will
diff --git a/get_iter.go b/get_iter.go
index c56aad35c9..602b9ca41c 100644
--- a/get_iter.go
+++ b/get_iter.go
@@ -101,7 +101,7 @@ func (g *getIter) Next() (*InternalKey, []byte) {
 			return nil, nil
 		}
 		if g.equal(g.key, key.UserKey) {
-			if !key.Visible(g.snapshot) {
+			if !key.Visible(g.snapshot, base.InternalKeySeqNumMax) {
 				g.iterKey, g.iterValue = g.iter.Next()
 				continue
 			}
@@ -125,7 +125,12 @@ func (g *getIter) Next() (*InternalKey, []byte) {
 			return nil, nil
 		}
 		g.iter = g.batch.newInternalIter(nil)
-		g.rangeDelIter = g.batch.newRangeDelIter(nil, g.batch.nextSeqNum())
+		g.rangeDelIter = g.batch.newRangeDelIter(
+			nil,
+			// Get always reads the entirety of the batch's history, so no
+			// batch keys should be filtered.
+			base.InternalKeySeqNumMax,
+		)
 		g.iterKey, g.iterValue = g.iter.SeekGE(g.key, base.SeekGEFlagsNone)
 		g.batch = nil
 		continue
diff --git a/internal/base/internal.go b/internal/base/internal.go
index 6953de9710..2144596b04 100644
--- a/internal/base/internal.go
+++ b/internal/base/internal.go
@@ -333,14 +333,28 @@ func (k InternalKey) SeqNum() uint64 {
 
 // Visible returns true if the key is visible at the specified snapshot
 // sequence number.
-func (k InternalKey) Visible(snapshot uint64) bool {
-	return Visible(k.SeqNum(), snapshot)
+func (k InternalKey) Visible(snapshot, batchSnapshot uint64) bool {
+	return Visible(k.SeqNum(), snapshot, batchSnapshot)
 }
 
 // Visible returns true if a key with the provided sequence number is visible at
-// the specified snapshot sequence number.
-func Visible(seqNum uint64, snapshot uint64) bool {
-	return seqNum < snapshot || (seqNum&InternalKeySeqNumBatch) != 0
+// the specified snapshot sequence numbers.
+func Visible(seqNum uint64, snapshot, batchSnapshot uint64) bool {
+	// There are two snapshot sequence numbers, one for committed keys and
+	// one for batch keys. If seqNum is less than `snapshot`, then seqNum
+	// corresponds to a committed key that is visible. If seqNum has its
+	// batch bit set, then seqNum corresponds to an uncommitted batch key,
+	// which is visible if seqNum is less than `batchSnapshot`.
+	//
+	// There's one complication. The maximal sequence number
+	// (`InternalKeySeqNumMax`) is used across Pebble for exclusive sentinel
+	// keys and other purposes. The maximal sequence number has its batch
+	// bit set, but it can never be less than `batchSnapshot`, since there
+	// is no expressible larger snapshot. We dictate that the maximal
+	// sequence number is always visible.
+	return seqNum < snapshot ||
+		((seqNum&InternalKeySeqNumBatch) != 0 && seqNum < batchSnapshot) ||
+		seqNum == InternalKeySeqNumMax
 }
 
 // SetKind sets the kind component of the key.
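The new Visible predicate is easiest to sanity-check in isolation. Below is a self-contained restatement with the two sentinel constants inlined (the values mirror InternalKeySeqNumBatch and InternalKeySeqNumMax in internal/base; nothing here imports Pebble, and the visible helper is a local copy, not the real function):

package main

import "fmt"

const (
	seqNumBatch = uint64(1) << 55   // mirrors base.InternalKeySeqNumBatch
	seqNumMax   = uint64(1)<<56 - 1 // mirrors base.InternalKeySeqNumMax
)

// visible restates base.Visible: committed keys are compared against
// snapshot, batch keys (batch bit set) against batchSnapshot, and the
// maximal sequence number is always visible.
func visible(seqNum, snapshot, batchSnapshot uint64) bool {
	return seqNum < snapshot ||
		((seqNum&seqNumBatch) != 0 && seqNum < batchSnapshot) ||
		seqNum == seqNumMax
}

func main() {
	snapshot, batchSnapshot := uint64(10), seqNumBatch|5
	fmt.Println(visible(9, snapshot, batchSnapshot))             // true: committed, below snapshot
	fmt.Println(visible(10, snapshot, batchSnapshot))            // false: committed, at/above snapshot
	fmt.Println(visible(seqNumBatch|4, snapshot, batchSnapshot)) // true: batch key below batch snapshot
	fmt.Println(visible(seqNumBatch|5, snapshot, batchSnapshot)) // false: batch key at/above batch snapshot
	fmt.Println(visible(seqNumMax, snapshot, batchSnapshot))     // true: the sentinel is always visible
}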
diff --git a/internal/keyspan/merging_iter.go b/internal/keyspan/merging_iter.go
index bcc63f4130..996de03f8b 100644
--- a/internal/keyspan/merging_iter.go
+++ b/internal/keyspan/merging_iter.go
@@ -52,7 +52,11 @@ func visibleTransform(snapshot uint64) Transformer {
 		dst.Start, dst.End = s.Start, s.End
 		dst.Keys = dst.Keys[:0]
 		for _, k := range s.Keys {
-			if base.Visible(k.SeqNum(), snapshot) {
+			// NB: The InternalKeySeqNumMax value is used for the batch
+			// snapshot because a batch's visible span keys are already
+			// filtered when they're fragmented. There's no requirement
+			// to enforce visibility at iteration time.
+			if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) {
 				dst.Keys = append(dst.Keys, k)
 			}
 		}
diff --git a/iterator.go b/iterator.go
index ca5f69d6b5..414b7f38cf 100644
--- a/iterator.go
+++ b/iterator.go
@@ -208,6 +208,11 @@ type Iterator struct {
 	batchPointIter    batchIter
 	batchRangeDelIter keyspan.Iter
 	batchRangeKeyIter keyspan.Iter
+	// merging is a pointer to this iterator's point merging iterator. It
+	// appears here because key visibility is handled by the merging iterator.
+	// During SetOptions on an iterator over an indexed batch, this field is
+	// used to update the merging iterator's batch snapshot.
+	merging *mergingIter
 
 	// Keeping the bools here after all the 8 byte aligned fields shrinks the
 	// sizeof this struct by 24 bytes.
@@ -1988,11 +1993,13 @@ func (i *Iterator) SetOptions(o *IterOptions) {
 		nextBatchSeqNum := (uint64(len(i.batch.data)) | base.InternalKeySeqNumBatch)
 		if nextBatchSeqNum != i.batchSeqNum {
 			i.batchSeqNum = nextBatchSeqNum
+			if i.merging != nil {
+				i.merging.batchSnapshot = nextBatchSeqNum
+			}
 			if i.pointIter != nil {
 				if i.batch.countRangeDels == 0 {
 					// No range deletions exist in the batch. We only need to
-					// update the batchIter's snapshot.
-					i.batchPointIter.snapshot = nextBatchSeqNum
+					// invalidate the iterator.
 					i.invalidate()
 				} else if i.batchRangeDelIter.Count() == 0 {
 					// When we constructed this iterator, there were no
@@ -2012,7 +2019,6 @@ func (i *Iterator) SetOptions(o *IterOptions) {
 					// count of fragmented range deletions, NOT the number of
 					// range deletions written to the batch
 					// [i.batch.countRangeDels].
-					i.batchPointIter.snapshot = nextBatchSeqNum
 					i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, nextBatchSeqNum)
 					i.invalidate()
 				}
diff --git a/level_checker.go b/level_checker.go
index 7c944e40f7..18847d2e0a 100644
--- a/level_checker.go
+++ b/level_checker.go
@@ -129,7 +129,7 @@ func (m *simpleMergingIter) step() bool {
 	item := &m.heap.items[0]
 	l := &m.levels[item.index]
 	// Sentinels are not relevant for this point checking.
-	if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot) {
+	if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
 		m.numPoints++
 		keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
 		if !keyChanged {
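The SetOptions change above can be exercised end-to-end: refreshing an iterator over an indexed batch recomputes the batch sequence number, which with this diff is pushed into the merging iterator's batchSnapshot rather than into the batchIter. A minimal sketch, again assuming an in-memory store and relying on SetOptions's documented refresh behavior for indexed batches:

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	b := db.NewIndexedBatch()
	it := b.NewIter(nil) // batch snapshot captured at construction

	_ = b.Set([]byte("a"), nil, nil) // written after `it` exists

	fmt.Println(it.SeekGE([]byte("a"))) // false: 'a' sits above the captured snapshot

	// SetOptions recomputes the batch sequence number. With this diff, the
	// refreshed value lands in mergingIter.batchSnapshot instead of
	// batchIter.snapshot.
	it.SetOptions(&pebble.IterOptions{})

	fmt.Println(it.SeekGE([]byte("a"))) // true: 'a' is visible after the refresh

	_ = it.Close()
	_ = b.Close()
}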
diff --git a/merging_iter.go b/merging_iter.go
index fecb71ca6d..54fdc52fbb 100644
--- a/merging_iter.go
+++ b/merging_iter.go
@@ -230,17 +230,18 @@ type levelIterBoundaryContext struct {
 // scenarios and have each step display the current state (i.e. the current
 // heap and range-del iterator positioning).
 type mergingIter struct {
-	logger   Logger
-	split    Split
-	dir      int
-	snapshot uint64
-	levels   []mergingIterLevel
-	heap     mergingIterHeap
-	err      error
-	prefix   []byte
-	lower    []byte
-	upper    []byte
-	stats    *InternalIteratorStats
+	logger        Logger
+	split         Split
+	dir           int
+	snapshot      uint64
+	batchSnapshot uint64
+	levels        []mergingIterLevel
+	heap          mergingIterHeap
+	err           error
+	prefix        []byte
+	lower         []byte
+	upper         []byte
+	stats         *InternalIteratorStats
 
 	combinedIterState *combinedIterState
 
@@ -293,6 +294,7 @@ func (m *mergingIter) init(
 		m.upper = opts.UpperBound
 	}
 	m.snapshot = InternalKeySeqNumMax
+	m.batchSnapshot = InternalKeySeqNumMax
 	m.levels = levels
 	m.heap.cmp = cmp
 	m.split = split
@@ -707,7 +709,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) {
 			reseeked = true
 			continue
 		}
-		if item.key.Visible(m.snapshot) &&
+		if item.key.Visible(m.snapshot, m.batchSnapshot) &&
 			(!m.levels[item.index].isIgnorableBoundaryKey) &&
 			(item.key.Kind() != InternalKeyKindRangeDelete || !m.elideRangeTombstones) {
 			return &item.key, item.value
@@ -861,7 +863,7 @@ func (m *mergingIter) findPrevEntry() (*InternalKey, []byte) {
 			m.stats.PointsCoveredByRangeTombstones++
 			continue
 		}
-		if item.key.Visible(m.snapshot) &&
+		if item.key.Visible(m.snapshot, m.batchSnapshot) &&
 			(!m.levels[item.index].isIgnorableBoundaryKey) &&
 			(item.key.Kind() != InternalKeyKindRangeDelete || !m.elideRangeTombstones) {
 			return &item.key, item.value
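Finally, the division of labor that mergingIter now owns can be sketched in miniature: child iterators surface every key, and the merge layer alone applies both snapshots. This is an illustrative model only; the entry type and firstVisibleGE helper are invented for the sketch, the inlined constants mirror the internal/base values, and the predicate repeats the earlier restatement.

package main

import (
	"fmt"
	"sort"
)

const (
	seqNumBatch = uint64(1) << 55   // mirrors base.InternalKeySeqNumBatch
	seqNumMax   = uint64(1)<<56 - 1 // mirrors base.InternalKeySeqNumMax
)

// entry stands in for a (user key, sequence number) pair surfaced by a
// child iterator.
type entry struct {
	key    string
	seqNum uint64
}

func visible(seqNum, snapshot, batchSnapshot uint64) bool {
	return seqNum < snapshot ||
		((seqNum&seqNumBatch) != 0 && seqNum < batchSnapshot) ||
		seqNum == seqNumMax
}

// firstVisibleGE plays the merging layer's role: the child "iterators"
// (here, one flat sorted slice) never filter; visibility is applied once,
// at the merge.
func firstVisibleGE(entries []entry, key string, snapshot, batchSnapshot uint64) (entry, bool) {
	sort.Slice(entries, func(i, j int) bool { return entries[i].key < entries[j].key })
	for _, e := range entries {
		if e.key >= key && visible(e.seqNum, snapshot, batchSnapshot) {
			return e, true
		}
	}
	return entry{}, false
}

func main() {
	entries := []entry{
		{key: "bar", seqNum: 3},               // committed
		{key: "bax", seqNum: 4},               // committed
		{key: "baz", seqNum: seqNumBatch | 7}, // batch key above the batch snapshot
	}
	e, ok := firstVisibleGE(entries, "bax", 10, seqNumBatch|5)
	fmt.Println(e.key, ok) // prints: bax true ('baz' is filtered at the merge)
}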