Merge #89085
89085: kvserver: cleanup consistency queue tests r=erikgrinaker a=pavelkalinnikov

This change refactors the consistency queue tests while touching this area: the repeated server-to-store lookup boilerplate moves into a helper, manual `if err != nil { t.Fatal(err) }` blocks become require/assert assertions, and the notification channels are closed rather than sent to.
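
Most of the 120 deleted lines were copies of the same lookup-and-fatal pattern. As a rough sketch (assumed shape; the real helper lives in the testcluster package, so treat names and signature here as illustrative), tc.GetFirstStoreFromServer wraps exactly the code being deleted:

    // Sketch only: look up a server's first store, failing the test in
    // one place instead of at every call site.
    func (tc *TestCluster) GetFirstStoreFromServer(t testing.TB, idx int) *kvserver.Store {
    	ts := tc.Servers[idx]
    	store, err := ts.Stores().GetStore(ts.GetFirstStoreID())
    	if err != nil {
    		t.Fatal(err)
    	}
    	return store
    }

Each call site then collapses five lines into one; the remaining error checks are converted to testify assertions (see the notes interleaved with the hunks below).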

Release note: None

Co-authored-by: Pavel Kalinnikov <[email protected]>
craig[bot] and pav-kv committed Oct 3, 2022
2 parents d1cdaf6 + 8f5b18a commit 240675d
pkg/kv/kvserver/consistency_queue_test.go (48 additions, 120 deletions)
@@ -113,11 +113,7 @@ func TestCheckConsistencyMultiStore(t *testing.T) {
 	)
 
 	defer tc.Stopper().Stop(context.Background())
-	ts := tc.Servers[0]
-	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
+	store := tc.GetFirstStoreFromServer(t, 0)
 
 	// Write something to the DB.
 	putArgs := putArgs([]byte("a"), []byte("b"))
@@ -199,11 +195,7 @@ func TestCheckConsistencyReplay(t *testing.T) {
 
 	defer tc.Stopper().Stop(context.Background())
 
-	ts := tc.Servers[0]
-	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
+	store := tc.GetFirstStoreFromServer(t, 0)
 	checkArgs := roachpb.CheckConsistencyRequest{
 		RequestHeader: roachpb.RequestHeader{
 			Key: []byte("a"),
@@ -258,25 +250,19 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	// write (s3 agrees with s1).
 	diffKey := []byte("e")
 	var diffTimestamp hlc.Timestamp
-	notifyReportDiff := make(chan struct{}, 1)
+	notifyReportDiff := make(chan struct{})
 	testKnobs.ConsistencyTestingKnobs.BadChecksumReportDiff =
 		func(s roachpb.StoreIdent, diff kvserver.ReplicaSnapshotDiffSlice) {
 			rangeDesc := tc.LookupRangeOrFatal(t, diffKey)
-			repl, pErr := tc.FindRangeLeaseHolder(rangeDesc, nil)
-			if pErr != nil {
-				t.Fatal(pErr)
-			}
+			repl, err := tc.FindRangeLeaseHolder(rangeDesc, nil)
+			require.NoError(t, err)
 			// Servers start at 0, but NodeID starts at 1.
-			store, pErr := tc.Servers[repl.NodeID-1].Stores().GetStore(repl.StoreID)
-			if pErr != nil {
-				t.Fatal(pErr)
-			}
-			if s != *store.Ident {
-				t.Errorf("BadChecksumReportDiff called from follower (StoreIdent = %v)", s)
+			store, err := tc.Servers[repl.NodeID-1].Stores().GetStore(repl.StoreID)
+			require.NoError(t, err)
+			if !assert.Equal(t, *store.Ident, s) {
 				return
 			}
-			if len(diff) != 1 {
-				t.Errorf("diff length = %d, diff = %v", len(diff), diff)
+			if !assert.Len(t, diff, 1) {
 				return
 			}
 			d := diff[0]
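A note on the assert/require mix this hunk introduces: testify's assert functions record a failure and return a bool, while the require variants additionally call t.FailNow, which the testing package only permits on the goroutine running the test. These knobs are invoked from server-side code, so the equality and length checks use assert and bail out with an early return. A runnable sketch of the two behaviors (contrived values, not code from this test):

    package example

    import (
    	"testing"

    	"github.com/stretchr/testify/assert"
    	"github.com/stretchr/testify/require"
    )

    func TestAssertVsRequire(t *testing.T) {
    	got, want := 2, 2

    	// assert.* records the failure and returns a bool, so a callback can
    	// stop early without tearing the test down from a foreign goroutine.
    	if !assert.Equal(t, want, got) {
    		return
    	}

    	// require.* is assert.* plus t.FailNow(): on a mismatch the test
    	// aborts here, which is fine on the test's own goroutine.
    	require.Equal(t, want, got)
    }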
@@ -296,60 +282,40 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 +	value:"\x00\x00\x00\x00\x01T"
 +	raw mvcc_key/value: 6500000000000000007b000003db0d 000000000154
 `
-			if act != exp {
-				// We already logged the actual one above.
-				t.Errorf("expected:\n%s\ngot:\n%s", exp, act)
-			}
-
-			notifyReportDiff <- struct{}{}
+			assert.Equal(t, exp, act)
+			close(notifyReportDiff)
 		}
 	// s2 (index 1) will panic.
-	notifyFatal := make(chan struct{}, 1)
+	notifyFatal := make(chan struct{})
 	testKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal = func(s roachpb.StoreIdent) {
-		ts := tc.Servers[1]
-		store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-		if pErr != nil {
-			t.Fatal(pErr)
-		}
-		if s != *store.Ident {
-			t.Errorf("OnBadChecksumFatal called from %v", s)
+		store := tc.GetFirstStoreFromServer(t, 1)
+		if !assert.Equal(t, *store.Ident, s) {
 			return
 		}
-		notifyFatal <- struct{}{}
+		close(notifyFatal)
 	}
 
-	serverArgsPerNode := make(map[int]base.TestServerArgs)
+	serverArgsPerNode := make(map[int]base.TestServerArgs, numStores)
 	for i := 0; i < numStores; i++ {
-		testServerArgs := base.TestServerArgs{
+		serverArgsPerNode[i] = base.TestServerArgs{
 			Knobs: base.TestingKnobs{
-				Store: &testKnobs,
-				Server: &server.TestingKnobs{
-					StickyEngineRegistry: stickyEngineRegistry,
-				},
-			},
-			StoreSpecs: []base.StoreSpec{
-				{
-					InMemory: true,
-					StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
-				},
+				Store: &testKnobs,
+				Server: &server.TestingKnobs{StickyEngineRegistry: stickyEngineRegistry},
 			},
+			StoreSpecs: []base.StoreSpec{{
+				InMemory: true,
+				StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
+			}},
 		}
-		serverArgsPerNode[i] = testServerArgs
 	}
 
-	tc = testcluster.StartTestCluster(t, numStores,
-		base.TestClusterArgs{
-			ReplicationMode: base.ReplicationAuto,
-			ServerArgsPerNode: serverArgsPerNode,
-		},
-	)
+	tc = testcluster.StartTestCluster(t, numStores, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationAuto,
+		ServerArgsPerNode: serverArgsPerNode,
+	})
 	defer tc.Stopper().Stop(context.Background())
 
-	ts := tc.Servers[0]
-	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
+	store := tc.GetFirstStoreFromServer(t, 0)
 	// Write something to the DB.
 	pArgs := putArgs([]byte("a"), []byte("b"))
 	if _, err := kv.SendWrapped(context.Background(), store.DB().NonTransactionalSender(), pArgs); err != nil {
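The notification channels above also change shape, from a one-slot buffered send to close(). Closing releases every present and future receiver, never blocks, and turns an unexpected second completion into a loud panic instead of a silently queued token. A minimal, self-contained sketch of the pattern (not code from this test):

    package main

    import "fmt"

    func main() {
    	done := make(chan struct{})

    	go func() {
    		// ... perform the work being waited on ...
    		close(done) // signal completion to all receivers
    	}()

    	<-done // blocks until close
    	<-done // receiving from a closed channel never blocks
    	fmt.Println("done fired once, observed twice")
    }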
@@ -370,24 +336,16 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 			Mode: roachpb.ChecksumMode_CHECK_VIA_QUEUE,
 		}
 		resp, err := kv.SendWrapped(context.Background(), store.DB().NonTransactionalSender(), &checkArgs)
-		if err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, err.GoError())
 		return resp.(*roachpb.CheckConsistencyResponse)
 	}
 
 	onDiskCheckpointPaths := func(nodeIdx int) []string {
-		testServer := tc.Servers[nodeIdx]
-		fs, pErr := stickyEngineRegistry.GetUnderlyingFS(
+		fs, err := stickyEngineRegistry.GetUnderlyingFS(
 			base.StoreSpec{StickyInMemoryEngineID: strconv.FormatInt(int64(nodeIdx), 10)})
-		if pErr != nil {
-			t.Fatal(pErr)
-		}
-		testStore, pErr := testServer.Stores().GetStore(testServer.GetFirstStoreID())
-		if pErr != nil {
-			t.Fatal(pErr)
-		}
-		checkpointPath := filepath.Join(testStore.Engine().GetAuxiliaryDir(), "checkpoints")
+		require.NoError(t, err)
+		store := tc.GetFirstStoreFromServer(t, nodeIdx)
+		checkpointPath := filepath.Join(store.Engine().GetAuxiliaryDir(), "checkpoints")
 		checkpoints, _ := fs.List(checkpointPath)
 		var checkpointPaths []string
 		for _, cpDirName := range checkpoints {
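One subtlety in the hunk above: the response error is asserted as require.NoError(t, err.GoError()) rather than on err itself. kv.SendWrapped returns a concrete *roachpb.Error, and (assuming the usual convention) GoError() converts it, a nil pointer included, into a plain error. Routing a concrete nil pointer through an error interface is the classic Go typed-nil trap, sketched here with hypothetical names:

    package main

    import "fmt"

    type pErr struct{} // stand-in for a concrete error type such as *roachpb.Error

    func (*pErr) Error() string { return "boom" }

    func send() *pErr { return nil } // success: returns a typed nil pointer

    func main() {
    	var err error = send()  // the interface now wraps (*pErr)(nil)
    	fmt.Println(err == nil) // false: non-nil interface, nil pointer inside
    }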
@@ -414,19 +372,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	}
 
 	// Write some arbitrary data only to store 1. Inconsistent key "e"!
-	ts1 := tc.Servers[1]
-	store1, pErr := ts1.Stores().GetStore(ts1.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
+	store1 := tc.GetFirstStoreFromServer(t, 1)
 	var val roachpb.Value
 	val.SetInt(42)
-	diffTimestamp = ts.Clock().Now()
-	if err := storage.MVCCPut(
-		context.Background(), store1.Engine(), nil, diffKey, diffTimestamp, hlc.ClockTimestamp{}, val, nil,
-	); err != nil {
-		t.Fatal(err)
-	}
+	diffTimestamp = tc.Server(0).Clock().Now()
+	require.NoError(t, storage.MVCCPut(context.Background(), store1.Engine(), nil,
+		diffKey, diffTimestamp, hlc.ClockTimestamp{}, val, nil))
 
 	// Run consistency check again, this time it should find something.
 	resp := runConsistencyCheck()
@@ -549,9 +500,7 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 			RequestHeader: roachpb.RequestHeader{Key: key},
 			DryRun: true,
 		})
-		if err := db.Run(ctx, &b); err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, db.Run(ctx, &b))
 		resp := b.RawResponse().Responses[0].GetInner().(*roachpb.RecomputeStatsResponse)
 		delta := enginepb.MVCCStats(resp.AddedDelta)
 		delta.AgeTo(0)
@@ -567,9 +516,7 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 		// Split off a range so that we get away from the timeseries writes, which
 		// pollute the stats with ContainsEstimates=true. Note that the split clears
 		// the right hand side (which is what we operate on) from that flag.
-		if err := db0.AdminSplit(ctx, key, hlc.MaxTimestamp /* expirationTime */); err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, db0.AdminSplit(ctx, key, hlc.MaxTimestamp /* expirationTime */))
 
 		delta := computeDelta(db0)
 
@@ -578,9 +525,7 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 		}
 
 		rangeDesc, err := tc.LookupRange(key)
-		if err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, err)
 
 		return rangeDesc.RangeID
 	}()
@@ -592,16 +537,12 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 			storage.Filesystem(path),
 			storage.CacheSize(1<<20 /* 1 MiB */),
 			storage.MustExist)
-		if err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, err)
 		defer eng.Close()
 
 		rsl := stateloader.Make(rangeID)
 		ms, err := rsl.LoadMVCCStats(ctx, eng)
-		if err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, err)
 
 		// Put some garbage in the stats that we're hoping the consistency queue will
 		// trigger a removal of via RecomputeStats. SysCount was chosen because it is
@@ -616,9 +557,7 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 		// Overwrite with the new stats; remember that this range hasn't upreplicated,
 		// so the consistency checker won't see any replica divergence when it runs,
 		// but it should definitely see that its recomputed stats mismatch.
-		if err := rsl.SetMVCCStats(ctx, eng, &ms); err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, rsl.SetMVCCStats(ctx, eng, &ms))
 	}()
 
 	// Now that we've tampered with the stats, restart the cluster and extend it
@@ -649,10 +588,7 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 		// can starve the actual work to be done.
 		done := time.After(5 * time.Second)
 		for {
-			if err := db0.Put(ctx, fmt.Sprintf("%s%d", key, rand.Int63()), "ballast"); err != nil {
-				t.Error(err)
-			}
-
+			require.NoError(t, db0.Put(ctx, fmt.Sprintf("%s%d", key, rand.Int63()), "ballast"))
 			select {
 			case <-ctx.Done():
 				return
@@ -674,21 +610,13 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) {
 	}
 
 	// Force a run of the consistency queue, otherwise it might take a while.
-	ts := tc.Servers[0]
-	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
-	if err := store.ForceConsistencyQueueProcess(); err != nil {
-		t.Fatal(err)
-	}
+	store := tc.GetFirstStoreFromServer(t, 0)
+	require.NoError(t, store.ForceConsistencyQueueProcess())
 
 	// The stats should magically repair themselves. We'll first do a quick check
 	// and then a full recomputation.
-	repl, _, err := ts.Stores().GetReplicaForRangeID(ctx, rangeID)
-	if err != nil {
-		t.Fatal(err)
-	}
+	repl, _, err := tc.Servers[0].Stores().GetReplicaForRangeID(ctx, rangeID)
+	require.NoError(t, err)
 	ms := repl.GetMVCCStats()
 	if ms.SysCount >= sysCountGarbage {
 		t.Fatalf("still have a SysCount of %d", ms.SysCount)