kvserver: introduce Store.{logEngine,todoEngine}
Over the last few months, we have been chipping away[^1] at separating the
engine interactions of the KV server in preparation for optionally
running with a separate log engine for the raft state.

[^1]: https://github.com/cockroachdb/cockroach/pulls?q=is%3Apr+%22CRDB-220%22+is%3Amerged+created%3A%3C2023-02-08

Plenty of work remains before we can safely split these engines apart,
or even do it at all, but now is a good time to take stock and
syntactically "boil the ocean" by forcing all users of `Store.engine`
to decide between three `Engine` fields now present on `Store` - all,
for now, backed by the same engine:

- log engine (`LogEngine()`)
- state engine (`StateEngine()`)
- "TODO" engine (`TODOEngine()`).

The third engine mirrors the `context.TODO()` pattern: a "TODO" engine
usage indicates either that work remains before the operation can
target one of the engines specifically, or that something about the
particular usage should be revisited.
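
For orientation, here is a minimal sketch of the new surface - not the
actual `Store` definition, just the three fields and accessors that the
diff below relies on, with all three backed by the same engine for now:

```go
package kvserver

import "github.com/cockroachdb/cockroach/pkg/storage"

// Sketch only: the three engine fields and their accessors referenced
// by this change. For now, all three point at the same storage.Engine.
type Store struct {
	// ... other fields elided ...
	stateEngine storage.Engine // replicated state machine (MVCC data, applied state)
	logEngine   storage.Engine // raft log, HardState, sideloaded entries
	todoEngine  storage.Engine // usages not yet attributed to a specific engine
}

// StateEngine returns the engine holding the replicated state machine.
func (s *Store) StateEngine() storage.Engine { return s.stateEngine }

// LogEngine returns the engine holding the raft log.
func (s *Store) LogEngine() storage.Engine { return s.logEngine }

// TODOEngine returns the engine for call sites that still need to be
// attributed to the state or log engine (mirrors context.TODO()).
func (s *Store) TODOEngine() storage.Engine { return s.todoEngine }
```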

With the stability period coming up, we will want to avoid large
mechanical changes in the near future, so now is a good time to get this
out of the way and to work towards a "prototype plus" of the raft log
separation for the remaining weeks.

Ideally this would culminate in a way to actually run with two engines
for experiments (ignoring crash-restart correctness issues and the
like), which would be an important milestone for the project and would
give us a reality check on the work that lies ahead to productionize
it.

This refactor was carried out mechanically: introduce the new fields,
look at all usages of the old field one by one and replace them as
appropriate, leaving TODO comments wherever they seemed to add new
information. Finally, drop the old field, which had become unused.

There is a flavor of TODO engine usage that simply requires us to
choose which engine to use for the particular purpose. I'd like to move
these into issues over time, making a temporary call for now, so that
the CRDB-220 epic more completely reflects the scope of work included
in it.
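
The `GetEngineCapacity` hunk below is one example of the pattern this
creates - a provisional choice through `TODOEngine()` plus a breadcrumb
comment (sketch reproduced here for illustration):

```go
// GetEngineCapacity returns the store's underlying engine capacity.
// The call provisionally goes through the TODO engine until log engine
// capacity is exposed separately.
func (r *Replica) GetEngineCapacity() (roachpb.StoreCapacity, error) {
	// TODO(sep-raft-log): need to expose log engine capacity.
	return r.store.TODOEngine().Capacity()
}
```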

Epic: CRDB-220
Release note: None
tbg committed Feb 8, 2023
1 parent 7f62dc7 commit 870468b
Showing 24 changed files with 138 additions and 82 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/loqrecovery/server.go
@@ -115,7 +115,7 @@ func (s Server) ServeLocalReplicas(
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
) error {
return s.stores.VisitStores(func(s *kvserver.Store) error {
- reader := s.Engine().NewSnapshot()
+ reader := s.StateEngine().NewSnapshot()
defer reader.Close()
return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(),
func(info loqrecoverypb.ReplicaInfo) error {
@@ -383,7 +383,7 @@ func (s Server) NodeStatus(
status.PendingPlanID = &plan.PlanID
}
err = s.stores.VisitStores(func(s *kvserver.Store) error {
- r, ok, err := readNodeRecoveryStatusInfo(ctx, s.Engine())
+ r, ok, err := readNodeRecoveryStatusInfo(ctx, s.StateEngine())
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
@@ -709,7 +709,7 @@ func (mgcq *mvccGCQueue) process(
log.VErrEventf(ctx, 2, "failed to update last processed time: %v", err)
}

- snap := repl.store.Engine().NewSnapshot()
+ snap := repl.store.StateEngine().NewSnapshot()
defer snap.Close()

intentAgeThreshold := gc.IntentAgeThreshold.Get(&repl.store.ClusterSettings().SV)
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/replica.go
@@ -1326,7 +1326,7 @@ func (r *Replica) ContainsKeyRange(start, end roachpb.Key) bool {
func (r *Replica) GetLastReplicaGCTimestamp(ctx context.Context) (hlc.Timestamp, error) {
key := keys.RangeLastReplicaGCTimestampKey(r.RangeID)
var timestamp hlc.Timestamp
- _, err := storage.MVCCGetProto(ctx, r.store.Engine(), key, hlc.Timestamp{}, &timestamp,
+ _, err := storage.MVCCGetProto(ctx, r.store.StateEngine(), key, hlc.Timestamp{}, &timestamp,
storage.MVCCGetOptions{})
if err != nil {
return hlc.Timestamp{}, err
@@ -1337,7 +1337,7 @@ func (r *Replica) GetLastReplicaGCTimestamp(ctx context.Context) (hlc.Timestamp,
func (r *Replica) setLastReplicaGCTimestamp(ctx context.Context, timestamp hlc.Timestamp) error {
key := keys.RangeLastReplicaGCTimestampKey(r.RangeID)
return storage.MVCCPutProto(
- ctx, r.store.Engine(), nil, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &timestamp)
+ ctx, r.store.StateEngine(), nil, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &timestamp)
}

// getQueueLastProcessed returns the last processed timestamp for the
@@ -1346,7 +1346,7 @@ func (r *Replica) getQueueLastProcessed(ctx context.Context, queue string) (hlc.
key := keys.QueueLastProcessedKey(r.Desc().StartKey, queue)
var timestamp hlc.Timestamp
if r.store != nil {
- _, err := storage.MVCCGetProto(ctx, r.store.Engine(), key, hlc.Timestamp{}, &timestamp,
+ _, err := storage.MVCCGetProto(ctx, r.store.StateEngine(), key, hlc.Timestamp{}, &timestamp,
storage.MVCCGetOptions{})
if err != nil {
log.VErrEventf(ctx, 2, "last processed timestamp unavailable: %s", err)
@@ -2245,13 +2245,14 @@ func (r *Replica) GetResponseMemoryAccount() *mon.BoundAccount {
// GetEngineCapacity returns the store's underlying engine capacity; other
// StoreCapacity fields not related to engine capacity are not populated.
func (r *Replica) GetEngineCapacity() (roachpb.StoreCapacity, error) {
- return r.store.Engine().Capacity()
+ // TODO(sep-raft-log): need to expose log engine capacity.
+ return r.store.TODOEngine().Capacity()
}

// GetApproximateDiskBytes returns an approximate measure of bytes in the store
// in the specified key range.
func (r *Replica) GetApproximateDiskBytes(from, to roachpb.Key) (uint64, error) {
- return r.store.Engine().ApproximateDiskBytes(from, to)
+ return r.store.StateEngine().ApproximateDiskBytes(from, to)
}

func init() {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_app_batch.go
@@ -146,7 +146,7 @@ func (b *replicaAppBatch) Stage(
// will be committed, but all of these commands will be `IsTrivial()`.
if err := b.ab.runPostAddTriggers(ctx, &cmd.ReplicatedCmd, postAddEnv{
st: b.r.store.cfg.Settings,
- eng: b.r.store.engine,
+ eng: b.r.store.stateEngine,
sideloaded: b.r.raftMu.sideloaded,
bulkLimiter: b.r.store.limiters.BulkIOWriteRate,
}); err != nil {
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -138,7 +138,7 @@ func (sm *replicaStateMachine) NewBatch() apply.Batch {
b := &sm.batch
b.r = r
b.applyStats = &sm.applyStats
- b.batch = r.store.engine.NewBatch()
+ b.batch = r.store.stateEngine.NewBatch()
r.mu.RLock()
b.state = r.mu.state
b.state.Stats = &sm.stats
@@ -197,7 +197,9 @@ func (sm *replicaStateMachine) ApplySideEffects(
// Assert that the on-disk state doesn't diverge from the in-memory
// state as a result of the side effects.
sm.r.mu.RLock()
- sm.r.assertStateRaftMuLockedReplicaMuRLocked(ctx, sm.r.store.Engine())
+ // TODO(sep-raft-log): either check only statemachine invariants or
+ // pass both engines in.
+ sm.r.assertStateRaftMuLockedReplicaMuRLocked(ctx, sm.r.store.TODOEngine())
sm.r.mu.RUnlock()
sm.applyStats.stateAssertions++
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_command.go
@@ -325,7 +325,7 @@ func (r *Replica) adminSplitWithDescriptor(
var err error
targetSize := r.GetMaxBytes() / 2
foundSplitKey, err = storage.MVCCFindSplitKey(
- ctx, r.store.engine, desc.StartKey, desc.EndKey, targetSize)
+ ctx, r.store.stateEngine, desc.StartKey, desc.EndKey, targetSize)
if err != nil {
return reply, errors.Wrap(err, "unable to determine split key")
}
@@ -3112,7 +3112,7 @@ func (r *Replica) followerSendSnapshot(
Type: req.Type,
}
newBatchFn := func() storage.Batch {
- return r.store.Engine().NewUnindexedBatch(true /* writeOnly */)
+ return r.store.StateEngine().NewUnindexedBatch(true /* writeOnly */)
}
sent := func() {
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_consistency.go
@@ -656,7 +656,7 @@ func (r *Replica) computeChecksumPostApply(

// Caller is holding raftMu, so an engine snapshot is automatically
// Raft-consistent (i.e. not in the middle of an AddSSTable).
- snap := r.store.engine.NewSnapshot()
+ snap := r.store.stateEngine.NewSnapshot()
if cc.Checkpoint {
sl := stateloader.Make(r.RangeID)
as, err := sl.LoadRangeAppliedState(ctx, snap)
@@ -739,8 +739,8 @@ func (r *Replica) computeChecksumPostApply(
// early, the reply won't make it back to the leaseholder, so it will not be
// certain of completing the check. Since we're already in a goroutine
// that's about to end, just sleep for a few seconds and then terminate.
- auxDir := r.store.engine.GetAuxiliaryDir()
- _ = r.store.engine.MkdirAll(auxDir)
+ auxDir := r.store.stateEngine.GetAuxiliaryDir()
+ _ = r.store.stateEngine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:
@@ -779,7 +779,7 @@ $ cockroach debug range-data --replicated data/auxiliary/checkpoints/rN_at_M N
`
attentionArgs := []any{r, desc.Replicas(), auxDir, path}
preventStartupMsg := fmt.Sprintf(attentionFmt, attentionArgs...)
- if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
+ if err := fs.WriteFile(r.store.stateEngine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_corruption.go
@@ -49,8 +49,8 @@ func (r *Replica) setCorruptRaftMuLocked(
cErr.Processed = true
r.mu.destroyStatus.Set(cErr, destroyReasonRemoved)

- auxDir := r.store.engine.GetAuxiliaryDir()
- _ = r.store.engine.MkdirAll(auxDir)
+ auxDir := r.store.stateEngine.GetAuxiliaryDir()
+ _ = r.store.stateEngine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

preventStartupMsg := fmt.Sprintf(`ATTENTION:
@@ -63,7 +63,7 @@ A file preventing this node from restarting was placed at:
%s
`, r, path)

- if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
+ if err := fs.WriteFile(r.store.stateEngine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_init.go
@@ -50,7 +50,7 @@ func loadInitializedReplicaForTesting(
if !desc.IsInitialized() {
return nil, errors.AssertionFailedf("can not load with uninitialized descriptor: %s", desc)
}
- state, err := kvstorage.LoadReplicaState(ctx, store.engine, store.StoreID(), desc, replicaID)
+ state, err := kvstorage.LoadReplicaState(ctx, store.todoEngine, store.StoreID(), desc, replicaID)
if err != nil {
return nil, err
}
@@ -140,9 +140,9 @@ func newUninitializedReplica(
r.raftMu.sideloaded = logstore.NewDiskSideloadStorage(
store.cfg.Settings,
rangeID,
- store.engine.GetAuxiliaryDir(),
+ store.logEngine.GetAuxiliaryDir(),
store.limiters.BulkIOWriteRate,
- store.engine,
+ store.logEngine,
)

r.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -937,7 +937,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// ranges, so can be passed to LogStore methods instead of being stored in it.
s := logstore.LogStore{
RangeID: r.RangeID,
- Engine: r.store.engine,
+ Engine: r.store.logEngine,
Sideload: r.raftMu.sideloaded,
StateLoader: r.raftMu.stateLoader.StateLoader,
SyncWaiter: r.store.syncWaiter,
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -62,7 +62,7 @@ var _ raft.Storage = (*replicaRaftStorage)(nil)
// exclusive access to r.mu.stateLoader.
func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
ctx := r.AnnotateCtx(context.TODO())
- hs, err := r.mu.stateLoader.LoadHardState(ctx, r.store.Engine())
+ hs, err := r.mu.stateLoader.LoadHardState(ctx, r.store.LogEngine())
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) || err != nil {
return raftpb.HardState{}, raftpb.ConfState{}, err
@@ -83,7 +83,7 @@ func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, e
if r.raftMu.sideloaded == nil {
return nil, errors.New("sideloaded storage is uninitialized")
}
- return logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.Engine(), r.RangeID,
+ return logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.LogEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes)
}

@@ -107,7 +107,7 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
return r.mu.lastTermNotDurable, nil
}
ctx := r.AnnotateCtx(context.TODO())
- return logstore.LoadTerm(ctx, r.mu.stateLoader.StateLoader, r.store.Engine(), r.RangeID,
+ return logstore.LoadTerm(ctx, r.mu.stateLoader.StateLoader, r.store.LogEngine(), r.RangeID,
r.store.raftEntryCache, i)
}

@@ -209,7 +209,7 @@ func (r *Replica) GetSnapshot(
// an AddSSTable" (i.e. a state in which an SSTable has been linked in, but
// the corresponding Raft command not applied yet).
r.raftMu.Lock()
- snap := r.store.engine.NewSnapshot()
+ snap := r.store.stateEngine.NewSnapshot()
r.raftMu.Unlock()

defer func() {
@@ -544,7 +544,8 @@ func (r *Replica) applySnapshot(
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
if err := clearSubsumedReplicaDiskData(
- ctx, r.store.ClusterSettings(), r.store.Engine(), inSnap.SSTStorageScratch,
+ // TODO(sep-raft-log): needs access to both engines.
+ ctx, r.store.ClusterSettings(), r.store.TODOEngine(), inSnap.SSTStorageScratch,
desc, subsumedRepls, mergedTombstoneReplicaID,
); err != nil {
return err
@@ -559,15 +560,16 @@ }
}
var ingestStats pebble.IngestOperationStats
if ingestStats, err =
- r.store.engine.IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
+ // TODO: separate ingestions for log and statemachine engine.
+ r.store.todoEngine.IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
if r.store.cfg.KVAdmissionController != nil {
r.store.cfg.KVAdmissionController.SnapshotIngested(r.store.StoreID(), ingestStats)
}
stats.ingestion = timeutil.Now()

- state, err := stateloader.Make(desc.RangeID).Load(ctx, r.store.engine, desc)
+ state, err := stateloader.Make(desc.RangeID).Load(ctx, r.store.stateEngine, desc)
if err != nil {
log.Fatalf(ctx, "unable to load replica state: %s", err)
}
@@ -593,7 +595,7 @@ func (r *Replica) applySnapshot(
// Read the prior read summary for this range, which was included in the
// snapshot. We may need to use it to bump our timestamp cache if we
// discover that we are the leaseholder as of the snapshot's log index.
- prioReadSum, err := readsummary.Load(ctx, r.store.engine, r.RangeID)
+ prioReadSum, err := readsummary.Load(ctx, r.store.stateEngine, r.RangeID)
if err != nil {
log.Fatalf(ctx, "failed to read prior read summary after applying snapshot: %+v", err)
}
@@ -679,7 +681,7 @@ func (r *Replica) applySnapshot(
// operation can be expensive. This is safe, as we hold the Replica.raftMu
// across both Replica.mu critical sections.
r.mu.RLock()
- r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())
+ r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.TODOEngine())
r.mu.RUnlock()

// The rangefeed processor is listening for the logical ops attached to
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
@@ -72,7 +72,7 @@ func (r *Replica) executeReadOnlyBatch(
// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
// designed.
- rw := r.store.Engine().NewReadOnly(storage.StandardDurability)
+ rw := r.store.StateEngine().NewReadOnly(storage.StandardDurability)
if !rw.ConsistentIterators() {
// This is not currently needed for correctness, but future optimizations
// may start relying on this, so we assert here.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_write.go
@@ -554,7 +554,7 @@ func (r *Replica) evaluate1PC(
if !etArg.Commit {
clonedTxn.Status = roachpb.ABORTED
batch.Close()
- batch = r.store.Engine().NewBatch()
+ batch = r.store.StateEngine().NewBatch()
ms.Reset()
} else {
// Run commit trigger manually.
@@ -697,7 +697,7 @@ func (r *Replica) evaluateWriteBatchWrapper(
func (r *Replica) newBatchedEngine(
ba *roachpb.BatchRequest, g *concurrency.Guard,
) (storage.Batch, *storage.OpLoggerBatch) {
- batch := r.store.Engine().NewBatch()
+ batch := r.store.StateEngine().NewBatch()
if !batch.ConsistentIterators() {
// This is not currently needed for correctness, but future optimizations
// may start relying on this, so we assert here.