Skip to content

Commit

Permalink
storage: use consistent iterators when needed, and when possible
Browse files Browse the repository at this point in the history
For pebbleBatch and pebbleReadOnly, all iterators without timestamp
hints see the same underlying engine state, with no interface
change for the caller. This is done by cloning the first created
pebble.Iterator.

intentInterleavingIter explicitly requests a clone, when it creates
two iterators, which ensures the consistency guarantee applies to
all Reader implementations.

Informs cockroachdb#55461
Informs cockroachdb#41720

Release note: None
  • Loading branch information
sumeerbhola committed Jan 7, 2021
1 parent cee4753 commit 245a009
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/spanset/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
],
)

Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/pebble"
)

// MVCCIterator wraps a storage.MVCCIterator and ensures that it can
Expand Down Expand Up @@ -322,6 +323,11 @@ func (i *EngineIterator) SetUpperBound(key roachpb.Key) {
i.i.SetUpperBound(key)
}

// GetRawIter is part of the storage.EngineIterator interface. It forwards the
// request to the wrapped iterator and hands back whatever *pebble.Iterator
// that iterator reports.
func (i *EngineIterator) GetRawIter() *pebble.Iterator {
	rawIter := i.i.GetRawIter()
	return rawIter
}

type spanSetReader struct {
r storage.Reader
spans *SpanSet
Expand Down Expand Up @@ -416,6 +422,10 @@ func (s spanSetReader) NewEngineIterator(opts storage.IterOptions) storage.Engin
}
}

// ConsistentIterators is part of the storage.Reader interface. The answer is
// taken directly from the wrapped Reader.
func (s spanSetReader) ConsistentIterators() bool {
	wrapped := s.r
	return wrapped.ConsistentIterators()
}

