diff --git a/pkg/kv/kvserver/rditer/replica_data_iter_test.go b/pkg/kv/kvserver/rditer/replica_data_iter_test.go index 253b5caea050..8ae5efa1001d 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter_test.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter_test.go @@ -126,7 +126,7 @@ func verifyRDReplicatedOnlyMVCCIter( ) { t.Helper() verify := func(t *testing.T, useSpanSet, reverse bool) { - readWriter := eng.NewReadOnly() + readWriter := eng.NewReadOnly(storage.StandardDurability) defer readWriter.Close() if useSpanSet { var spans spanset.SpanSet @@ -189,7 +189,7 @@ func verifyRDReplicatedOnlyMVCCIter( func verifyRDEngineIter( t *testing.T, desc *roachpb.RangeDescriptor, eng storage.Engine, expectedKeys []storage.MVCCKey, ) { - readWriter := eng.NewReadOnly() + readWriter := eng.NewReadOnly(storage.StandardDurability) defer readWriter.Close() iter := NewReplicaEngineDataIterator(desc, readWriter, false) defer iter.Close() diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index a1aeb0bbb249..f22a76ced01f 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -174,7 +175,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked( ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)}) // Call evaluateBatch instead of Send to avoid reacquiring latches. 
rec := NewReplicaEvalContext(r, todoSpanSet) - rw := r.Engine().NewReadOnly() + rw := r.Engine().NewReadOnly(storage.StandardDurability) defer rw.Close() br, result, pErr := @@ -217,7 +218,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)}) // Call evaluateBatch instead of Send to avoid reacquiring latches. rec := NewReplicaEvalContext(r, todoSpanSet) - rw := r.Engine().NewReadOnly() + rw := r.Engine().NewReadOnly(storage.StandardDurability) defer rw.Close() br, result, pErr := evaluateBatch( diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 06742c05cea6..3f8d1ee57441 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -74,7 +74,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, // and this method will always return at least one entry even if it exceeds // maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined. 
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) { - readonly := r.store.Engine().NewReadOnly() + readonly := r.store.Engine().NewReadOnly(storage.StandardDurability) defer readonly.Close() ctx := r.AnnotateCtx(context.TODO()) if r.raftMu.sideloaded == nil { @@ -280,7 +280,7 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) { if e, ok := r.store.raftEntryCache.Get(r.RangeID, i); ok { return e.Term, nil } - readonly := r.store.Engine().NewReadOnly() + readonly := r.store.Engine().NewReadOnly(storage.StandardDurability) defer readonly.Close() ctx := r.AnnotateCtx(context.TODO()) return term(ctx, r.mu.stateLoader, readonly, r.RangeID, r.store.raftEntryCache, i) diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index fb4df085fee9..ec19a2b92e00 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -54,7 +54,7 @@ func (r *Replica) executeReadOnlyBatch( // TODO(irfansharif): It's unfortunate that in this read-only code path, // we're stuck with a ReadWriter because of the way evaluateBatch is // designed. - rw := r.store.Engine().NewReadOnly() + rw := r.store.Engine().NewReadOnly(storage.StandardDurability) if !rw.ConsistentIterators() { // This is not currently needed for correctness, but future optimizations // may start relying on this, so we assert here. 
diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index b3beed4d84c9..43f9cff57c39 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -192,7 +192,7 @@ func TestReadOnlyBasics(t *testing.T) { e := engineImpl.create() defer e.Close() - ro := e.NewReadOnly() + ro := e.NewReadOnly(StandardDurability) if ro.Closed() { t.Fatal("read-only is expectedly found to be closed") } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 2b153f6e8e54..1ad4bfeea5b8 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -653,6 +653,22 @@ type ReadWriter interface { Writer } +// DurabilityRequirement is an advanced option. If in doubt, use +// StandardDurability. +// +// GuaranteedDurability maps to pebble.IterOptions.OnlyReadGuaranteedDurable. +// This acknowledges the fact that we do not (without sacrificing correctness) +// sync the WAL for many writes, and there are some advanced cases +// (raftLogTruncator) that need visibility into what is guaranteed durable. +type DurabilityRequirement int8 + +const ( + // StandardDurability is what should normally be used. + StandardDurability DurabilityRequirement = iota + // GuaranteedDurability is an advanced option (only for raftLogTruncator). + GuaranteedDurability +) + // Engine is the interface that wraps the core operations of a key/value store. type Engine interface { ReadWriter @@ -685,14 +701,15 @@ type Engine interface { // them atomically on a call to Commit(). NewBatch() Batch // NewReadOnly returns a new instance of a ReadWriter that wraps this - // engine. This wrapper panics when unexpected operations (e.g., write - // operations) are executed on it and caches iterators to avoid the overhead - // of creating multiple iterators for batched reads. + // engine, and with the given durability requirement.
This wrapper panics + // when unexpected operations (e.g., write operations) are executed on it + // and caches iterators to avoid the overhead of creating multiple iterators + // for batched reads. // // All iterators created from a read-only engine are guaranteed to provide a // consistent snapshot of the underlying engine. See the comment on the // Reader interface and the Reader.ConsistentIterators method. - NewReadOnly() ReadWriter + NewReadOnly(durability DurabilityRequirement) ReadWriter // NewUnindexedBatch returns a new instance of a batched engine which wraps // this engine. It is unindexed, in that writes to the batch are not // visible to reads until after it commits. The batch accumulates all @@ -740,7 +757,13 @@ type Engine interface { // addSSTablePreApply to select alternate code paths, but really there should // be a unified code path there. InMem() bool - + // RegisterFlushCompletedCallback registers a callback that will be run for + // every successful flush. Only one callback can be registered at a time, so + // registering again replaces the previous callback. The callback must + // return quickly and must not call any methods on the Engine in the context + // of the callback since it could cause a deadlock (since the callback may + // be invoked while holding mutexes). + RegisterFlushCompletedCallback(cb func()) // Filesystem functionality. fs.FS // ReadFile reads the content from the file with the given filename int this RocksDB's env. diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 623c7140cf89..1416de23e72e 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -857,7 +857,7 @@ func TestEngineScan1(t *testing.T) { } // Test iterator stats. 
- ro := engine.NewReadOnly() + ro := engine.NewReadOnly(StandardDurability) iter := ro.NewMVCCIterator(MVCCKeyIterKind, IterOptions{LowerBound: roachpb.Key("cat"), UpperBound: roachpb.Key("server")}) iter.SeekGE(MVCCKey{Key: roachpb.Key("cat")}) diff --git a/pkg/storage/intent_interleaving_iter.go b/pkg/storage/intent_interleaving_iter.go index bf31221fbe93..20446b93ada8 100644 --- a/pkg/storage/intent_interleaving_iter.go +++ b/pkg/storage/intent_interleaving_iter.go @@ -976,7 +976,7 @@ func (i *intentInterleavingIter) SupportsPrev() bool { // the identical engine state. func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator { pIter := iter.GetRawIter() - it := newPebbleIterator(nil, pIter, opts) + it := newPebbleIterator(nil, pIter, opts, StandardDurability) if iter == nil { panic("couldn't create a new iterator") } diff --git a/pkg/storage/intent_reader_writer.go b/pkg/storage/intent_reader_writer.go index 9e6068f8006d..3217153b8950 100644 --- a/pkg/storage/intent_reader_writer.go +++ b/pkg/storage/intent_reader_writer.go @@ -101,7 +101,9 @@ func (idw intentDemuxWriter) ClearMVCCRangeAndIntents( // code probably uses an MVCCIterator. type wrappableReader interface { Reader - rawGet(key []byte) (value []byte, err error) + // rawMVCCGet is only used for Reader.MVCCGet which is deprecated and not + // performance sensitive. + rawMVCCGet(key []byte) (value []byte, err error) } // wrapReader wraps the provided reader, to return an implementation of MVCCIterator @@ -126,7 +128,7 @@ var intentInterleavingReaderPool = sync.Pool{ // Get implements the Reader interface. 
func (imr *intentInterleavingReader) MVCCGet(key MVCCKey) ([]byte, error) { - val, err := imr.wrappableReader.rawGet(EncodeMVCCKey(key)) + val, err := imr.wrappableReader.rawMVCCGet(EncodeMVCCKey(key)) if val != nil || err != nil || !key.Timestamp.IsEmpty() { return val, err } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index fde8f277f707..99af0c9d53dd 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -635,6 +636,11 @@ type Pebble struct { unencryptedFS vfs.FS logger pebble.Logger eventListener *pebble.EventListener + mu struct { + // This mutex is the lowest in any lock ordering. + syncutil.Mutex + flushCompletedCallback func() + } wrappedIntentWriter intentDemuxWriter @@ -823,7 +829,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (*Pebble, error) { ctx: logCtx, depth: 2, // skip over the EventListener stack frame }), - p.makeMetricEventListener(ctx), + p.makeMetricEtcEventListener(ctx), ) p.eventListener = &cfg.Opts.EventListener p.wrappedIntentWriter = wrapIntentWriter(ctx, p) @@ -865,7 +871,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (*Pebble, error) { return p, nil } -func (p *Pebble) makeMetricEventListener(ctx context.Context) pebble.EventListener { +func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener { return pebble.EventListener{ WriteStallBegin: func(info pebble.WriteStallBeginInfo) { atomic.AddInt64(&p.writeStallCount, 1) @@ -892,6 +898,17 @@ func (p *Pebble) makeMetricEventListener(ctx context.Context) pebble.EventListen } atomic.AddInt64(&p.diskSlowCount, 1) }, + FlushEnd: func(info 
pebble.FlushInfo) { + if info.Err != nil { + return + } + p.mu.Lock() + cb := p.mu.flushCompletedCallback + p.mu.Unlock() + if cb != nil { + cb() + } + }, } } @@ -941,17 +958,21 @@ func (p *Pebble) ExportMVCCToSst( // MVCCGet implements the Engine interface. func (p *Pebble) MVCCGet(key MVCCKey) ([]byte, error) { + return mvccGetHelper(key, p) +} + +func mvccGetHelper(key MVCCKey, reader wrappableReader) ([]byte, error) { if len(key.Key) == 0 { return nil, emptyKeyError() } - r := wrapReader(p) + r := wrapReader(reader) // Doing defer r.Free() does not inline. v, err := r.MVCCGet(key) r.Free() return v, err } -func (p *Pebble) rawGet(key []byte) ([]byte, error) { +func (p *Pebble) rawMVCCGet(key []byte) ([]byte, error) { ret, closer, err := p.db.Get(key) if closer != nil { retCopy := make([]byte, len(ret)) @@ -999,7 +1020,7 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt return iter } - iter := newPebbleIterator(p.db, nil, opts) + iter := newPebbleIterator(p.db, nil, opts, StandardDurability) if iter == nil { panic("couldn't create a new iterator") } @@ -1011,7 +1032,7 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt // NewEngineIterator implements the Engine interface. func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator { - iter := newPebbleIterator(p.db, nil, opts) + iter := newPebbleIterator(p.db, nil, opts, StandardDurability) if iter == nil { panic("couldn't create a new iterator") } @@ -1418,8 +1439,8 @@ func (p *Pebble) NewBatch() Batch { } // NewReadOnly implements the Engine interface. -func (p *Pebble) NewReadOnly() ReadWriter { - return newPebbleReadOnly(p) +func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter { + return newPebbleReadOnly(p, durability) } // NewUnindexedBatch implements the Engine interface. 
@@ -1477,6 +1498,13 @@ func (p *Pebble) InMem() bool { return p.path == "" } +// RegisterFlushCompletedCallback implements the Engine interface. +func (p *Pebble) RegisterFlushCompletedCallback(cb func()) { + p.mu.Lock() + p.mu.flushCompletedCallback = cb + p.mu.Unlock() +} + // ReadFile implements the Engine interface. func (p *Pebble) ReadFile(filename string) ([]byte, error) { file, err := p.fs.Open(filename) @@ -1645,6 +1673,7 @@ type pebbleReadOnly struct { prefixEngineIter pebbleIterator normalEngineIter pebbleIterator iter cloneableIter + durability DurabilityRequirement closed bool } @@ -1665,7 +1694,7 @@ var pebbleReadOnlyPool = sync.Pool{ } // Instantiates a new pebbleReadOnly. -func newPebbleReadOnly(parent *Pebble) *pebbleReadOnly { +func newPebbleReadOnly(parent *Pebble, durability DurabilityRequirement) *pebbleReadOnly { p := pebbleReadOnlyPool.Get().(*pebbleReadOnly) // When p is a reused pebbleReadOnly from the pool, the iter fields preserve // the original reusable=true that was set above in pebbleReadOnlyPool.New(), @@ -1677,6 +1706,7 @@ func newPebbleReadOnly(parent *Pebble) *pebbleReadOnly { normalIter: p.normalIter, prefixEngineIter: p.prefixEngineIter, normalEngineIter: p.normalEngineIter, + durability: durability, } return p } @@ -1693,6 +1723,7 @@ func (p *pebbleReadOnly) Close() { p.normalIter.destroy() p.prefixEngineIter.destroy() p.normalEngineIter.destroy() + p.durability = StandardDurability pebbleReadOnlyPool.Put(p) } @@ -1713,26 +1744,39 @@ func (p *pebbleReadOnly) ExportMVCCToSst( } func (p *pebbleReadOnly) MVCCGet(key MVCCKey) ([]byte, error) { - if p.closed { - panic("using a closed pebbleReadOnly") - } - return p.parent.MVCCGet(key) + return mvccGetHelper(key, p) } -func (p *pebbleReadOnly) rawGet(key []byte) ([]byte, error) { +func (p *pebbleReadOnly) rawMVCCGet(key []byte) ([]byte, error) { if p.closed { panic("using a closed pebbleReadOnly") } - return p.parent.rawGet(key) + // Cannot delegate to p.parent.rawMVCCGet since we 
need to use p.durability. + onlyReadGuaranteedDurable := false + if p.durability == GuaranteedDurability { + onlyReadGuaranteedDurable = true + } + options := pebble.IterOptions{ + LowerBound: key, + UpperBound: roachpb.BytesNext(key), + OnlyReadGuaranteedDurable: onlyReadGuaranteedDurable, + } + iter := p.parent.db.NewIter(&options) + defer func() { + // Already handled error. + _ = iter.Close() + }() + valid := iter.SeekGE(key) + if !valid { + return nil, iter.Error() + } + return iter.Value(), nil } func (p *pebbleReadOnly) MVCCGetProto( key MVCCKey, msg protoutil.Message, ) (ok bool, keyBytes, valBytes int64, err error) { - if p.closed { - panic("using a closed pebbleReadOnly") - } - return p.parent.MVCCGetProto(key, msg) + return pebbleGetProto(p, key, msg) } func (p *pebbleReadOnly) MVCCIterate( @@ -1770,7 +1814,7 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. - iter := MVCCIterator(newPebbleIterator(p.parent.db, nil, opts)) + iter := MVCCIterator(newPebbleIterator(p.parent.db, nil, opts, p.durability)) if util.RaceEnabled { iter = wrapInUnsafeIter(iter) } @@ -1790,7 +1834,7 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - iter.init(p.parent.db, p.iter, opts) + iter.init(p.parent.db, p.iter, opts, p.durability) if p.iter == nil { // For future cloning. p.iter = iter.iter @@ -1825,7 +1869,7 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - iter.init(p.parent.db, p.iter, opts) + iter.init(p.parent.db, p.iter, opts, p.durability) if p.iter == nil { // For future cloning. 
p.iter = iter.iter @@ -1857,7 +1901,11 @@ func (p *pebbleReadOnly) ConsistentIterators() bool { // PinEngineStateForIterators implements the Engine interface. func (p *pebbleReadOnly) PinEngineStateForIterators() error { if p.iter == nil { - p.iter = p.parent.db.NewIter(nil) + o := (*pebble.IterOptions)(nil) + if p.durability == GuaranteedDurability { + o = &pebble.IterOptions{OnlyReadGuaranteedDurable: true} + } + p.iter = p.parent.db.NewIter(o) } return nil } @@ -1981,7 +2029,7 @@ func (p *pebbleSnapshot) MVCCGet(key MVCCKey) ([]byte, error) { return v, err } -func (p *pebbleSnapshot) rawGet(key []byte) ([]byte, error) { +func (p *pebbleSnapshot) rawMVCCGet(key []byte) ([]byte, error) { ret, closer, err := p.snapshot.Get(key) if closer != nil { retCopy := make([]byte, len(ret)) @@ -2028,7 +2076,7 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions } return iter } - iter := MVCCIterator(newPebbleIterator(p.snapshot, nil, opts)) + iter := MVCCIterator(newPebbleIterator(p.snapshot, nil, opts, StandardDurability)) if util.RaceEnabled { iter = wrapInUnsafeIter(iter) } @@ -2037,7 +2085,7 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions // NewEngineIterator implements the Reader interface. func (p pebbleSnapshot) NewEngineIterator(opts IterOptions) EngineIterator { - return newPebbleIterator(p.snapshot, nil, opts) + return newPebbleIterator(p.snapshot, nil, opts, StandardDurability) } // ConsistentIterators implements the Reader interface. 
diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 117c2995cfa8..371f430dd590 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -143,7 +143,7 @@ func (p *pebbleBatch) MVCCGet(key MVCCKey) ([]byte, error) { return v, err } -func (p *pebbleBatch) rawGet(key []byte) ([]byte, error) { +func (p *pebbleBatch) rawMVCCGet(key []byte) ([]byte, error) { r := pebble.Reader(p.batch) if p.writeOnly { panic("write-only batch") @@ -209,7 +209,7 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. - iter := MVCCIterator(newPebbleIterator(p.batch, nil, opts)) + iter := MVCCIterator(newPebbleIterator(p.batch, nil, opts, StandardDurability)) if util.RaceEnabled { iter = wrapInUnsafeIter(iter) } @@ -230,9 +230,9 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M iter.setBounds(opts.LowerBound, opts.UpperBound) } else { if p.batch.Indexed() { - iter.init(p.batch, p.iter, opts) + iter.init(p.batch, p.iter, opts, StandardDurability) } else { - iter.init(p.db, p.iter, opts) + iter.init(p.db, p.iter, opts, StandardDurability) } if p.iter == nil { // For future cloning. @@ -272,9 +272,9 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { if p.batch.Indexed() { - iter.init(p.batch, p.iter, opts) + iter.init(p.batch, p.iter, opts, StandardDurability) } else { - iter.init(p.db, p.iter, opts) + iter.init(p.db, p.iter, opts, StandardDurability) } if p.iter == nil { // For future cloning. 
diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index cc4a5ebe502e..ea7a7b77a0d9 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -84,11 +84,14 @@ type testingSetBoundsListener interface { // Instantiates a new Pebble iterator, or gets one from the pool. func newPebbleIterator( - handle pebble.Reader, iterToClone cloneableIter, opts IterOptions, + handle pebble.Reader, + iterToClone cloneableIter, + opts IterOptions, + durability DurabilityRequirement, ) *pebbleIterator { iter := pebbleIterPool.Get().(*pebbleIterator) iter.reusable = false // defensive - iter.init(handle, iterToClone, opts) + iter.init(handle, iterToClone, opts, durability) return iter } @@ -97,7 +100,15 @@ func newPebbleIterator( // pebbleBatch), or a newly-instantiated one through newPebbleIterator. The // underlying *pebble.Iterator is created using iterToClone, if non-nil and // there are no timestamp hints, else it is created using handle. -func (p *pebbleIterator) init(handle pebble.Reader, iterToClone cloneableIter, opts IterOptions) { +// +// **NOTE**: the durability parameter may be ignored if iterToClone is +// non-nil, so make sure that the desired durability is the same. 
+func (p *pebbleIterator) init( + handle pebble.Reader, + iterToClone cloneableIter, + opts IterOptions, + durability DurabilityRequirement, +) { *p = pebbleIterator{ keyBuf: p.keyBuf, lowerBoundBuf: p.lowerBoundBuf, @@ -110,6 +121,10 @@ func (p *pebbleIterator) init(handle pebble.Reader, iterToClone cloneableIter, o panic("iterator must set prefix or upper bound or lower bound") } + p.options.OnlyReadGuaranteedDurable = false + if durability == GuaranteedDurability { + p.options.OnlyReadGuaranteedDurable = true + } if opts.LowerBound != nil { // This is the same as // p.options.LowerBound = EncodeKeyToBuf(p.lowerBoundBuf[0][:0], MVCCKey{Key: opts.LowerBound}) diff --git a/pkg/storage/pebble_mvcc_scanner_test.go b/pkg/storage/pebble_mvcc_scanner_test.go index 779567407007..0a61de339f31 100644 --- a/pkg/storage/pebble_mvcc_scanner_test.go +++ b/pkg/storage/pebble_mvcc_scanner_test.go @@ -72,7 +72,7 @@ func TestMVCCScanWithManyVersionsAndSeparatedIntents(t *testing.T) { require.NoError(t, err) } - reader := eng.NewReadOnly() + reader := eng.NewReadOnly(StandardDurability) defer reader.Close() iter := reader.NewMVCCIterator( MVCCKeyAndIntentsIterKind, IterOptions{LowerBound: keys[0], UpperBound: roachpb.Key("d")}) @@ -137,7 +137,7 @@ func TestMVCCScanWithLargeKeyValue(t *testing.T) { require.NoError(t, eng.PutMVCC(MVCCKey{Key: keys[3], Timestamp: hlc.Timestamp{WallTime: 1}}, largeValue)) - reader := eng.NewReadOnly() + reader := eng.NewReadOnly(StandardDurability) defer reader.Close() iter := reader.NewMVCCIterator( MVCCKeyAndIntentsIterKind, IterOptions{LowerBound: keys[0], UpperBound: roachpb.Key("e")}) diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 557cf6196440..c4306900e5a1 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -266,7 +266,8 @@ func TestPebbleIterBoundSliceStabilityAndNoop(t *testing.T) { eng := createTestPebbleEngine().(*Pebble) defer eng.Close() - iter := newPebbleIterator(eng.db, nil, 
IterOptions{UpperBound: roachpb.Key("foo")}) + iter := newPebbleIterator( + eng.db, nil, IterOptions{UpperBound: roachpb.Key("foo")}, StandardDurability) defer iter.Close() checker := &iterBoundsChecker{t: t} iter.testingSetBoundsListener = checker @@ -500,9 +501,9 @@ func TestPebbleIterConsistency(t *testing.T) { require.NoError(t, eng.PutMVCC(k1, []byte("a1"))) var ( - roEngine = eng.NewReadOnly() + roEngine = eng.NewReadOnly(StandardDurability) batch = eng.NewBatch() - roEngine2 = eng.NewReadOnly() + roEngine2 = eng.NewReadOnly(StandardDurability) batch2 = eng.NewBatch() ) defer roEngine.Close() @@ -1181,3 +1182,59 @@ func TestPebbleMVCCTimeIntervalCollectorAndFilter(t *testing.T) { expected := []int64{7, 6, 5} require.Equal(t, expected, found) } + +func TestPebbleFlushCallbackAndDurabilityRequirement(t *testing.T) { + defer leaktest.AfterTest(t)() + + eng := createTestPebbleEngine() + defer eng.Close() + + ts := hlc.Timestamp{WallTime: 1} + k := MVCCKey{[]byte("a"), ts} + // Write. + require.NoError(t, eng.PutMVCC(k, []byte("a1"))) + cbCount := int32(0) + eng.RegisterFlushCompletedCallback(func() { + atomic.AddInt32(&cbCount, 1) + }) + roStandard := eng.NewReadOnly(StandardDurability) + defer roStandard.Close() + roGuaranteed := eng.NewReadOnly(GuaranteedDurability) + defer roGuaranteed.Close() + roGuaranteedPinned := eng.NewReadOnly(GuaranteedDurability) + defer roGuaranteedPinned.Close() + require.NoError(t, roGuaranteedPinned.PinEngineStateForIterators()) + // Returns the value found or nil. 
+ checkGetAndIter := func(reader Reader) []byte { + v, err := reader.MVCCGet(k) + require.NoError(t, err) + iter := reader.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: k.Key.Next()}) + defer iter.Close() + iter.SeekGE(k) + valid, err := iter.Valid() + require.NoError(t, err) + require.Equal(t, v != nil, valid) + if valid { + require.Equal(t, v, iter.Value()) + } + return v + } + require.Equal(t, "a1", string(checkGetAndIter(roStandard))) + // Write is not visible yet. + require.Nil(t, checkGetAndIter(roGuaranteed)) + require.Nil(t, checkGetAndIter(roGuaranteedPinned)) + + // Flush the engine and wait for it to complete. + require.NoError(t, eng.Flush()) + testutils.SucceedsSoon(t, func() error { + if atomic.LoadInt32(&cbCount) < 1 { + return errors.Errorf("not flushed") + } + return nil + }) + // Write is visible to new guaranteed reader. We need to use a new reader + // due to iterator caching. + roGuaranteed2 := eng.NewReadOnly(GuaranteedDurability) + defer roGuaranteed2.Close() + require.Equal(t, "a1", string(checkGetAndIter(roGuaranteed2))) +}