Merge #96823
96823: kvserver: introduce Store.{logEngine,todoEngine} r=pavelkalinnikov a=tbg

Over the last few months, we have been chipping away[^1] at separating the
engine interactions of the KV server in preparation for optionally
running with a separate log engine for the raft state.

[^1]: https://github.com/cockroachdb/cockroach/pulls?q=is%3Apr+%22CRDB-220%22+is%3Amerged+created%3A%3C2023-02-08

Plenty of work remains before we can split these engines apart safely,
or even at all, but now is a good time to take stock and syntactically
"boil the ocean" by forcing all users of `Store.engine` to decide
between three `Engine` accessors - all, for now, backed by the same
`Engine` - now present on `Store`:

- `LogEngine()`
- `StateEngine()`
- `TODOEngine()`

The third accessor mirrors the `context.TODO()` pattern - the "TODO"
engine signals either that work remains before this call site can
target one of the two engines specifically, or that something about
this particular usage should be revisited.
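
To make the pattern concrete, here is a minimal sketch of what these
accessors amount to today. The package, type, and field names below are
illustrative assumptions, not the actual `kvserver.Store` code; the
point is simply that all three accessors still hand back the same
underlying engine:

```go
// Package enginesketch is an illustrative sketch only; it is not the
// real kvserver package.
package enginesketch

// Engine stands in for storage.Engine.
type Engine interface {
	Flush() error
}

// Store is a stand-in for kvserver.Store: one engine, three accessors.
type Store struct {
	engine Engine // the single engine backing all three accessors, for now
}

// LogEngine is meant to eventually serve the raft log.
func (s *Store) LogEngine() Engine { return s.engine }

// StateEngine is meant to eventually serve the replicated state machine.
func (s *Store) StateEngine() Engine { return s.engine }

// TODOEngine mirrors context.TODO(): the call site has not yet decided
// (or cannot yet decide) which of the two engines it should target.
func (s *Store) TODOEngine() Engine { return s.engine }
```

Because the `TODO` variant is syntactically distinct, later PRs can grep
for it and retire it call site by call site.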

With the stability period coming up, we will want to avoid large
mechanical changes in the near future, so now is a good time to get
this out of the way and to spend the remaining weeks working towards a
"prototype plus" of the raft log separation.

Ideally this would culminate in a way to actually run with two engines
for experiments (ignoring crash-restart correctness issues and the
like). That would be an important milestone for the project and would
give us a reality check on the work that lies ahead to productionize
it.

This refactor was carried out mechanically, and it assigns *everything*
to the `TODOEngine`. In other words, no thought has gone into deciding
which engine each call site should use yet. That will happen piecemeal
in smaller PRs down the line, and will help surface new issues for the
CRDB-220 epic.
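
(How the mechanical rewrite was performed is not stated here; for the
plain `store.Engine()` call sites, one hypothetical approach is gofmt's
rewrite mode, e.g. `gofmt -r 'x.Engine() -> x.TODOEngine()' -w ./pkg`,
with the replica-level call sites that become `r.Store().TODOEngine()`
handled separately.)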

Epic: CRDB-220
Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Feb 14, 2023
2 parents f3ff417 + 5b6302b commit 16fd101
Showing 63 changed files with 288 additions and 213 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -212,7 +212,7 @@ CREATE TABLE data2.foo (a int);
store := tcRestore.GetFirstStoreFromServer(t, 0)
startKey := keys.SystemSQLCodec.TablePrefix(uint32(id))
endKey := startKey.PrefixEnd()
- it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+ it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: endKey,
})
defer it.Close()
@@ -311,7 +311,7 @@ func assertExactlyEqualKVs(
) hlc.Timestamp {
// Iterate over the store.
store := tc.GetFirstStoreFromServer(t, 0)
- it := store.Engine().NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
+ it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
LowerBound: tenantPrefix,
UpperBound: tenantPrefix.PrefixEnd(),
})
@@ -366,7 +366,7 @@ func assertEqualKVs(

// Iterate over the store.
store := tc.GetFirstStoreFromServer(t, 0)
- it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+ it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
LowerBound: key,
UpperBound: key.PrefixEnd(),
})
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/addressing_test.go
@@ -168,7 +168,7 @@ func TestUpdateRangeAddressing(t *testing.T) {
// to RocksDB will be asynchronous.
var kvs []roachpb.KeyValue
testutils.SucceedsSoon(t, func() error {
- res, err := storage.MVCCScan(ctx, store.Engine(), keys.MetaMin, keys.MetaMax,
+ res, err := storage.MVCCScan(ctx, store.TODOEngine(), keys.MetaMin, keys.MetaMax,
hlc.MaxTimestamp, storage.MVCCScanOptions{})
if err != nil {
// Wait for the intent to be resolved.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -1824,7 +1824,7 @@ func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T)
require.True(t, closedTS.LessEq(writeTS), "timestamp %s below closed timestamp %s", result.Timestamp, closedTS)

// Check that the value was in fact written at the write timestamp.
- kvs, err := storage.Scan(store.Engine(), roachpb.Key("key"), roachpb.Key("key").Next(), 0)
+ kvs, err := storage.Scan(store.TODOEngine(), roachpb.Key("key"), roachpb.Key("key").Next(), 0)
require.NoError(t, err)
require.Len(t, kvs, 1)
require.Equal(t, storage.MVCCKey{Key: roachpb.Key("key"), Timestamp: writeTS}, kvs[0].Key)
@@ -51,7 +51,7 @@ func TestKnobsUseRangeTombstonesForPointDeletes(t *testing.T) {

store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
- eng := store.Engine()
+ eng := store.TODOEngine()
txn := db.NewTxn(ctx, "test")

// Write a non-transactional and transactional tombstone.
26 changes: 13 additions & 13 deletions pkg/kv/kvserver/client_merge_test.go
@@ -197,7 +197,7 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
}

// Collect all the keys.
- preKeys := getEngineKeySet(t, store.Engine())
+ preKeys := getEngineKeySet(t, store.TODOEngine())

// Split the range.
lhsDesc, rhsDesc, err := createSplitRanges(ctx, scratchKey(""), store)
@@ -220,7 +220,7 @@ func TestStoreRangeMergeMetadataCleanup(t *testing.T) {
}

// Collect all the keys again.
- postKeys := getEngineKeySet(t, store.Engine())
+ postKeys := getEngineKeySet(t, store.TODOEngine())

// Compute the new keys.
for k := range preKeys {
@@ -354,7 +354,7 @@ func mergeWithData(t *testing.T, retries int64) {
// Verify no intents remains on range descriptor keys.
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(lhsDesc.StartKey), keys.RangeDescriptorKey(rhsDesc.StartKey)} {
if _, err := storage.MVCCGet(
- ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
+ ctx, store.TODOEngine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
}
@@ -748,7 +748,7 @@ func mergeCheckingTimestampCaches(
for _, r := range lhsRepls[1:] {
// Loosely-coupled truncation requires an engine flush to advance
// guaranteed durability.
- require.NoError(t, r.Engine().Flush())
+ require.NoError(t, r.Store().TODOEngine().Flush())
firstIndex := r.GetFirstIndex()
if firstIndex < truncIndex {
return errors.Errorf("truncate not applied, %d < %d", firstIndex, truncIndex)
@@ -1349,7 +1349,7 @@ func TestStoreRangeMergeStats(t *testing.T) {
// merge below.

// Get the range stats for both ranges now that we have data.
- snap := store.Engine().NewSnapshot()
+ snap := store.TODOEngine().NewSnapshot()
defer snap.Close()
msA, err := stateloader.Make(lhsDesc.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
@@ -1367,7 +1367,7 @@ func TestStoreRangeMergeStats(t *testing.T) {
replMerged := store.LookupReplica(lhsDesc.StartKey)

// Get the range stats for the merged range and verify.
- snap = store.Engine().NewSnapshot()
+ snap = store.TODOEngine().NewSnapshot()
defer snap.Close()
msMerged, err := stateloader.Make(replMerged.RangeID).LoadMVCCStats(ctx, snap)
require.NoError(t, err)
@@ -2542,8 +2542,8 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
t.Fatalf("expected next replica ID to be %d, but got %d", e, a)
}
}
- checkTombstone(store0.Engine())
- checkTombstone(store1.Engine())
+ checkTombstone(store0.TODOEngine())
+ checkTombstone(store1.TODOEngine())
}

// TestStoreRangeMergeAddReplicaRace verifies that when an add replica request
@@ -3941,8 +3941,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)
store0, store2 := tc.GetFirstStoreFromServer(t, 0), tc.GetFirstStoreFromServer(t, 2)
- sendingEng = store0.Engine()
- receivingEng = store2.Engine()
+ sendingEng = store0.TODOEngine()
+ receivingEng = store2.TODOEngine()
distSender := tc.Servers[0].DistSender()

// This test works across 5 ranges in total. We start with a scratch range(1)
@@ -4072,8 +4072,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
}

// Verify that the sets of keys in store0 and store2 are identical.
- storeKeys0 := getKeySet(store0.Engine())
- storeKeys2 := getKeySet(store2.Engine())
+ storeKeys0 := getKeySet(store0.TODOEngine())
+ storeKeys2 := getKeySet(store2.TODOEngine())
for k := range storeKeys0 {
if _, ok := storeKeys2[k]; !ok {
return fmt.Errorf("store2 missing key %s", roachpb.Key(k))
@@ -5340,7 +5340,7 @@ func TestStoreMergeGCHint(t *testing.T) {
require.Greater(t, gcHint.LatestRangeDeleteTimestamp.WallTime, beforeSecondDel,
"highest timestamp wasn't picked up")
}
- repl.AssertState(ctx, store.Engine())
+ repl.AssertState(ctx, store.TODOEngine())
})
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_metrics_test.go
@@ -301,7 +301,7 @@ func TestStoreMetrics(t *testing.T) {
// This is useful, because most of the stats we track don't apply to
// memtables.
for i := range tc.Servers {
- if err := tc.GetFirstStoreFromServer(t, i).Engine().Flush(); err != nil {
+ if err := tc.GetFirstStoreFromServer(t, i).TODOEngine().Flush(); err != nil {
t.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_log_queue_test.go
@@ -94,7 +94,7 @@ func TestRaftLogQueue(t *testing.T) {
tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess()
}
// Flush the engine to advance durability, which triggers truncation.
- require.NoError(t, raftLeaderRepl.Engine().Flush())
+ require.NoError(t, raftLeaderRepl.Store().TODOEngine().Flush())
// Ensure that firstIndex has increased indicating that the log
// truncation has occurred.
afterTruncationIndex = raftLeaderRepl.GetFirstIndex()
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/client_raft_test.go
@@ -292,7 +292,7 @@ func TestReplicateRange(t *testing.T) {
// Verify no intent remains on range descriptor key.
key := keys.RangeDescriptorKey(rhsDesc.StartKey)
desc := roachpb.RangeDescriptor{}
- if ok, err := storage.MVCCGetProto(ctx, store.Engine(), key,
+ if ok, err := storage.MVCCGetProto(ctx, store.TODOEngine(), key,
store.Clock().Now(), &desc, storage.MVCCGetOptions{}); err != nil {
t.Fatal(err)
} else if !ok {
@@ -305,7 +305,7 @@ func TestReplicateRange(t *testing.T) {
meta1 := keys.RangeMetaKey(meta2)
for _, key := range []roachpb.RKey{meta2, meta1} {
metaDesc := roachpb.RangeDescriptor{}
- if ok, err := storage.MVCCGetProto(ctx, store.Engine(), key.AsRawKey(),
+ if ok, err := storage.MVCCGetProto(ctx, store.TODOEngine(), key.AsRawKey(),
store.Clock().Now(), &metaDesc, storage.MVCCGetOptions{}); err != nil {
return err
} else if !ok {
@@ -611,7 +611,7 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
// compute its size.
repl.RaftLock()
realSize, err := kvserver.ComputeRaftLogSize(
- ctx, repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
+ ctx, repl.RangeID, repl.Store().TODOEngine(), repl.SideloadedRaftMuLocked(),
)
size, _ := repl.GetRaftLogSize()
repl.RaftUnlock()
@@ -828,7 +828,7 @@ func TestSnapshotAfterTruncation(t *testing.T) {
func waitForTruncationForTesting(t *testing.T, r *kvserver.Replica, newFirstIndex uint64) {
testutils.SucceedsSoon(t, func() error {
// Flush the engine to advance durability, which triggers truncation.
- require.NoError(t, r.Engine().Flush())
+ require.NoError(t, r.Store().TODOEngine().Flush())
// FirstIndex has changed.
firstIndex := r.GetFirstIndex()
if firstIndex != newFirstIndex {
@@ -2966,7 +2966,7 @@ func TestRaftRemoveRace(t *testing.T) {
tombstoneKey := keys.RangeTombstoneKey(desc.RangeID)
var tombstone roachpb.RangeTombstone
if ok, err := storage.MVCCGetProto(
- ctx, tc.GetFirstStoreFromServer(t, 2).Engine(), tombstoneKey,
+ ctx, tc.GetFirstStoreFromServer(t, 2).TODOEngine(), tombstoneKey,
hlc.Timestamp{}, &tombstone, storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
@@ -5147,15 +5147,15 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
var tombstone roachpb.RangeTombstone
tombstoneKey := keys.RangeTombstoneKey(rangeID)
ok, err := storage.MVCCGetProto(
- ctx, store.Engine(), tombstoneKey, hlc.Timestamp{}, &tombstone, storage.MVCCGetOptions{},
+ ctx, store.TODOEngine(), tombstoneKey, hlc.Timestamp{}, &tombstone, storage.MVCCGetOptions{},
)
require.NoError(t, err)
require.False(t, ok)
}
getHardState := func(
t *testing.T, store *kvserver.Store, rangeID roachpb.RangeID,
) raftpb.HardState {
- hs, err := stateloader.Make(rangeID).LoadHardState(ctx, store.Engine())
+ hs, err := stateloader.Make(rangeID).LoadHardState(ctx, store.TODOEngine())
require.NoError(t, err)
return hs
}
@@ -5322,7 +5322,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// raft message when the other nodes split and then after the above call
// it will find out about its new replica ID and write a tombstone for the
// old one.
- waitForTombstone(t, tc.GetFirstStoreFromServer(t, 0).Engine(), rhsID)
+ waitForTombstone(t, tc.GetFirstStoreFromServer(t, 0).TODOEngine(), rhsID)
lhsPartition.deactivate()
tc.WaitForValues(t, keyA, []int64{8, 8, 8})
hs := getHardState(t, tc.GetFirstStoreFromServer(t, 0), rhsID)
@@ -5375,7 +5375,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// raft message when the other nodes split and then after the above call
// it will find out about its new replica ID and write a tombstone for the
// old one.
- waitForTombstone(t, tc.GetFirstStoreFromServer(t, 0).Engine(), rhsID)
+ waitForTombstone(t, tc.GetFirstStoreFromServer(t, 0).TODOEngine(), rhsID)

// We do all of this incrementing to ensure that nobody will ever
// succeed in sending a message the new RHS replica after we restart
@@ -5662,7 +5662,7 @@ func TestReplicaRemovalClosesProposalQuota(t *testing.T) {
ToReplica: newReplDesc,
Message: raftpb.Message{Type: raftpb.MsgVote, Term: 2},
}, noopRaftMessageResponseStream{}))
- ts := waitForTombstone(t, store.Engine(), desc.RangeID)
+ ts := waitForTombstone(t, store.TODOEngine(), desc.RangeID)
require.Equal(t, ts.NextReplicaID, desc.NextReplicaID)
wg.Wait()
_, err = repl.GetProposalQuota().Acquire(ctx, 1)
@@ -5803,7 +5803,7 @@ func TestElectionAfterRestart(t *testing.T) {
})
for _, srv := range tc.Servers {
require.NoError(t, srv.Stores().VisitStores(func(s *kvserver.Store) error {
- return s.Engine().Flush()
+ return s.TODOEngine().Flush()
}))
}
t.Log("waited for all followers to be caught up")
@@ -5914,12 +5914,12 @@ func TestRaftSnapshotsWithMVCCRangeKeys(t *testing.T) {
rangeKVWithTS("a", "b", ts1, storage.MVCCValue{}),
rangeKVWithTS("b", "c", ts2, storage.MVCCValue{}),
rangeKVWithTS("b", "c", ts1, storage.MVCCValue{}),
- }, storageutils.ScanRange(t, store.Engine(), descA))
+ }, storageutils.ScanRange(t, store.TODOEngine(), descA))
require.Equal(t, kvs{
rangeKVWithTS("c", "d", ts2, storage.MVCCValue{}),
rangeKVWithTS("c", "d", ts1, storage.MVCCValue{}),
rangeKVWithTS("d", "e", ts2, storage.MVCCValue{}),
- }, storageutils.ScanRange(t, store.Engine(), descC))
+ }, storageutils.ScanRange(t, store.TODOEngine(), descC))
}

// Quick check of MVCC stats.
@@ -5962,7 +5962,7 @@ func TestRaftSnapshotsWithMVCCRangeKeysEverywhere(t *testing.T) {
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)
store := tc.GetFirstStoreFromServer(t, 0)
- engine := store.Engine()
+ engine := store.TODOEngine()
sender := srv.DB().NonTransactionalSender()

// Split off ranges at "a" and "b".
@@ -6013,7 +6013,7 @@ func TestRaftSnapshotsWithMVCCRangeKeysEverywhere(t *testing.T) {

// Look for the range keys on the other servers.
for _, srvIdx := range []int{1, 2} {
- e := tc.GetFirstStoreFromServer(t, srvIdx).Engine()
+ e := tc.GetFirstStoreFromServer(t, srvIdx).TODOEngine()
for _, desc := range descs {
for _, span := range rditer.MakeReplicatedKeySpans(&desc) {
prefix := append(span.Key.Clone(), ':')
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_gc_test.go
@@ -98,7 +98,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
repl1 := store.LookupReplica(roachpb.RKey(k))
require.NotNil(t, repl1)

- eng := store.Engine()
+ eng := store.TODOEngine()

// Put some bogus sideloaded data on the replica which we're about to
// remove. Then, at the end of the test, check that that sideloaded