// GetDBEngine recursively searches for the underlying rocksDB engine.
func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader {
switch v := reader.(type) {
Expand Down
19 changes: 18 additions & 1 deletion pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ type EngineIterator interface {
Value() []byte
// SetUpperBound installs a new upper bound for this iterator.
SetUpperBound(roachpb.Key)
// GetRawIter is a low-level method only for use in the storage package,
// that returns the underlying pebble Iterator.
GetRawIter() *pebble.Iterator
}

// IterOptions contains options used to create an {MVCC,Engine}Iterator.
Expand Down Expand Up @@ -263,7 +266,17 @@ const (
MVCCKeyIterKind
)

// Reader is the read interface to an engine's data.
// Reader is the read interface to an engine's data. Certain implementations
// of Reader guarantee consistency of the underlying engine state across the
// different iterators created by NewMVCCIterator, NewEngineIterator:
// - pebbleSnapshot, because it uses an engine snapshot.
// - pebbleReadOnly, pebbleBatch: when the IterOptions do not specify a
// timestamp hint.
// The ConsistentIterators method returns true when this consistency is
// guaranteed by the Reader.
// TODO(sumeer): this partial consistency can be a source of bugs if future
// code starts relying on it, but rarely uses a Reader that does not guarantee
// it. Can we enumerate the current cases where KV uses Engine as a Reader?
type Reader interface {
// Close closes the reader, freeing up any outstanding resources. Note that
// various implementations have slightly different behaviors. In particular,
Expand Down Expand Up @@ -334,6 +347,10 @@ type Reader interface {
// with the iterator to free resources. The caller can change IterOptions
// after this function returns.
NewEngineIterator(opts IterOptions) EngineIterator
// ConsistentIterators returns true if the Reader implementation guarantees
// that the different iterators constructed by this Reader will see the
// same underlying Engine state.
ConsistentIterators() bool
}

// PrecedingIntentState is information needed when writing or clearing an
Expand Down
23 changes: 18 additions & 5 deletions pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator
// Make sure we don't step outside the lock table key space.
intentOpts.UpperBound = keys.LockTableSingleKeyEnd
}
// TODO(sumeer): the creation of these iterators can race with concurrent
// mutations, which may make them inconsistent with each other. Add a way to
// cheaply clone the underlying Pebble iterator and use it in both places
// (the clones will have the same underlying memtables and sstables).
// Note that we can reuse intentKeyBuf after NewEngineIterator returns.
intentIter := reader.NewEngineIterator(intentOpts)

Expand All @@ -170,7 +166,12 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator
// or prefix iteration. We remember whether the upper bound has been
// set, so if not set, we can set the upper bound when SeekGE is called
// for prefix iteration.
iter := reader.NewMVCCIterator(MVCCKeyIterKind, opts)
//
// The creation of these iterators can race with concurrent mutations, which
// may make them inconsistent with each other. So we clone here, to ensure
// consistency (certain Reader implementations already ensure consistency,
// but we want consistency for all Readers).
iter := newMVCCIteratorByCloningEngineIter(intentIter, opts)
return &intentInterleavingIter{
prefix: opts.Prefix,
iter: iter,
Expand Down Expand Up @@ -755,3 +756,15 @@ func (i *intentInterleavingIter) Stats() IteratorStats {
func (i *intentInterleavingIter) SupportsPrev() bool {
return true
}

// newMVCCIteratorByCloningEngineIter builds an MVCCIterator from the raw
// pebble iterator underlying iter. It assumes MVCCKeyIterKind and no
// timestamp hints. It uses pebble.Iterator.Clone to ensure that the two
// iterators see the identical engine state.
//
// iter must be non-nil: its raw iterator is fetched unconditionally. opts are
// the options for the new MVCC iterator. The function panics if no iterator
// could be created.
func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator {
	pIter := iter.GetRawIter()
	// No handle is supplied (nil); the raw iterator is passed as the source
	// for the new iterator instead.
	it := newPebbleIterator(nil, pIter, opts)
	// Validate the freshly created iterator, matching the other
	// newPebbleIterator call sites. Checking the input iter here would be
	// dead code, since it has already been dereferenced above.
	if it == nil {
		panic("couldn't create a new iterator")
	}
	return it
}
59 changes: 52 additions & 7 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt
return r.NewMVCCIterator(iterKind, opts)
}
}
iter := newPebbleIterator(p.db, opts)
iter := newPebbleIterator(p.db, nil, opts)
if iter == nil {
panic("couldn't create a new iterator")
}
Expand All @@ -716,13 +716,18 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt

// NewEngineIterator implements the Engine interface.
func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator {
iter := newPebbleIterator(p.db, opts)
iter := newPebbleIterator(p.db, nil, opts)
if iter == nil {
panic("couldn't create a new iterator")
}
return iter
}

// ConsistentIterators implements the Engine interface. It reports false:
// the Reader documentation lists only pebbleSnapshot, pebbleReadOnly and
// pebbleBatch as guaranteeing that separately created iterators see the same
// underlying engine state, so callers must not assume that guarantee here.
func (p *Pebble) ConsistentIterators() bool {
return false
}

// ApplyBatchRepr implements the Engine interface.
func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error {
// batch.SetRepr takes ownership of the underlying slice, so make a copy.
Expand Down Expand Up @@ -1201,10 +1206,25 @@ type pebbleReadOnly struct {
// need separate iterators for EngineKey and MVCCKey iteration since
// iterators that make separated locks/intents look as interleaved need to
// use both simultaneously.
// When the first iterator is initialized, the underlying *pebble.Iterator
// is stashed in iter, so that subsequent iterator initialization can use
// Iterator.Clone to use the same underlying engine state. This relies on
// the fact that all pebbleIterators created here are marked as reusable,
// which causes pebbleIterator.Close to not close iter. iter will be closed
// when pebbleReadOnly.Close is called.
//
// TODO(sumeer): The lazy iterator creation is insufficient to address
// issues like https://github.com/cockroachdb/cockroach/issues/55461.
// We could create the pebble.Iterator eagerly, since a caller using
// pebbleReadOnly is eventually going to create one anyway. But we
// already have different behaviors in different Reader implementations
// (see Reader.ConsistentIterators) that callers don't pay attention
// to, and adding another such difference could be a source of bugs.
prefixIter pebbleIterator
normalIter pebbleIterator
prefixEngineIter pebbleIterator
normalEngineIter pebbleIterator
iter cloneableIter
closed bool
}

Expand All @@ -1215,6 +1235,9 @@ func (p *pebbleReadOnly) Close() {
panic("closing an already-closed pebbleReadOnly")
}
p.closed = true
// Setting iter to nil is sufficient since it will be closed by one of the
// subsequent destroy calls.
p.iter = nil
p.prefixIter.destroy()
p.normalIter.destroy()
p.prefixEngineIter.destroy()
Expand Down Expand Up @@ -1285,7 +1308,7 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions

if !opts.MinTimestampHint.IsEmpty() {
// MVCCIterators that specify timestamp bounds cannot be cached.
return newPebbleIterator(p.parent.db, opts)
return newPebbleIterator(p.parent.db, nil, opts)
}

iter := &p.normalIter
Expand All @@ -1299,7 +1322,13 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
if iter.iter != nil {
iter.setOptions(opts)
} else {
iter.init(p.parent.db, opts)
iter.init(p.parent.db, p.iter, opts)
// The timestamp hints should be empty given the earlier code, but we are
// being defensive.
if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() {
// For future cloning.
p.iter = iter.iter
}
iter.reusable = true
}

Expand All @@ -1324,14 +1353,25 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
if iter.iter != nil {
iter.setOptions(opts)
} else {
iter.init(p.parent.db, opts)
iter.init(p.parent.db, p.iter, opts)
// The timestamp hints should be empty given this is an EngineIterator,
// but we are being defensive.
if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() {
// For future cloning.
p.iter = iter.iter
}
iter.reusable = true
}

iter.inuse = true
return iter
}

// ConsistentIterators implements the Reader interface. It reports true
// because iterators created by a pebbleReadOnly share the same underlying
// engine state. Per the Reader documentation, this holds only when the
// IterOptions do not specify a timestamp hint.
func (p *pebbleReadOnly) ConsistentIterators() bool {
return true
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
// could be refactored so that a Reader could be supplied to evaluateBatch

Expand Down Expand Up @@ -1491,12 +1531,17 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
return r.NewMVCCIterator(iterKind, opts)
}
}
return newPebbleIterator(p.snapshot, opts)
return newPebbleIterator(p.snapshot, nil, opts)
}

