diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index a09bc41d2e34..2ae3a19634d1 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -402,7 +402,7 @@ func (s spanSetReader) Closed() bool { return s.r.Closed() } -// ExportMVCCToSst is part of the engine.Reader interface. +// ExportMVCCToSst is part of the storage.Reader interface. func (s spanSetReader) ExportMVCCToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, @@ -479,10 +479,16 @@ func (s spanSetReader) NewEngineIterator(opts storage.IterOptions) storage.Engin } } +// ConsistentIterators implements the storage.Reader interface. func (s spanSetReader) ConsistentIterators() bool { return s.r.ConsistentIterators() } +// PinEngineStateForIterators implements the storage.Reader interface. +func (s spanSetReader) PinEngineStateForIterators() error { + return s.r.PinEngineStateForIterators() +} + // GetDBEngine recursively searches for the underlying rocksDB engine. func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { switch v := reader.(type) { @@ -495,7 +501,7 @@ func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { } } -// getSpanReader is a getter to access the engine.Reader field of the +// getSpanReader is a getter to access the storage.Reader field of the // spansetReader. func getSpanReader(r ReadWriter, span roachpb.Span) storage.Reader { if err := r.spanSetReader.spans.CheckAllowed(SpanReadOnly, span); err != nil { @@ -700,7 +706,7 @@ func makeSpanSetReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Times } } -// NewReadWriterAt returns an engine.ReadWriter that asserts access of the +// NewReadWriterAt returns a storage.ReadWriter that asserts access of the // underlying ReadWriter against the given SpanSet at a given timestamp. // If zero timestamp is provided, accesses are considered non-MVCC. func NewReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) storage.ReadWriter { @@ -734,7 +740,7 @@ func (s spanSetBatch) Repr() []byte { return s.b.Repr() } -// NewBatch returns an engine.Batch that asserts access of the underlying +// NewBatch returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet. We only consider span boundaries, associated // timestamps are not considered. func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch { @@ -746,7 +752,7 @@ func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch { } } -// NewBatchAt returns an engine.Batch that asserts access of the underlying +// NewBatchAt returns an storage.Batch that asserts access of the underlying // Batch against the given SpanSet at the given timestamp. // If the zero timestamp is used, all accesses are considered non-MVCC. func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch { diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index e112e7fc749f..4ebc94c90103 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -363,9 +363,10 @@ const ( // different iterators created by NewMVCCIterator, NewEngineIterator: // - pebbleSnapshot, because it uses an engine snapshot. // - pebbleReadOnly, pebbleBatch: when the IterOptions do not specify a -// timestamp hint. Note that currently the engine state visible here is -// not as of the time of the Reader creation. It is the time when the -// first iterator is created. +// timestamp hint (see IterOptions). Note that currently the engine state +// visible here is not as of the time of the Reader creation. It is the time +// when the first iterator is created, or earlier if +// PinEngineStateForIterators is called. // The ConsistentIterators method returns true when this consistency is // guaranteed by the Reader. // TODO(sumeer): this partial consistency can be a source of bugs if future @@ -443,9 +444,25 @@ type Reader interface { // after this function returns. NewEngineIterator(opts IterOptions) EngineIterator // ConsistentIterators returns true if the Reader implementation guarantees - // that the different iterators constructed by this Reader will see the - // same underlying Engine state. + // that the different iterators constructed by this Reader will see the same + // underlying Engine state. NB: this only applies to iterators without + // timestamp hints (see IterOptions), i.e., even if this returns true, those + // iterators can be "inconsistent" in terms of seeing a different engine + // state. The only exception to this is a Reader created using NewSnapshot. ConsistentIterators() bool + + // PinEngineStateForIterators ensures that the state seen by iterators + // without timestamp hints (see IterOptions) is pinned and will not see + // future mutations. It can be called multiple times on a Reader in which + // case the state seen will be either: + // - As of the first call. + // - For a Reader returned by Engine.NewSnapshot, the pinned state is as of + // the time the snapshot was taken. + // So the semantics that are true for all Readers is that the pinned state + // is somewhere in the time interval between the creation of the Reader and + // the first call to PinEngineStateForIterators. + // REQUIRES: ConsistentIterators returns true. + PinEngineStateForIterators() error } // PrecedingIntentState is information needed when writing or clearing an diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 60658870ca0a..c0e024b0aadf 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -779,6 +779,12 @@ func (p *Pebble) ConsistentIterators() bool { return false } +// PinEngineStateForIterators implements the Engine interface. +func (p *Pebble) PinEngineStateForIterators() error { + return errors.AssertionFailedf( + "PinEngineStateForIterators must not be called when ConsistentIterators returns false") +} + // ApplyBatchRepr implements the Engine interface. func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error { // batch.SetRepr takes ownership of the underlying slice, so make a copy. @@ -1237,22 +1243,13 @@ type pebbleReadOnly struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. - // When the first iterator is initialized, the underlying *pebble.Iterator - // is stashed in iter, so that subsequent iterator initialization can use - // Iterator.Clone to use the same underlying engine state. This relies on - // the fact that all pebbleIterators created here are marked as reusable, - // which causes pebbleIterator.Close to not close iter. iter will be closed - // when pebbleReadOnly.Close is called. - // - // TODO(sumeer): The lazy iterator creation is insufficient to address - // issues like https://github.com/cockroachdb/cockroach/issues/55461. - // We could create the pebble.Iterator eagerly, since a caller using - // pebbleReadOnly is eventually going to create one anyway. But we - // already have different behaviors in different Reader implementations - // (see Reader.ConsistentIterators) that callers don't pay attention - // to, and adding another such difference could be a source of bugs. - // See https://github.com/cockroachdb/cockroach/pull/58515#pullrequestreview-563993424 - // for more discussion. + // When the first iterator is initialized, or when + // PinEngineStateForIterators is called (whichever happens first), the + // underlying *pebble.Iterator is stashed in iter, so that subsequent + // iterator initialization can use Iterator.Clone to use the same underlying + // engine state. This relies on the fact that all pebbleIterators created + // here are marked as reusable, which causes pebbleIterator.Close to not + // close iter. iter will be closed when pebbleReadOnly.Close is called. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator @@ -1474,6 +1471,14 @@ func (p *pebbleReadOnly) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Engine interface. +func (p *pebbleReadOnly) PinEngineStateForIterators() error { + if p.iter == nil { + p.iter = p.parent.db.NewIter(nil) + } + return nil +} + // Writer methods are not implemented for pebbleReadOnly. Ideally, the code // could be refactored so that a Reader could be supplied to evaluateBatch @@ -1673,6 +1678,12 @@ func (p pebbleSnapshot) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Reader interface. +func (p *pebbleSnapshot) PinEngineStateForIterators() error { + // Snapshot already pins state, so nothing to do. + return nil +} + // pebbleGetProto uses Reader.MVCCGet, so it not as efficient as a function // that can unmarshal without copying bytes. But we don't care about // efficiency, since this is used to implement Reader.MVCCGetProto, which is diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 19f8715ba5cc..f8ebaff9a4ab 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -37,12 +37,13 @@ type pebbleBatch struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. - // When the first iterator is initialized, the underlying *pebble.Iterator - // is stashed in iter, so that subsequent iterator initialization can use - // Iterator.Clone to use the same underlying engine state. This relies on - // the fact that all pebbleIterators created here are marked as reusable, - // which causes pebbleIterator.Close to not close iter. iter will be closed - // when pebbleBatch.Close is called. + // When the first iterator is initialized, or when + // PinEngineStateForIterators is called (whichever happens first), the + // underlying *pebble.Iterator is stashed in iter, so that subsequent + // iterator initialization can use Iterator.Clone to use the same underlying + // engine state. This relies on the fact that all pebbleIterators created + // here are marked as reusable, which causes pebbleIterator.Close to not + // close iter. iter will be closed when pebbleBatch.Close is called. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator @@ -299,6 +300,18 @@ func (p *pebbleBatch) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Batch interface. +func (p *pebbleBatch) PinEngineStateForIterators() error { + if p.iter == nil { + if p.batch.Indexed() { + p.iter = p.batch.NewIter(nil) + } else { + p.iter = p.db.NewIter(nil) + } + } + return nil +} + // NewMVCCIterator implements the Batch interface. func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error { var batch pebble.Batch diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 55020acba970..cbeadb049250 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -457,10 +457,14 @@ func TestPebbleIterConsistency(t *testing.T) { roEngine := eng.NewReadOnly() batch := eng.NewBatch() + roEngine2 := eng.NewReadOnly() + batch2 := eng.NewBatch() require.False(t, eng.ConsistentIterators()) require.True(t, roEngine.ConsistentIterators()) require.True(t, batch.ConsistentIterators()) + require.True(t, roEngine2.ConsistentIterators()) + require.True(t, batch2.ConsistentIterators()) // Since an iterator is created on pebbleReadOnly, pebbleBatch before // writing a newer version of "a", the newer version will not be visible to @@ -468,6 +472,9 @@ func TestPebbleIterConsistency(t *testing.T) { roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() + // Pin the state for iterators. + require.Nil(t, roEngine2.PinEngineStateForIterators()) + require.Nil(t, batch2.PinEngineStateForIterators()) // Write a newer version of "a" require.NoError(t, eng.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) @@ -506,26 +513,41 @@ func TestPebbleIterConsistency(t *testing.T) { checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) checkEngineIter(roEngine.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) checkEngineIter(roEngine.NewEngineIterator(IterOptions{Prefix: true})) checkEngineIter(batch.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) checkEngineIter(batch.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(roEngine2.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(roEngine2.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(batch2.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(batch2.NewEngineIterator(IterOptions{Prefix: true})) - // The eng iterator will see both values. - iter := eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}) - defer iter.Close() - iter.SeekGE(MVCCKey{Key: []byte("a")}) - count := 0 - for ; ; iter.Next() { - valid, err := iter.Valid() - require.NoError(t, err) - if !valid { - break + checkIterSeesBothValues := func(iter MVCCIterator) { + iter.SeekGE(MVCCKey{Key: []byte("a")}) + count := 0 + for ; ; iter.Next() { + valid, err := iter.Valid() + require.NoError(t, err) + if !valid { + break + } + count++ } - count++ + require.Equal(t, 2, count) + iter.Close() } - require.Equal(t, 2, count) + // The eng iterator will see both values. + checkIterSeesBothValues(eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + // The indexed batches will see 2 values since the second one is written to the batch. + require.NoError(t, batch.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) + require.NoError(t, batch2.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) + checkIterSeesBothValues(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkIterSeesBothValues(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) } func BenchmarkMVCCKeyCompare(b *testing.B) {