From 245a0094b863ace548d6d6ef40faa850f1343122 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Wed, 6 Jan 2021 17:10:37 -0500 Subject: [PATCH] storage: use consistent iterators when needed, and when possible For pebbleBatch and pebbleReadOnly, all iterators without timestamp hints see the same underlying engine state, with no interface change for the caller. This is done by cloning the first created pebble.Iterator. intentInterleavingIter explicitly requests a clone, when it creates two iterators, which ensures the consistency guarantee applies to all Reader implementations. Informs #55461 Informs #41720 Release note: None --- pkg/kv/kvserver/spanset/BUILD.bazel | 1 + pkg/kv/kvserver/spanset/batch.go | 10 +++ pkg/storage/engine.go | 19 +++++- pkg/storage/intent_interleaving_iter.go | 23 +++++-- pkg/storage/pebble.go | 59 ++++++++++++++--- pkg/storage/pebble_batch.go | 45 ++++++++++--- pkg/storage/pebble_iterator.go | 38 +++++++++-- pkg/storage/pebble_test.go | 84 +++++++++++++++++++++++++ 8 files changed, 253 insertions(+), 26 deletions(-) diff --git a/pkg/kv/kvserver/spanset/BUILD.bazel b/pkg/kv/kvserver/spanset/BUILD.bazel index ea7460038128..a694995d9c95 100644 --- a/pkg/kv/kvserver/spanset/BUILD.bazel +++ b/pkg/kv/kvserver/spanset/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/util/protoutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//:pebble", ], ) diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 9a163a27ab0a..3692a38df197 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/pebble" ) // MVCCIterator wraps an storage.MVCCIterator and ensures that it can @@ -322,6 +323,11 @@ func (i *EngineIterator) SetUpperBound(key roachpb.Key) { 
i.i.SetUpperBound(key) } +// GetRawIter is part of the storage.EngineIterator interface. +func (i *EngineIterator) GetRawIter() *pebble.Iterator { + return i.i.GetRawIter() +} + type spanSetReader struct { r storage.Reader spans *SpanSet @@ -416,6 +422,10 @@ func (s spanSetReader) NewEngineIterator(opts storage.IterOptions) storage.Engin } } +func (s spanSetReader) ConsistentIterators() bool { + return s.r.ConsistentIterators() +} + // GetDBEngine recursively searches for the underlying rocksDB engine. func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { switch v := reader.(type) { diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 47895f94651f..bdc6d4e2bf10 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -202,6 +202,9 @@ type EngineIterator interface { Value() []byte // SetUpperBound installs a new upper bound for this iterator. SetUpperBound(roachpb.Key) + // GetRawIter is a low-level method only for use in the storage package, + // that returns the underlying pebble Iterator. + GetRawIter() *pebble.Iterator } // IterOptions contains options used to create an {MVCC,Engine}Iterator. @@ -263,7 +266,17 @@ const ( MVCCKeyIterKind ) -// Reader is the read interface to an engine's data. +// Reader is the read interface to an engine's data. Certain implementations +// of Reader guarantee consistency of the underlying engine state across the +// different iterators created by NewMVCCIterator, NewEngineIterator: +// - pebbleSnapshot, because it uses an engine snapshot. +// - pebbleReadOnly, pebbleBatch: when the IterOptions do not specify a +// timestamp hint. +// The ConsistentIterators method returns true when this consistency is +// guaranteed by the Reader. +// TODO(sumeer): this partial consistency can be a source of bugs if future +// code starts relying on it, but rarely uses a Reader that does not guarantee +// it. Can we enumerate the current cases where KV uses Engine as a Reader? 
type Reader interface { // Close closes the reader, freeing up any outstanding resources. Note that // various implementations have slightly different behaviors. In particular, @@ -334,6 +347,10 @@ type Reader interface { // with the iterator to free resources. The caller can change IterOptions // after this function returns. NewEngineIterator(opts IterOptions) EngineIterator + // ConsistentIterators returns true if the Reader implementation guarantees + // that the different iterators constructed by this Reader will see the + // same underlying Engine state. + ConsistentIterators() bool } // PrecedingIntentState is information needed when writing or clearing an diff --git a/pkg/storage/intent_interleaving_iter.go b/pkg/storage/intent_interleaving_iter.go index 917dbf61e3b0..37a0d009e3ef 100644 --- a/pkg/storage/intent_interleaving_iter.go +++ b/pkg/storage/intent_interleaving_iter.go @@ -156,10 +156,6 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator // Make sure we don't step outside the lock table key space. intentOpts.UpperBound = keys.LockTableSingleKeyEnd } - // TODO(sumeer): the creation of these iterators can race with concurrent - // mutations, which may make them inconsistent with each other. Add a way to - // cheaply clone the underlying Pebble iterator and use it in both places - // (the clones will have the same underlying memtables and sstables). // Note that we can reuse intentKeyBuf after NewEngineIterator returns. intentIter := reader.NewEngineIterator(intentOpts) @@ -170,7 +166,12 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator // or prefix iteration. We remember whether the upper bound has been // set, so if not set, we can set the upper bound when SeekGE is called // for prefix iteration. - iter := reader.NewMVCCIterator(MVCCKeyIterKind, opts) + // + // The creation of these iterators can race with concurrent mutations, which + // may make them inconsistent with each other. 
So we clone here, to ensure + // consistency (certain Reader implementations already ensure consistency, + // but we want consistency for all Readers). + iter := newMVCCIteratorByCloningEngineIter(intentIter, opts) return &intentInterleavingIter{ prefix: opts.Prefix, iter: iter, @@ -755,3 +756,15 @@ func (i *intentInterleavingIter) Stats() IteratorStats { func (i *intentInterleavingIter) SupportsPrev() bool { return true } + +// newMVCCIteratorByCloningEngineIter assumes MVCCKeyIterKind and no timestamp +// hints. It uses pebble.Iterator.Clone to ensure that the two iterators see +// the identical engine state. +func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator { + pIter := iter.GetRawIter() + it := newPebbleIterator(nil, pIter, opts) + if it == nil { + panic("couldn't create a new iterator") + } + return it +} diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 2340098b2ba2..576e15eb30b8 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -707,7 +707,7 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt return r.NewMVCCIterator(iterKind, opts) } } - iter := newPebbleIterator(p.db, opts) + iter := newPebbleIterator(p.db, nil, opts) if iter == nil { panic("couldn't create a new iterator") } @@ -716,13 +716,18 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt // NewEngineIterator implements the Engine interface. func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator { - iter := newPebbleIterator(p.db, opts) + iter := newPebbleIterator(p.db, nil, opts) if iter == nil { panic("couldn't create a new iterator") } return iter } +// ConsistentIterators implements the Engine interface. +func (p *Pebble) ConsistentIterators() bool { + return false +} + // ApplyBatchRepr implements the Engine interface. 
func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error { // batch.SetRepr takes ownership of the underlying slice, so make a copy. @@ -1201,10 +1206,25 @@ type pebbleReadOnly struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. + // When the first iterator is initialized, the underlying *pebble.Iterator + // is stashed in iter, so that subsequent iterator initialization can use + // Iterator.Clone to use the same underlying engine state. This relies on + // the fact that all pebbleIterators created here are marked as reusable, + // which causes pebbleIterator.Close to not close iter. iter will be closed + // when pebbleReadOnly.Close is called. + // + // TODO(sumeer): The lazy iterator creation is insufficient to address + // issues like https://github.com/cockroachdb/cockroach/issues/55461. + // We could create the pebble.Iterator eagerly, since a caller using + // pebbleReadOnly is eventually going to create one anyway. But we + // already have different behaviors in different Reader implementations + // (see Reader.ConsistentIterators) that callers don't pay attention + // to, and adding another such difference could be a source of bugs. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator normalEngineIter pebbleIterator + iter cloneableIter closed bool } @@ -1215,6 +1235,9 @@ func (p *pebbleReadOnly) Close() { panic("closing an already-closed pebbleReadOnly") } p.closed = true + // Setting iter to nil is sufficient since it will be closed by one of the + // subsequent destroy calls. + p.iter = nil p.prefixIter.destroy() p.normalIter.destroy() p.prefixEngineIter.destroy() @@ -1285,7 +1308,7 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. 
- return newPebbleIterator(p.parent.db, opts) + return newPebbleIterator(p.parent.db, nil, opts) } iter := &p.normalIter @@ -1299,7 +1322,13 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions if iter.iter != nil { iter.setOptions(opts) } else { - iter.init(p.parent.db, opts) + iter.init(p.parent.db, p.iter, opts) + // The timestamp hints should be empty given the earlier code, but we are + // being defensive. + if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() { + // For future cloning. + p.iter = iter.iter + } iter.reusable = true } @@ -1324,7 +1353,13 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { if iter.iter != nil { iter.setOptions(opts) } else { - iter.init(p.parent.db, opts) + iter.init(p.parent.db, p.iter, opts) + // The timestamp hints should be empty given this is an EngineIterator, + // but we are being defensive. + if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() { + // For future cloning. + p.iter = iter.iter + } iter.reusable = true } @@ -1332,6 +1367,11 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { return iter } +// ConsistentIterators implements the Engine interface. +func (p *pebbleReadOnly) ConsistentIterators() bool { + return true +} + // Writer methods are not implemented for pebbleReadOnly. Ideally, the code // could be refactored so that a Reader could be supplied to evaluateBatch @@ -1491,12 +1531,17 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions return r.NewMVCCIterator(iterKind, opts) } } - return newPebbleIterator(p.snapshot, opts) + return newPebbleIterator(p.snapshot, nil, opts) } // NewEngineIterator implements the Reader interface. 
func (p pebbleSnapshot) NewEngineIterator(opts IterOptions) EngineIterator { - return newPebbleIterator(p.snapshot, opts) + return newPebbleIterator(p.snapshot, nil, opts) +} + +// ConsistentIterators implements the Reader interface. +func (p pebbleSnapshot) ConsistentIterators() bool { + return true } // pebbleGetProto uses Reader.MVCCGet, so it not as efficient as a function diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index f2a3c369fb68..e2ca825a9243 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -33,10 +33,17 @@ type pebbleBatch struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. + // When the first iterator is initialized, the underlying *pebble.Iterator + // is stashed in iter, so that subsequent iterator initialization can use + // Iterator.Clone to use the same underlying engine state. This relies on + // the fact that all pebbleIterators created here are marked as reusable, + // which causes pebbleIterator.Close to not close iter. iter will be closed + // when pebbleBatch.Close is called. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator normalEngineIter pebbleIterator + iter cloneableIter closed bool isDistinct bool distinctOpen bool @@ -95,6 +102,9 @@ func (p *pebbleBatch) Close() { } p.closed = true + // Setting iter to nil is sufficient since it will be closed by one of the + // subsequent destroy calls. + p.iter = nil // Destroy the iterators before closing the batch. p.prefixIter.destroy() p.normalIter.destroy() @@ -205,7 +215,7 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. 
- return newPebbleIterator(p.batch, opts) + return newPebbleIterator(p.batch, nil, opts) } iter := &p.normalIter @@ -218,10 +228,18 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M if iter.iter != nil { iter.setOptions(opts) - } else if p.batch.Indexed() { - iter.init(p.batch, opts) } else { - iter.init(p.db, opts) + if p.batch.Indexed() { + iter.init(p.batch, p.iter, opts) + } else { + iter.init(p.db, p.iter, opts) + } + // The timestamp hints should be empty given the earlier code, but we are + // being defensive. + if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() { + // For future cloning. + p.iter = iter.iter + } } iter.inuse = true @@ -251,16 +269,29 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { if iter.iter != nil { iter.setOptions(opts) - } else if p.batch.Indexed() { - iter.init(p.batch, opts) } else { - iter.init(p.db, opts) + if p.batch.Indexed() { + iter.init(p.batch, p.iter, opts) + } else { + iter.init(p.db, p.iter, opts) + } + // The timestamp hints should be empty given this is an EngineIterator, + // but we are being defensive. + if p.iter == nil && opts.MaxTimestampHint.IsEmpty() && opts.MinTimestampHint.IsEmpty() { + // For future cloning. + p.iter = iter.iter + } } iter.inuse = true return iter } +// ConsistentIterators implements the Batch interface. +func (p *pebbleBatch) ConsistentIterators() bool { + return true +} + // NewMVCCIterator implements the Batch interface. func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error { if p.distinctOpen { diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index 1e1ebca3d787..3ae1c111312f 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -70,17 +70,25 @@ var pebbleIterPool = sync.Pool{ }, } +type cloneableIter interface { + Clone() (*pebble.Iterator, error) +} + // Instantiates a new Pebble iterator, or gets one from the pool. 
-func newPebbleIterator(handle pebble.Reader, opts IterOptions) *pebbleIterator { +func newPebbleIterator( + handle pebble.Reader, iterToClone cloneableIter, opts IterOptions, +) *pebbleIterator { iter := pebbleIterPool.Get().(*pebbleIterator) - iter.init(handle, opts) + iter.init(handle, iterToClone, opts) return iter } // init resets this pebbleIterator for use with the specified arguments. The -// current instance could either be a cached iterator (eg. in pebbleBatch), or -// a newly-instantiated one through newPebbleIterator. -func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) { +// current instance could either be a cached pebbleIterator (eg. in +// pebbleBatch), or a newly-instantiated one through newPebbleIterator. The +// underlying *pebble.Iterator is created using iterToClone, if non-nil and +// there are no timestamp hints, else it is created using handle. +func (p *pebbleIterator) init(handle pebble.Reader, iterToClone cloneableIter, opts IterOptions) { *p = pebbleIterator{ keyBuf: p.keyBuf, lowerBoundBuf: p.lowerBoundBuf, @@ -111,7 +119,9 @@ func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) { p.options.UpperBound = p.upperBoundBuf[0] } + doClone := iterToClone != nil if !opts.MaxTimestampHint.IsEmpty() { + doClone = false encodedMinTS := string(encodeTimestamp(opts.MinTimestampHint)) encodedMaxTS := string(encodeTimestamp(opts.MaxTimestampHint)) p.options.TableFilter = func(userProps map[string]string) bool { @@ -139,7 +149,18 @@ func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) { panic("min timestamp hint set without max timestamp hint") } - p.iter = handle.NewIter(&p.options) + if doClone { + var err error + if p.iter, err = iterToClone.Clone(); err != nil { + panic(err) + } + p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound) + } else { + if handle == nil { + panic("handle is nil for non-cloning path") + } + p.iter = handle.NewIter(&p.options) + } if p.iter == nil { panic("unable 
to create iterator") } @@ -629,6 +650,11 @@ func (p *pebbleIterator) CheckForKeyCollisions( return checkForKeyCollisionsGo(p, sstData, start, end) } +// GetRawIter is part of the EngineIterator interface. +func (p *pebbleIterator) GetRawIter() *pebble.Iterator { + return p.iter +} + func (p *pebbleIterator) destroy() { if p.inuse { panic("iterator still in use") diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index e153d9e59601..3828c0e9b11c 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -298,6 +298,90 @@ func TestPebbleDiskSlowEmit(t *testing.T) { require.Equal(t, uint64(1), p.diskStallCount) } +func TestPebbleIterConsistency(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + eng := createTestPebbleEngine() + defer eng.Close() + ts1 := hlc.Timestamp{WallTime: 1} + ts2 := hlc.Timestamp{WallTime: 2} + k1 := MVCCKey{[]byte("a"), ts1} + eng.PutMVCC(k1, []byte("a1")) + + roEngine := eng.NewReadOnly() + batch := eng.NewBatch() + + require.False(t, eng.ConsistentIterators()) + require.True(t, roEngine.ConsistentIterators()) + require.True(t, batch.ConsistentIterators()) + + // Since an iterator is created on pebbleReadOnly, pebbleBatch before + // writing a newer version of "a", the newer version will not be visible to + // iterators that are created later. 
+ roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() + batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() + eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() + + // Write a newer version of "a" + eng.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")) + + checkMVCCIter := func(iter MVCCIterator) { + iter.SeekGE(MVCCKey{Key: []byte("a")}) + valid, err := iter.Valid() + require.Equal(t, true, valid) + require.NoError(t, err) + k := iter.UnsafeKey() + require.True(t, k1.Equal(k), "expected %s != actual %s", k1.String(), k.String()) + iter.Next() + valid, err = iter.Valid() + require.False(t, valid) + require.NoError(t, err) + iter.Close() + } + checkEngineIter := func(iter EngineIterator) { + valid, err := iter.SeekEngineKeyGE(EngineKey{Key: []byte("a")}) + require.Equal(t, true, valid) + require.NoError(t, err) + k, err := iter.UnsafeEngineKey() + require.NoError(t, err) + require.True(t, k.IsMVCCKey()) + mvccKey, err := k.ToMVCCKey() + require.NoError(t, err) + require.True( + t, k1.Equal(mvccKey), "expected %s != actual %s", k1.String(), mvccKey.String()) + valid, err = iter.NextEngineKey() + require.False(t, valid) + require.NoError(t, err) + iter.Close() + } + + checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + + checkEngineIter(roEngine.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(roEngine.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(batch.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(batch.NewEngineIterator(IterOptions{Prefix: true})) + + // The eng iterator will see both values. 
+ iter := eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}) + defer iter.Close() + iter.SeekGE(MVCCKey{Key: []byte("a")}) + count := 0 + for ; ; iter.Next() { + valid, err := iter.Valid() + require.NoError(t, err) + if !valid { + break + } + count++ + } + require.Equal(t, 2, count) +} + func BenchmarkMVCCKeyCompare(b *testing.B) { rng := rand.New(rand.NewSource(timeutil.Now().Unix())) keys := make([][]byte, 1000)