Skip to content

Commit

Permalink
storage: use SetOptions() when reusing Pebble iterators
Browse files Browse the repository at this point in the history
Iterator reuse now relies on `Pebble.SetOptions()` to configure the
reused Pebble iterator. This allows a wider range of iterators to be
reused, since previously only the bounds could be changed on existing
iterators.

Release note: None
  • Loading branch information
erikgrinaker committed May 15, 2022
1 parent 1f1bd33 commit 87eb356
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 297 deletions.
14 changes: 1 addition & 13 deletions pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator
if reader.ConsistentIterators() {
iter = reader.NewMVCCIterator(MVCCKeyIterKind, opts)
} else {
iter = newMVCCIteratorByCloningEngineIter(intentIter, opts)
iter = newPebbleIterator(nil, intentIter.GetRawIter(), opts, StandardDurability)
}

*iiIter = intentInterleavingIter{
Expand Down Expand Up @@ -972,18 +972,6 @@ func (i *intentInterleavingIter) SupportsPrev() bool {
return true
}

// newMVCCIteratorByCloningEngineIter assumes MVCCKeyIterKind and no timestamp
// hints. It uses pebble.Iterator.Clone to ensure that the two iterators see
// the identical engine state.
func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator {
pIter := iter.GetRawIter()
it := newPebbleIterator(nil, pIter, opts, StandardDurability)
if iter == nil {
panic("couldn't create a new iterator")
}
return it
}

// unsageMVCCIterator is used in RaceEnabled test builds to randomly inject
// changes to unsafe keys retrieved from MVCCIterators.
type unsafeMVCCIterator struct {
Expand Down
29 changes: 2 additions & 27 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,27 +1968,16 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
return iter
}

if !opts.MinTimestampHint.IsEmpty() {
// MVCCIterators that specify timestamp bounds cannot be cached.
iter := MVCCIterator(newPebbleIterator(p.parent.db, nil, opts, p.durability))
if util.RaceEnabled {
iter = wrapInUnsafeIter(iter)
}
return iter
}

iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
}
if iter.inuse {
return newPebbleIterator(p.parent.db, p.iter, opts, p.durability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, p.durability)
} else {
iter.init(p.parent.db, p.iter, p.iterUnused, opts, p.durability)
if p.iter == nil {
Expand Down Expand Up @@ -2020,11 +2009,9 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
if iter.inuse {
return newPebbleIterator(p.parent.db, p.iter, opts, p.durability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, p.durability)
} else {
iter.init(p.parent.db, p.iter, p.iterUnused, opts, p.durability)
if p.iter == nil {
Expand All @@ -2039,18 +2026,6 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
return iter
}

// checkOptionsForIterReuse checks that the options are appropriate for
// iterators that are reusable, and panics if not. This includes disallowing
// any timestamp hints.
func checkOptionsForIterReuse(opts IterOptions) {
if !opts.MinTimestampHint.IsEmpty() || !opts.MaxTimestampHint.IsEmpty() {
panic("iterator with timestamp hints cannot be reused")
}
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}
}

// ConsistentIterators implements the Engine interface.
func (p *pebbleReadOnly) ConsistentIterators() bool {
return true
Expand Down
25 changes: 2 additions & 23 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ func (p *pebbleBatch) MVCCIterate(

// NewMVCCIterator implements the Batch interface.
func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIterator {
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}

if p.writeOnly {
panic("write-only batch")
}
Expand All @@ -223,15 +219,6 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
return iter
}

if !opts.MinTimestampHint.IsEmpty() {
// MVCCIterators that specify timestamp bounds cannot be cached.
iter := MVCCIterator(newPebbleIterator(p.batch, nil, opts, StandardDurability))
if util.RaceEnabled {
iter = wrapInUnsafeIter(iter)
}
return iter
}

iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
Expand All @@ -243,11 +230,9 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
if iter.inuse {
return newPebbleIterator(handle, p.iter, opts, StandardDurability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, StandardDurability)
} else {
iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability)
if p.iter == nil {
Expand All @@ -267,10 +252,6 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M

// NewEngineIterator implements the Batch interface.
func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}

if p.writeOnly {
panic("write-only batch")
}
Expand All @@ -286,11 +267,9 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
if iter.inuse {
return newPebbleIterator(handle, p.iter, opts, StandardDurability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, StandardDurability)
} else {
iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability)
if p.iter == nil {
Expand Down
167 changes: 50 additions & 117 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,9 @@ type pebbleIterator struct {
// Reusable buffer for MVCCKey or EngineKey encoding.
keyBuf []byte
// Buffers for copying iterator bounds to. Note that the underlying memory
// is not GCed upon Close(), to reduce the number of overall allocations. We
// use two slices for each of the bounds since this caller should not change
// the slice holding the current bounds, that the callee (pebble.MVCCIterator)
// is currently using, until after the caller has made the SetBounds call.
lowerBoundBuf [2][]byte
upperBoundBuf [2][]byte
curBuf int
testingSetBoundsListener testingSetBoundsListener
// is not GCed upon Close(), to reduce the number of overall allocations.
lowerBoundBuf []byte
upperBoundBuf []byte

// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
// SSTables based on MVCC/Engine key when true.
Expand Down Expand Up @@ -79,10 +74,6 @@ type cloneableIter interface {
Close() error
}

type testingSetBoundsListener interface {
postSetBounds(lower, upper []byte)
}

// Instantiates a new Pebble iterator, or gets one from the pool.
func newPebbleIterator(
handle pebble.Reader,
Expand All @@ -97,13 +88,10 @@ func newPebbleIterator(
}

// init resets this pebbleIterator for use with the specified arguments. The
// current instance could either be a cached pebbleIterator (eg. in
// current instance could either be a cached pebbleIterator (e.g. in
// pebbleBatch), or a newly-instantiated one through newPebbleIterator. The
// underlying *pebble.Iterator is created using iterToClone, if non-nil and
// there are no timestamp hints, else it is created using handle.
//
// **NOTE**: the durability parameter may be ignored if iterToClone is
// non-nil, so make sure that the desired durability is the same.
// underlying *pebble.Iterator is created using iterToClone, if non-nil, else it
// is created using handle.
func (p *pebbleIterator) init(
handle pebble.Reader,
iterToClone cloneableIter,
Expand All @@ -115,39 +103,66 @@ func (p *pebbleIterator) init(
keyBuf: p.keyBuf,
lowerBoundBuf: p.lowerBoundBuf,
upperBoundBuf: p.upperBoundBuf,
prefix: opts.Prefix,
reusable: p.reusable,
}

if iterToClone != nil {
if iterUnused {
// NB: If the iterator was never used (at the time of writing, this means
// that the iterator was created by `PinEngineStateForIterators()`), we
// don't need to clone it.
p.iter = iterToClone.(*pebble.Iterator)
} else {
var err error
if p.iter, err = iterToClone.Clone(); err != nil {
panic(err)
}
}
}

p.setOptions(opts, durability)

if p.iter == nil {
p.iter = handle.NewIter(&p.options)
}
p.inuse = true
}

// setOptions updates the options for a pebbleIterator. If p.iter is non-nil, it
// updates the options on the existing iterator too.
func (p *pebbleIterator) setOptions(opts IterOptions, durability DurabilityRequirement) {
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}
if opts.MinTimestampHint.IsSet() && opts.MaxTimestampHint.IsEmpty() {
panic("min timestamp hint set without max timestamp hint")
}

p.options.OnlyReadGuaranteedDurable = false
if durability == GuaranteedDurability {
p.options.OnlyReadGuaranteedDurable = true
// Generate new Pebble iterator options.
p.options = pebble.IterOptions{
OnlyReadGuaranteedDurable: durability == GuaranteedDurability,
}
p.prefix = opts.Prefix

if opts.LowerBound != nil {
// This is the same as
// p.options.LowerBound = EncodeKeyToBuf(p.lowerBoundBuf[0][:0], MVCCKey{Key: opts.LowerBound})
// or EngineKey{Key: opts.LowerBound}.EncodeToBuf(...).
// Since we are encoding keys with an empty version anyway, we can just
// append the NUL byte instead of calling the above encode functions which
// will do the same thing.
p.lowerBoundBuf[0] = append(p.lowerBoundBuf[0][:0], opts.LowerBound...)
p.lowerBoundBuf[0] = append(p.lowerBoundBuf[0], 0x00)
p.options.LowerBound = p.lowerBoundBuf[0]
p.lowerBoundBuf = append(p.lowerBoundBuf[:0], opts.LowerBound...)
p.lowerBoundBuf = append(p.lowerBoundBuf, 0x00)
p.options.LowerBound = p.lowerBoundBuf
}
if opts.UpperBound != nil {
// Same as above.
p.upperBoundBuf[0] = append(p.upperBoundBuf[0][:0], opts.UpperBound...)
p.upperBoundBuf[0] = append(p.upperBoundBuf[0], 0x00)
p.options.UpperBound = p.upperBoundBuf[0]
p.upperBoundBuf = append(p.upperBoundBuf[:0], opts.UpperBound...)
p.upperBoundBuf = append(p.upperBoundBuf, 0x00)
p.options.UpperBound = p.upperBoundBuf
}

doClone := iterToClone != nil
if !opts.MaxTimestampHint.IsEmpty() {
doClone = false
if opts.MaxTimestampHint.IsSet() {
encodedMinTS := string(encodeMVCCTimestamp(opts.MinTimestampHint))
encodedMaxTS := string(encodeMVCCTimestamp(opts.MaxTimestampHint))
p.options.TableFilter = func(userProps map[string]string) bool {
Expand Down Expand Up @@ -179,94 +194,12 @@ func (p *pebbleIterator) init(
uint64(opts.MinTimestampHint.WallTime),
uint64(opts.MaxTimestampHint.WallTime)+1),
}
} else if !opts.MinTimestampHint.IsEmpty() {
panic("min timestamp hint set without max timestamp hint")
}

if doClone {
var err error
if iterUnused {
// NB: If the iterator was never used (at the time of writing, this means
// that the iterator was created by `PinEngineStateForIterators()`), we
// don't need to clone it.
p.iter = iterToClone.(*pebble.Iterator)
} else {
if p.iter, err = iterToClone.Clone(); err != nil {
panic(err)
}
}
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
} else {
if handle == nil {
panic("handle is nil for non-cloning path")
}
p.iter = handle.NewIter(&p.options)
}
if p.iter == nil {
panic("unable to create iterator")
}

p.inuse = true
}

// setBounds is called to change the bounds on a pebbleIterator. Note that
// this is not the first time that bounds will be passed to the underlying
// pebble.Iterator. The existing bounds are in p.options.
func (p *pebbleIterator) setBounds(lowerBound, upperBound roachpb.Key) {
// If the roachpb.Key bound is nil, the corresponding bound for the
// pebble.Iterator will also be nil. p.options contains the current bounds
// known to the pebble.Iterator.
boundsChanged := ((lowerBound == nil) != (p.options.LowerBound == nil)) ||
((upperBound == nil) != (p.options.UpperBound == nil))
if !boundsChanged {
// The nil-ness is the same but the values may be different.
if lowerBound != nil {
// Both must be non-nil. We know that we've appended 0x00 to
// p.options.LowerBound, which must be ignored for this comparison.
if !bytes.Equal(p.options.LowerBound[:len(p.options.LowerBound)-1], lowerBound) {
boundsChanged = true
}
}
// If the preceding if-block has not already set boundsChanged=true, see
// if the upper bound has changed.
if !boundsChanged && upperBound != nil {
// Both must be non-nil. We know that we've appended 0x00 to
// p.options.UpperBound, which must be ignored for this comparison.
if !bytes.Equal(p.options.UpperBound[:len(p.options.UpperBound)-1], upperBound) {
boundsChanged = true
}
}
}
if !boundsChanged {
// This noop optimization helps the underlying pebble.Iterator to optimize
// seeks.
return
}
// Set the bounds to nil, before we selectively change them.
p.options.LowerBound = nil
p.options.UpperBound = nil
p.curBuf = (p.curBuf + 1) % 2
i := p.curBuf
if lowerBound != nil {
// This is the same as
// p.options.LowerBound = EncodeKeyToBuf(p.lowerBoundBuf[i][:0], MVCCKey{Key: lowerBound}) .
// or EngineKey{Key: lowerBound}.EncodeToBuf(...).
// Since we are encoding keys with an empty version anyway, we can just
// append the NUL byte instead of calling the above encode functions which
// will do the same thing.
p.lowerBoundBuf[i] = append(p.lowerBoundBuf[i][:0], lowerBound...)
p.lowerBoundBuf[i] = append(p.lowerBoundBuf[i], 0x00)
p.options.LowerBound = p.lowerBoundBuf[i]
}
if upperBound != nil {
// Same as above.
p.upperBoundBuf[i] = append(p.upperBoundBuf[i][:0], upperBound...)
p.upperBoundBuf[i] = append(p.upperBoundBuf[i], 0x00)
p.options.UpperBound = p.upperBoundBuf[i]
}
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
if p.testingSetBoundsListener != nil {
p.testingSetBoundsListener.postSetBounds(p.options.LowerBound, p.options.UpperBound)
// Set the new iterator options. We unconditionally do so, since Pebble will
// optimize noop changes as needed, and it may affect batch write visibility.
if p.iter != nil {
p.iter.SetOptions(&p.options)
}
}

Expand Down
Loading

0 comments on commit 87eb356

Please sign in to comment.