Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: use SetOptions() when reusing Pebble iterators #81242

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator
if reader.ConsistentIterators() {
iter = reader.NewMVCCIterator(MVCCKeyIterKind, opts)
} else {
iter = newMVCCIteratorByCloningEngineIter(intentIter, opts)
iter = newPebbleIterator(nil, intentIter.GetRawIter(), opts, StandardDurability)
}

*iiIter = intentInterleavingIter{
Expand Down Expand Up @@ -972,18 +972,6 @@ func (i *intentInterleavingIter) SupportsPrev() bool {
return true
}

// newMVCCIteratorByCloningEngineIter assumes MVCCKeyIterKind and no timestamp
// hints. It uses pebble.Iterator.Clone to ensure that the two iterators see
// the identical engine state.
func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator {
pIter := iter.GetRawIter()
it := newPebbleIterator(nil, pIter, opts, StandardDurability)
if iter == nil {
panic("couldn't create a new iterator")
}
return it
}

// unsageMVCCIterator is used in RaceEnabled test builds to randomly inject
// changes to unsafe keys retrieved from MVCCIterators.
type unsafeMVCCIterator struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3401,7 +3401,7 @@ func MVCCResolveWriteIntentRange(
mvccIter = rw.NewMVCCIterator(MVCCKeyIterKind, iterOpts)
} else {
// For correctness, we need mvccIter to be consistent with engineIter.
mvccIter = newMVCCIteratorByCloningEngineIter(engineIter, iterOpts)
mvccIter = newPebbleIterator(nil, engineIter.GetRawIter(), iterOpts, StandardDurability)
}
iterAndBuf := GetBufUsingIter(mvccIter)
defer func() {
Expand Down
29 changes: 2 additions & 27 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,27 +2003,16 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
return iter
}

if !opts.MinTimestampHint.IsEmpty() {
// MVCCIterators that specify timestamp bounds cannot be cached.
iter := MVCCIterator(newPebbleIterator(p.parent.db, nil, opts, p.durability))
if util.RaceEnabled {
iter = wrapInUnsafeIter(iter)
}
return iter
}

iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
}
if iter.inuse {
return newPebbleIterator(p.parent.db, p.iter, opts, p.durability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, p.durability)
} else {
iter.init(p.parent.db, p.iter, p.iterUnused, opts, p.durability)
if p.iter == nil {
Expand Down Expand Up @@ -2055,11 +2044,9 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
if iter.inuse {
return newPebbleIterator(p.parent.db, p.iter, opts, p.durability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, p.durability)
} else {
iter.init(p.parent.db, p.iter, p.iterUnused, opts, p.durability)
if p.iter == nil {
Expand All @@ -2074,18 +2061,6 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
return iter
}

// checkOptionsForIterReuse checks that the options are appropriate for
// iterators that are reusable, and panics if not. This includes disallowing
// any timestamp hints.
func checkOptionsForIterReuse(opts IterOptions) {
if !opts.MinTimestampHint.IsEmpty() || !opts.MaxTimestampHint.IsEmpty() {
panic("iterator with timestamp hints cannot be reused")
}
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}
}

// ConsistentIterators implements the Engine interface.
func (p *pebbleReadOnly) ConsistentIterators() bool {
return true
Expand Down
25 changes: 2 additions & 23 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ func (p *pebbleBatch) MVCCIterate(

// NewMVCCIterator implements the Batch interface.
func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIterator {
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}

if p.writeOnly {
panic("write-only batch")
}
Expand All @@ -223,15 +219,6 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
return iter
}

if !opts.MinTimestampHint.IsEmpty() {
// MVCCIterators that specify timestamp bounds cannot be cached.
iter := MVCCIterator(newPebbleIterator(p.batch, nil, opts, StandardDurability))
if util.RaceEnabled {
iter = wrapInUnsafeIter(iter)
}
return iter
}

iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
Expand All @@ -243,11 +230,9 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
if iter.inuse {
return newPebbleIterator(handle, p.iter, opts, StandardDurability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, StandardDurability)
} else {
iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability)
if p.iter == nil {
Expand All @@ -267,10 +252,6 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M

// NewEngineIterator implements the Batch interface.
func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}

if p.writeOnly {
panic("write-only batch")
}
Expand All @@ -286,11 +267,9 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
if iter.inuse {
return newPebbleIterator(handle, p.iter, opts, StandardDurability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
iter.setOptions(opts, StandardDurability)
} else {
iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability)
if p.iter == nil {
Expand Down
167 changes: 50 additions & 117 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,9 @@ type pebbleIterator struct {
// Reusable buffer for MVCCKey or EngineKey encoding.
keyBuf []byte
// Buffers for copying iterator bounds to. Note that the underlying memory
// is not GCed upon Close(), to reduce the number of overall allocations. We
// use two slices for each of the bounds since this caller should not change
// the slice holding the current bounds, that the callee (pebble.MVCCIterator)
// is currently using, until after the caller has made the SetBounds call.
lowerBoundBuf [2][]byte
upperBoundBuf [2][]byte
curBuf int
testingSetBoundsListener testingSetBoundsListener
// is not GCed upon Close(), to reduce the number of overall allocations.
lowerBoundBuf []byte
upperBoundBuf []byte

// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
// SSTables based on MVCC/Engine key when true.
Expand Down Expand Up @@ -79,10 +74,6 @@ type cloneableIter interface {
Close() error
}

type testingSetBoundsListener interface {
postSetBounds(lower, upper []byte)
}

// Instantiates a new Pebble iterator, or gets one from the pool.
func newPebbleIterator(
handle pebble.Reader,
Expand All @@ -97,13 +88,10 @@ func newPebbleIterator(
}

// init resets this pebbleIterator for use with the specified arguments. The
// current instance could either be a cached pebbleIterator (eg. in
// current instance could either be a cached pebbleIterator (e.g. in
// pebbleBatch), or a newly-instantiated one through newPebbleIterator. The
// underlying *pebble.Iterator is created using iterToClone, if non-nil and
// there are no timestamp hints, else it is created using handle.
//
// **NOTE**: the durability parameter may be ignored if iterToClone is
// non-nil, so make sure that the desired durability is the same.
// underlying *pebble.Iterator is created using iterToClone, if non-nil, else it
// is created using handle.
func (p *pebbleIterator) init(
handle pebble.Reader,
iterToClone cloneableIter,
Expand All @@ -115,39 +103,66 @@ func (p *pebbleIterator) init(
keyBuf: p.keyBuf,
lowerBoundBuf: p.lowerBoundBuf,
upperBoundBuf: p.upperBoundBuf,
prefix: opts.Prefix,
reusable: p.reusable,
}

if iterToClone != nil {
if iterUnused {
// NB: If the iterator was never used (at the time of writing, this means
// that the iterator was created by `PinEngineStateForIterators()`), we
// don't need to clone it.
p.iter = iterToClone.(*pebble.Iterator)
} else {
var err error
if p.iter, err = iterToClone.Clone(); err != nil {
panic(err)
}
}
}

p.setOptions(opts, durability)

if p.iter == nil {
p.iter = handle.NewIter(&p.options)
}
p.inuse = true
}

// setOptions updates the options for a pebbleIterator. If p.iter is non-nil, it
// updates the options on the existing iterator too.
func (p *pebbleIterator) setOptions(opts IterOptions, durability DurabilityRequirement) {
if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
panic("iterator must set prefix or upper bound or lower bound")
}
if opts.MinTimestampHint.IsSet() && opts.MaxTimestampHint.IsEmpty() {
panic("min timestamp hint set without max timestamp hint")
}

p.options.OnlyReadGuaranteedDurable = false
if durability == GuaranteedDurability {
p.options.OnlyReadGuaranteedDurable = true
// Generate new Pebble iterator options.
p.options = pebble.IterOptions{
OnlyReadGuaranteedDurable: durability == GuaranteedDurability,
}
p.prefix = opts.Prefix

if opts.LowerBound != nil {
// This is the same as
// p.options.LowerBound = EncodeKeyToBuf(p.lowerBoundBuf[0][:0], MVCCKey{Key: opts.LowerBound})
// or EngineKey{Key: opts.LowerBound}.EncodeToBuf(...).
// Since we are encoding keys with an empty version anyway, we can just
// append the NUL byte instead of calling the above encode functions which
// will do the same thing.
p.lowerBoundBuf[0] = append(p.lowerBoundBuf[0][:0], opts.LowerBound...)
p.lowerBoundBuf[0] = append(p.lowerBoundBuf[0], 0x00)
p.options.LowerBound = p.lowerBoundBuf[0]
p.lowerBoundBuf = append(p.lowerBoundBuf[:0], opts.LowerBound...)
p.lowerBoundBuf = append(p.lowerBoundBuf, 0x00)
p.options.LowerBound = p.lowerBoundBuf
}
if opts.UpperBound != nil {
// Same as above.
p.upperBoundBuf[0] = append(p.upperBoundBuf[0][:0], opts.UpperBound...)
p.upperBoundBuf[0] = append(p.upperBoundBuf[0], 0x00)
p.options.UpperBound = p.upperBoundBuf[0]
p.upperBoundBuf = append(p.upperBoundBuf[:0], opts.UpperBound...)
p.upperBoundBuf = append(p.upperBoundBuf, 0x00)
p.options.UpperBound = p.upperBoundBuf
}

doClone := iterToClone != nil
if !opts.MaxTimestampHint.IsEmpty() {
doClone = false
if opts.MaxTimestampHint.IsSet() {
encodedMinTS := string(encodeMVCCTimestamp(opts.MinTimestampHint))
encodedMaxTS := string(encodeMVCCTimestamp(opts.MaxTimestampHint))
p.options.TableFilter = func(userProps map[string]string) bool {
Expand Down Expand Up @@ -179,94 +194,12 @@ func (p *pebbleIterator) init(
uint64(opts.MinTimestampHint.WallTime),
uint64(opts.MaxTimestampHint.WallTime)+1),
}
} else if !opts.MinTimestampHint.IsEmpty() {
panic("min timestamp hint set without max timestamp hint")
}

if doClone {
var err error
if iterUnused {
// NB: If the iterator was never used (at the time of writing, this means
// that the iterator was created by `PinEngineStateForIterators()`), we
// don't need to clone it.
p.iter = iterToClone.(*pebble.Iterator)
} else {
if p.iter, err = iterToClone.Clone(); err != nil {
panic(err)
}
}
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
} else {
if handle == nil {
panic("handle is nil for non-cloning path")
}
p.iter = handle.NewIter(&p.options)
}
if p.iter == nil {
panic("unable to create iterator")
}

p.inuse = true
}

// setBounds is called to change the bounds on a pebbleIterator. Note that
// this is not the first time that bounds will be passed to the underlying
// pebble.Iterator. The existing bounds are in p.options.
func (p *pebbleIterator) setBounds(lowerBound, upperBound roachpb.Key) {
// If the roachpb.Key bound is nil, the corresponding bound for the
// pebble.Iterator will also be nil. p.options contains the current bounds
// known to the pebble.Iterator.
boundsChanged := ((lowerBound == nil) != (p.options.LowerBound == nil)) ||
((upperBound == nil) != (p.options.UpperBound == nil))
if !boundsChanged {
// The nil-ness is the same but the values may be different.
if lowerBound != nil {
// Both must be non-nil. We know that we've appended 0x00 to
// p.options.LowerBound, which must be ignored for this comparison.
if !bytes.Equal(p.options.LowerBound[:len(p.options.LowerBound)-1], lowerBound) {
boundsChanged = true
}
}
// If the preceding if-block has not already set boundsChanged=true, see
// if the upper bound has changed.
if !boundsChanged && upperBound != nil {
// Both must be non-nil. We know that we've appended 0x00 to
// p.options.UpperBound, which must be ignored for this comparison.
if !bytes.Equal(p.options.UpperBound[:len(p.options.UpperBound)-1], upperBound) {
boundsChanged = true
}
}
}
if !boundsChanged {
// This noop optimization helps the underlying pebble.Iterator to optimize
// seeks.
return
}
// Set the bounds to nil, before we selectively change them.
p.options.LowerBound = nil
p.options.UpperBound = nil
p.curBuf = (p.curBuf + 1) % 2
i := p.curBuf
if lowerBound != nil {
// This is the same as
// p.options.LowerBound = EncodeKeyToBuf(p.lowerBoundBuf[i][:0], MVCCKey{Key: lowerBound}) .
// or EngineKey{Key: lowerBound}.EncodeToBuf(...).
// Since we are encoding keys with an empty version anyway, we can just
// append the NUL byte instead of calling the above encode functions which
// will do the same thing.
p.lowerBoundBuf[i] = append(p.lowerBoundBuf[i][:0], lowerBound...)
p.lowerBoundBuf[i] = append(p.lowerBoundBuf[i], 0x00)
p.options.LowerBound = p.lowerBoundBuf[i]
}
if upperBound != nil {
// Same as above.
p.upperBoundBuf[i] = append(p.upperBoundBuf[i][:0], upperBound...)
p.upperBoundBuf[i] = append(p.upperBoundBuf[i], 0x00)
p.options.UpperBound = p.upperBoundBuf[i]
}
p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound)
if p.testingSetBoundsListener != nil {
p.testingSetBoundsListener.postSetBounds(p.options.LowerBound, p.options.UpperBound)
// Set the new iterator options. We unconditionally do so, since Pebble will
// optimize noop changes as needed, and it may affect batch write visibility.
if p.iter != nil {
p.iter.SetOptions(&p.options)
}
}

Expand Down
Loading