Skip to content

Commit

Permalink
storage: introduce guaranteed durability functionality
Browse files Browse the repository at this point in the history
This is the CockroachDB plumbing for Pebble's
IterOptions.OnlyReadGuaranteedDurable. It is intended
for use by the raftLogTruncator (see
cockroachdb#76215).

Since most of the exported interfaces in the storage
package use a Reader, we support this via a
DurabilityRequirement parameter on Engine.NewReadOnly,
and not via an iterator option.

There is also a RegisterFlushCompletedCallback method
on Engine which will be used to poll certain durable
state in the raftLogTruncator.

Other than the trivial plumbing, this required some
refactoring of the Reader.MVCCGet* code for Pebble
and pebbleReadOnly. Even though Reader.MVCCGet* is
deprecated and primarily/only used in tests, we don't
want its durability semantics to diverge.

Release note: None
  • Loading branch information
sumeerbhola committed Feb 18, 2022
1 parent 89198dd commit 66de60a
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 57 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func verifyRDReplicatedOnlyMVCCIter(
) {
t.Helper()
verify := func(t *testing.T, useSpanSet, reverse bool) {
readWriter := eng.NewReadOnly()
readWriter := eng.NewReadOnly(storage.StandardDurability)
defer readWriter.Close()
if useSpanSet {
var spans spanset.SpanSet
Expand Down Expand Up @@ -189,7 +189,7 @@ func verifyRDReplicatedOnlyMVCCIter(
func verifyRDEngineIter(
t *testing.T, desc *roachpb.RangeDescriptor, eng storage.Engine, expectedKeys []storage.MVCCKey,
) {
readWriter := eng.NewReadOnly()
readWriter := eng.NewReadOnly(storage.StandardDurability)
defer readWriter.Close()
iter := NewReplicaEngineDataIterator(desc, readWriter, false)
defer iter.Close()
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -174,7 +175,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rw := r.Engine().NewReadOnly()
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

br, result, pErr :=
Expand Down Expand Up @@ -217,7 +218,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rw := r.Engine().NewReadOnly()
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

br, result, pErr := evaluateBatch(
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
// and this method will always return at least one entry even if it exceeds
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
readonly := r.store.Engine().NewReadOnly()
readonly := r.store.Engine().NewReadOnly(storage.StandardDurability)
defer readonly.Close()
ctx := r.AnnotateCtx(context.TODO())
if r.raftMu.sideloaded == nil {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
if e, ok := r.store.raftEntryCache.Get(r.RangeID, i); ok {
return e.Term, nil
}
readonly := r.store.Engine().NewReadOnly()
readonly := r.store.Engine().NewReadOnly(storage.StandardDurability)
defer readonly.Close()
ctx := r.AnnotateCtx(context.TODO())
return term(ctx, r.mu.stateLoader, readonly, r.RangeID, r.store.raftEntryCache, i)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *Replica) executeReadOnlyBatch(
// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
// designed.
rw := r.store.Engine().NewReadOnly()
rw := r.store.Engine().NewReadOnly(storage.StandardDurability)
if !rw.ConsistentIterators() {
// This is not currently needed for correctness, but future optimizations
// may start relying on this, so we assert here.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestReadOnlyBasics(t *testing.T) {
e := engineImpl.create()
defer e.Close()

ro := e.NewReadOnly()
ro := e.NewReadOnly(StandardDurability)
if ro.Closed() {
t.Fatal("read-only is expectedly found to be closed")
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,22 @@ type ReadWriter interface {
Writer
}

// DurabilityRequirement is an advanced option. If in doubt, use
// StandardDurability. It is the parameter accepted by Engine.NewReadOnly.
//
// GuaranteedDurability maps to pebble.IterOptions.OnlyReadGuaranteedDurable.
// This acknowledges the fact that we do not (without sacrificing correctness)
// sync the WAL for many writes, and there are some advanced cases
// (raftLogTruncator) that need visibility into what is guaranteed durable.
type DurabilityRequirement int8

const (
// StandardDurability is what should normally be used. It is the zero value
// of DurabilityRequirement.
StandardDurability DurabilityRequirement = iota
// GuaranteedDurability is an advanced option (only for raftLogTruncator).
GuaranteedDurability
)

// Engine is the interface that wraps the core operations of a key/value store.
type Engine interface {
ReadWriter
Expand Down Expand Up @@ -685,14 +701,15 @@ type Engine interface {
// them atomically on a call to Commit().
NewBatch() Batch
// NewReadOnly returns a new instance of a ReadWriter that wraps this
// engine. This wrapper panics when unexpected operations (e.g., write
// operations) are executed on it and caches iterators to avoid the overhead
// of creating multiple iterators for batched reads.
// engine, and with the given durability requirement. This wrapper panics
// when unexpected operations (e.g., write operations) are executed on it
// and caches iterators to avoid the overhead of creating multiple iterators
// for batched reads.
//
// All iterators created from a read-only engine are guaranteed to provide a
// consistent snapshot of the underlying engine. See the comment on the
// Reader interface and the Reader.ConsistentIterators method.
NewReadOnly() ReadWriter
NewReadOnly(durability DurabilityRequirement) ReadWriter
// NewUnindexedBatch returns a new instance of a batched engine which wraps
// this engine. It is unindexed, in that writes to the batch are not
// visible to reads until after it commits. The batch accumulates all
Expand Down Expand Up @@ -740,7 +757,13 @@ type Engine interface {
// addSSTablePreApply to select alternate code paths, but really there should
// be a unified code path there.
InMem() bool

// RegisterFlushCompletedCallback registers a callback that will be run for
// every successful flush. Only one callback can be registered at a time, so
// registering again replaces the previous callback. The callback must
// return quickly and must not call any methods on the Engine in the context
// of the callback since it could cause a deadlock (since the callback may
// be invoked while holding mutexes).
RegisterFlushCompletedCallback(cb func())
// Filesystem functionality.
fs.FS
// ReadFile reads the content from the file with the given filename in this RocksDB's env.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func TestEngineScan1(t *testing.T) {
}

// Test iterator stats.
ro := engine.NewReadOnly()
ro := engine.NewReadOnly(StandardDurability)
iter := ro.NewMVCCIterator(MVCCKeyIterKind,
IterOptions{LowerBound: roachpb.Key("cat"), UpperBound: roachpb.Key("server")})
iter.SeekGE(MVCCKey{Key: roachpb.Key("cat")})
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ func (i *intentInterleavingIter) SupportsPrev() bool {
// the identical engine state.
func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator {
pIter := iter.GetRawIter()
it := newPebbleIterator(nil, pIter, opts)
it := newPebbleIterator(nil, pIter, opts, StandardDurability)
if iter == nil {
panic("couldn't create a new iterator")
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/intent_reader_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func (idw intentDemuxWriter) ClearMVCCRangeAndIntents(
// code probably uses an MVCCIterator.
type wrappableReader interface {
Reader
rawGet(key []byte) (value []byte, err error)
// rawMVCCGet is only used for Reader.MVCCGet which is deprecated and not
// performance sensitive.
rawMVCCGet(key []byte) (value []byte, err error)
}

// wrapReader wraps the provided reader, to return an implementation of MVCCIterator
Expand All @@ -126,7 +128,7 @@ var intentInterleavingReaderPool = sync.Pool{

// Get implements the Reader interface.
func (imr *intentInterleavingReader) MVCCGet(key MVCCKey) ([]byte, error) {
val, err := imr.wrappableReader.rawGet(EncodeMVCCKey(key))
val, err := imr.wrappableReader.rawMVCCGet(EncodeMVCCKey(key))
if val != nil || err != nil || !key.Timestamp.IsEmpty() {
return val, err
}
Expand Down
Loading

0 comments on commit 66de60a

Please sign in to comment.