// NewEngineIterator implements the Reader interface.
func (p pebbleSnapshot) NewEngineIterator(opts IterOptions) EngineIterator {
return newPebbleIterator(p.snapshot, opts)
return newPebbleIterator(p.snapshot, nil, opts)
}

// ConsistentIterators implements the Reader interface. It reports true
// because, as the Reader documentation notes, a pebbleSnapshot uses an engine
// snapshot, so all of its iterators see the same underlying engine state.
func (p pebbleSnapshot) ConsistentIterators() bool {
return true
}

// pebbleGetProto uses Reader.MVCCGet, so it is not as efficient as a function
Expand Down
45 changes: 38 additions & 7 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ type pebbleBatch struct {
// need separate iterators for EngineKey and MVCCKey iteration since
// iterators that make separated locks/intents look as interleaved need to
// use both simultaneously.
// When the first iterator is initialized, the underlying *pebble.Iterator
// is stashed in iter, so that subsequent iterator initialization can use
// Iterator.Clone to use the same underlying engine state. This relies on
// the fact that all pebbleIterators created here are marked as reusable,
// which causes pebbleIterator.Close to not close iter. iter will be closed
// when pebbleBatch.Close is called.
prefixIter pebbleIterator
normalIter pebbleIterator
prefixEngineIter pebbleIterator
normalEngineIter pebbleIterator
iter cloneableIter
closed bool
isDistinct bool
distinctOpen bool
Expand Down Expand Up @@ -95,6 +102,9 @@ func (p *pebbleBatch) Close() {
}
p.closed = true

// Setting iter to nil is sufficient since it will be closed by one of the
// subsequent destroy calls.
p.iter = nil
// Destroy the iterators before closing the batch.
p.prefixIter.destroy()
p.normalIter.destroy()
Expand Down Expand Up @@ -205,7 +215,7 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M

if !opts.MinTimestampHint.IsEmpty() {
// MVCCIterators that specify timestamp bounds cannot be cached.
return newPebbleIterator(p.batch, opts)
return newPebbleIterator(p.batch, nil, opts)
}

iter := &p.normalIter
Expand All @@ -218,10 +228,18 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M

if iter.iter != nil {
iter.setOptions(opts)
} else if p.batch.Indexed() {
iter.init(p.batch, opts)
} else {
iter.init(p.db, opts)
if p.batch.Indexed() {
iter.init(p.batch, p.iter, opts)
} else {
iter.init(p.db, p.iter, opts)
}
// The timestamp hints should be empty given the earlier code, but we are
// being defensive.
if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() {
// For future cloning.
p.iter = iter.iter
}
}

iter.inuse = true
Expand Down Expand Up @@ -251,16 +269,29 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {

if iter.iter != nil {
iter.setOptions(opts)
} else if p.batch.Indexed() {
iter.init(p.batch, opts)
} else {
iter.init(p.db, opts)
if p.batch.Indexed() {
iter.init(p.batch, p.iter, opts)
} else {
iter.init(p.db, p.iter, opts)
}
// The timestamp hints should be empty given this is an EngineIterator,
// but we are being defensive.
if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() {
// For future cloning.
p.iter = iter.iter
}
}

iter.inuse = true
return iter
}

// ConsistentIterators implements the Batch interface. It reports true
// because iterators created by a pebbleBatch share the same underlying
// engine state. Per the Reader documentation, this holds only when the
// IterOptions do not specify a timestamp hint.
func (p *pebbleBatch) ConsistentIterators() bool {
return true
}

// ApplyBatchRepr implements the Batch interface.
func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error {
if p.distinctOpen {
Expand Down
Loading

0 comments on commit 245a009

Please sign in to comment.