From 7eeb6db59755b74efa9ece61f09c8d4f52fb3608 Mon Sep 17 00:00:00 2001
From: Alex Lunev
Date: Mon, 15 Jun 2020 22:43:44 -0700
Subject: [PATCH] kv: Replace multiTestContext with TestCluster in
 consistency_queue_test

Makes progress on #8299

multiTestContext is a legacy construct that is deprecated in favor of
running tests via TestCluster. This is one PR out of many to remove the
usage of multiTestContext in the consistency_queue test cases.

Release note: None
---
 pkg/kv/kvserver/consistency_queue.go      |  57 +++--
 pkg/kv/kvserver/consistency_queue_test.go | 246 +++++++++++++---------
 pkg/kv/kvserver/helpers_test.go           |  15 +-
 3 files changed, 195 insertions(+), 123 deletions(-)

diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go
index 40e0bb3e3328..24695b631a67 100644
--- a/pkg/kv/kvserver/consistency_queue.go
+++ b/pkg/kv/kvserver/consistency_queue.go
@@ -49,6 +49,15 @@ type consistencyQueue struct {
 	replicaCountFn func() int
 }

+// consistencyShouldQueueData is a wrapper that makes shouldQueue easier to test.
+type consistencyShouldQueueData struct {
+	desc                      *roachpb.RangeDescriptor
+	getQueueLastProcessed     func(ctx context.Context) (hlc.Timestamp, error)
+	isNodeLive                func(nodeID roachpb.NodeID) (bool, error)
+	disableLastProcessedCheck bool
+	interval                  time.Duration
+}
+
 // newConsistencyQueue returns a new instance of consistencyQueue.
 func newConsistencyQueue(store *Store, gossip *gossip.Gossip) *consistencyQueue {
 	q := &consistencyQueue{
@@ -77,30 +86,50 @@ func newConsistencyQueue(store *Store, gossip *gossip.Gossip) *consistencyQueue
 func (q *consistencyQueue) shouldQueue(
 	ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig,
 ) (bool, float64) {
-	interval := q.interval()
-	if interval <= 0 {
+	return consistencyQueueShouldQueueImpl(ctx, now,
+		consistencyShouldQueueData{
+			desc: repl.Desc(),
+			getQueueLastProcessed: func(ctx context.Context) (hlc.Timestamp, error) {
+				return repl.getQueueLastProcessed(ctx, q.name)
+			},
+			isNodeLive: func(nodeID roachpb.NodeID) (bool, error) {
+				if repl.store.cfg.NodeLiveness != nil {
+					return repl.store.cfg.NodeLiveness.IsLive(nodeID)
+				}
+				// Some tests run without a NodeLiveness configured.
+				return true, nil
+			},
+			disableLastProcessedCheck: repl.store.cfg.TestingKnobs.DisableLastProcessedCheck,
+			interval:                  q.interval(),
+		})
+}
+
+// consistencyQueueShouldQueueImpl is exposed for testability without having
+// to set up a fully fledged replica.
+func consistencyQueueShouldQueueImpl(
+	ctx context.Context, now hlc.Timestamp, data consistencyShouldQueueData,
+) (bool, float64) {
+	if data.interval <= 0 {
 		return false, 0
 	}
 	shouldQ, priority := true, float64(0)
-	if !repl.store.cfg.TestingKnobs.DisableLastProcessedCheck {
-		lpTS, err := repl.getQueueLastProcessed(ctx, q.name)
+	if !data.disableLastProcessedCheck {
+		lpTS, err := data.getQueueLastProcessed(ctx)
 		if err != nil {
 			return false, 0
 		}
-		if shouldQ, priority = shouldQueueAgain(now, lpTS, interval); !shouldQ {
+		if shouldQ, priority = shouldQueueAgain(now, lpTS, data.interval); !shouldQ {
 			return false, 0
 		}
 	}
-	// Check if all replicas are live. Some tests run without a NodeLiveness configured.
-	if repl.store.cfg.NodeLiveness != nil {
-		for _, rep := range repl.Desc().Replicas().All() {
-			if live, err := repl.store.cfg.NodeLiveness.IsLive(rep.NodeID); err != nil {
-				log.VErrEventf(ctx, 3, "node %d liveness failed: %s", rep.NodeID, err)
-				return false, 0
-			} else if !live {
-				return false, 0
-			}
+	// Check if all replicas are live.
+	for _, rep := range data.desc.Replicas().All() {
+		if live, err := data.isNodeLive(rep.NodeID); err != nil {
+			log.VErrEventf(ctx, 3, "node %d liveness failed: %s", rep.NodeID, err)
+			return false, 0
+		} else if !live {
+			return false, 0
 		}
 	}
 	return true, priority
diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index 1108076afafd..f34cf85036db 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -21,7 +21,6 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
-	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -34,8 +33,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
-	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -44,29 +43,42 @@
 // process ranges whose replicas are not all live.
 func TestConsistencyQueueRequiresLive(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	sc := kvserver.TestStoreConfig(nil)
-	sc.Clock = nil // manual clock
-	mtc := &multiTestContext{storeConfig: &sc}
-	defer mtc.Stop()
-	mtc.Start(t, 3)
-
-	// Replicate the range to three nodes.
-	repl := mtc.stores[0].LookupReplica(roachpb.RKeyMin)
-	rangeID := repl.RangeID
-	mtc.replicateRange(rangeID, 1, 2)
-
-	// Verify that queueing is immediately possible.
-	if shouldQ, priority := mtc.stores[0].ConsistencyQueueShouldQueue(
-		context.Background(), mtc.clock().Now(), repl, config.NewSystemConfig(sc.DefaultZoneConfig)); !shouldQ {
+	manualClock := hlc.NewManualClock(timeutil.Now().UnixNano())
+	clock := hlc.NewClock(manualClock.UnixNano, 10)
+	interval := time.Second * 5
+	live := true
+	testStart := clock.Now()
+
+	// Move the clock forward by the interval so that the check is due to run again.
+	manualClock.Increment(interval.Nanoseconds())
+
+	desc := &roachpb.RangeDescriptor{
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1, ReplicaID: 1},
+			{NodeID: 2, StoreID: 1, ReplicaID: 2},
+			{NodeID: 3, StoreID: 1, ReplicaID: 3},
+		},
+	}
+
+	getQueueLastProcessed := func(ctx context.Context) (hlc.Timestamp, error) {
+		return testStart, nil
+	}
+
+	isNodeLive := func(nodeID roachpb.NodeID) (bool, error) {
+		return live, nil
+	}
+
+	if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
+		context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeLive,
+		false, interval); !shouldQ {
 		t.Fatalf("expected shouldQ true; got %t, %f", shouldQ, priority)
 	}

-	// Stop a node and expire leases.
-	mtc.stopStore(2)
-	mtc.advanceClock(context.Background())
+	live = false

-	if shouldQ, priority := mtc.stores[0].ConsistencyQueueShouldQueue(
-		context.Background(), mtc.clock().Now(), repl, config.NewSystemConfig(sc.DefaultZoneConfig)); shouldQ {
+	if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
+		context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeLive,
+		false, interval); shouldQ {
 		t.Fatalf("expected shouldQ false; got %t, %f", shouldQ, priority)
 	}
 }
@@ -77,16 +89,22 @@ func TestConsistencyQueueRequiresLive(t *testing.T) {
 func TestCheckConsistencyMultiStore(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	const numStores = 3
-	mtc := &multiTestContext{}
-	defer mtc.Stop()
-	mtc.Start(t, numStores)
-	// Setup replication of range 1 on store 0 to stores 1 and 2.
-	mtc.replicateRange(1, 1, 2)
+	tc := testcluster.StartTestCluster(t, 3,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationAuto,
+		},
+	)
+
+	defer tc.Stopper().Stop(context.Background())
+	ts := tc.Servers[0]
+	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+	if pErr != nil {
+		t.Fatal(pErr)
+	}

 	// Write something to the DB.
 	putArgs := putArgs([]byte("a"), []byte("b"))
-	if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), putArgs); err != nil {
+	if _, err := kv.SendWrapped(context.Background(), store.TestSender(), putArgs); err != nil {
 		t.Fatal(err)
 	}
@@ -98,8 +116,8 @@ func TestCheckConsistencyMultiStore(t *testing.T) {
 			EndKey: []byte("aa"),
 		},
 	}
-	if _, err := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{
-		Timestamp: mtc.stores[0].Clock().Now(),
+	if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{
+		Timestamp: store.Clock().Now(),
 	}, &checkArgs); err != nil {
 		t.Fatal(err)
 	}
@@ -122,13 +140,10 @@ func TestCheckConsistencyReplay(t *testing.T) {
 	}
 	state.applies = map[applyKey]int{}

-	var mtc *multiTestContext
-	ctx := context.Background()
-	storeCfg := kvserver.TestStoreConfig(nil /* clock */)
-
+	testKnobs := kvserver.StoreTestingKnobs{}
 	// Arrange to count the number of times each checksum command applies to each
 	// store.
-	storeCfg.TestingKnobs.TestingApplyFilter = func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
+	testKnobs.TestingApplyFilter = func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
 		state.Lock()
 		defer state.Unlock()
 		if ccr := args.ComputeChecksum; ccr != nil {
@@ -138,31 +153,45 @@ func TestCheckConsistencyReplay(t *testing.T) {
 	}

 	// Arrange to trigger a retry when a ComputeChecksum request arrives.
-	storeCfg.TestingKnobs.TestingResponseFilter = func(
+	testKnobs.TestingResponseFilter = func(
 		ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
 	) *roachpb.Error {
 		state.Lock()
 		defer state.Unlock()
 		if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry {
 			state.forcedRetry = true
-			return roachpb.NewError(errors.New(magicMultiTestContextKVTransportError))
+			// We need to return an error that the sender treats as retryable.
+			return roachpb.NewError(&roachpb.NotLeaseHolderError{})
 		}
 		return nil
 	}

-	mtc = &multiTestContext{storeConfig: &storeCfg}
-	defer mtc.Stop()
-	mtc.Start(t, 2)
+	tc := testcluster.StartTestCluster(t, 2,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationAuto,
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					Store: &testKnobs,
+				},
+			},
+		},
+	)

-	mtc.replicateRange(roachpb.RangeID(1), 1)
+	defer tc.Stopper().Stop(context.Background())
+	ts := tc.Servers[0]
+	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+	if pErr != nil {
+		t.Fatal(pErr)
+	}

 	checkArgs := roachpb.CheckConsistencyRequest{
 		RequestHeader: roachpb.RequestHeader{
 			Key:    []byte("a"),
 			EndKey: []byte("b"),
 		},
 	}
-	if _, err := kv.SendWrapped(ctx, mtc.Store(0).TestSender(), &checkArgs); err != nil {
+
+	if _, err := kv.SendWrapped(context.Background(), store.TestSender(), &checkArgs); err != nil {
 		t.Fatal(err)
 	}
 	// Check that the request was evaluated twice (first time when forcedRetry was
@@ -182,46 +211,24 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	sc := kvserver.TestStoreConfig(nil)
-	sc.Clock = nil // manual clock
-	mtc := &multiTestContext{
-		storeConfig: &sc,
-		// This test was written before the multiTestContext started creating many
-		// system ranges at startup, and hasn't been update to take that into
-		// account.
-		startWithSingleRange: true,
-	}
-
 	const numStores = 3
+	testKnobs := kvserver.StoreTestingKnobs{}

-	dir, cleanup := testutils.TempDir(t)
-	defer cleanup()
-	cache := storage.NewRocksDBCache(1 << 20)
-	defer cache.Release()
-
-	// Use on-disk stores because we want to take a RocksDB checkpoint and be
-	// able to find it.
-	for i := 0; i < numStores; i++ {
-		eng, err := storage.NewRocksDB(storage.RocksDBConfig{
-			StorageConfig: base.StorageConfig{
-				Dir: filepath.Join(dir, fmt.Sprintf("%d", i)),
-			},
-		}, cache)
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer eng.Close()
-		mtc.engines = append(mtc.engines, eng)
-	}
+	var tc *testcluster.TestCluster

 	// s1 will report a diff with inconsistent key "e", and only s2 has that
 	// write (s3 agrees with s1).
 	diffKey := []byte("e")
 	var diffTimestamp hlc.Timestamp
 	notifyReportDiff := make(chan struct{}, 1)
-	sc.TestingKnobs.ConsistencyTestingKnobs.BadChecksumReportDiff =
+	testKnobs.ConsistencyTestingKnobs.BadChecksumReportDiff =
 		func(s roachpb.StoreIdent, diff kvserver.ReplicaSnapshotDiffSlice) {
-			if s != *mtc.Store(0).Ident {
+			ts := tc.Servers[0]
+			store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+			if pErr != nil {
+				t.Fatal(pErr)
+			}
+			if s != *store.Ident {
 				t.Errorf("BadChecksumReportDiff called from follower (StoreIdent = %v)", s)
 				return
 			}
@@ -234,46 +241,64 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 				t.Errorf("diff = %v", d)
 			}

-			diff[0].Timestamp.Logical = 987 // mock this out for a consistent string below
-
-			act := diff.String()
-
-			exp := `--- leaseholder
-+++ follower
-+0.000000123,987 "e"
-+    ts:1970-01-01 00:00:00.000000123 +0000 UTC
-+    value:"\x00\x00\x00\x00\x01T"
-+    raw mvcc_key/value: 6500000000000000007b000003db0d 000000000154
-`
-			if act != exp {
-				// We already logged the actual one above.
-				t.Errorf("expected:\n%s\ngot:\n%s", exp, act)
-			}
-
+			// With a real clock it's not possible to check the actual diff,
+			// so we settle for just a key check.
 			notifyReportDiff <- struct{}{}
 		}

 	// s2 (index 1) will panic.
 	notifyFatal := make(chan struct{}, 1)
-	sc.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal = func(s roachpb.StoreIdent) {
-		if s != *mtc.Store(1).Ident {
+	testKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal = func(s roachpb.StoreIdent) {
+		ts := tc.Servers[1]
+		store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+		if pErr != nil {
+			t.Fatal(pErr)
+		}
+		if s != *store.Ident {
 			t.Errorf("OnBadChecksumFatal called from %v", s)
 			return
 		}
 		notifyFatal <- struct{}{}
 	}

-	defer mtc.Stop()
-	mtc.Start(t, numStores)
-	// Setup replication of range 1 on store 0 to stores 1 and 2.
-	mtc.replicateRange(1, 1, 2)
+	dir, cleanup := testutils.TempDir(t)
+	defer cleanup()
+	serverArgsPerNode := make(map[int]base.TestServerArgs)
+	for i := 0; i < numStores; i++ {
+		testServerArgs := base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				Store: &testKnobs,
+			},
+			StoreSpecs: []base.StoreSpec{
+				{
+					Path:     filepath.Join(dir, fmt.Sprintf("%d", i)),
+					InMemory: false,
+				},
+			},
+		}
+		serverArgsPerNode[i] = testServerArgs
+	}
+
+	tc = testcluster.StartTestCluster(t, numStores,
+		base.TestClusterArgs{
+			ReplicationMode:   base.ReplicationAuto,
+			ServerArgsPerNode: serverArgsPerNode,
+		},
+	)
+	defer tc.Stopper().Stop(context.Background())
+
+	ts := tc.Servers[0]
+	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+	if pErr != nil {
+		t.Fatal(pErr)
+	}

 	// Write something to the DB.
 	pArgs := putArgs([]byte("a"), []byte("b"))
-	if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), pArgs); err != nil {
+	if _, err := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); err != nil {
 		t.Fatal(err)
 	}
 	pArgs = putArgs([]byte("c"), []byte("d"))
-	if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), pArgs); err != nil {
+	if _, err := kv.SendWrapped(context.Background(), store.TestSender(), pArgs); err != nil {
 		t.Fatal(err)
 	}
@@ -286,7 +311,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 		},
 		Mode: roachpb.ChecksumMode_CHECK_VIA_QUEUE,
 	}
-	resp, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), &checkArgs)
+	resp, err := kv.SendWrapped(context.Background(), store.TestSender(), &checkArgs)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -294,7 +319,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	}

 	checkpoints := func(nodeIdx int) []string {
-		pat := filepath.Join(mtc.engines[nodeIdx].GetAuxiliaryDir(), "checkpoints") + "/*"
+		testServer := tc.Servers[nodeIdx]
+		testStore, pErr := testServer.Stores().GetStore(testServer.GetFirstStoreID())
+		if pErr != nil {
+			t.Fatal(pErr)
+		}
+		pat := filepath.Join(testStore.Engine().GetAuxiliaryDir(), "checkpoints") + "/*"
 		m, err := filepath.Glob(pat)
 		assert.NoError(t, err)
 		return m
@@ -318,11 +348,16 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	}

 	// Write some arbitrary data only to store 1. Inconsistent key "e"!
+	ts1 := tc.Servers[1]
+	store1, pErr := ts1.Stores().GetStore(ts1.GetFirstStoreID())
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
 	var val roachpb.Value
 	val.SetInt(42)
-	diffTimestamp = mtc.stores[1].Clock().Now()
+	diffTimestamp = ts.Clock().Now()
 	if err := storage.MVCCPut(
-		context.Background(), mtc.stores[1].Engine(), nil, diffKey, diffTimestamp, val, nil,
+		context.Background(), store1.Engine(), nil, diffKey, diffTimestamp, val, nil,
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -345,11 +380,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	for i := 0; i < numStores; i++ {
 		cps := checkpoints(i)
 		assert.Len(t, cps, 1)
-		cpEng, err := storage.NewRocksDB(storage.RocksDBConfig{
-			StorageConfig: base.StorageConfig{
+		cpEng, err := storage.NewDefaultEngine(
+			1<<20,
+			base.StorageConfig{
 				Dir: cps[0],
 			},
-		}, cache)
+		)
 		assert.NoError(t, err)
 		defer cpEng.Close()
@@ -368,7 +404,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	assert.Contains(t, resp.Result[0].Detail, `stats`)

 	// A death rattle should have been written on s2 (store index 1).
-	eng := mtc.stores[1].Engine()
+	eng := store1.Engine()
 	f, err := eng.Open(base.PreventedStartupFile(eng.GetAuxiliaryDir()))
 	require.NoError(t, err)
 	b, err := ioutil.ReadAll(f)
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index e02fd98d1e16..e5e135e50189 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -24,7 +24,6 @@ import (
 	"unsafe"

 	circuit "github.com/cockroachdb/circuitbreaker"
-	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
@@ -91,10 +90,18 @@ func (s *Store) ComputeMVCCStats() (enginepb.MVCCStats, error) {

 // ConsistencyQueueShouldQueue invokes the shouldQueue method on the
 // store's consistency queue.
-func (s *Store) ConsistencyQueueShouldQueue(
-	ctx context.Context, now hlc.Timestamp, r *Replica, cfg *config.SystemConfig,
+func ConsistencyQueueShouldQueue(
+	ctx context.Context,
+	now hlc.Timestamp,
+	desc *roachpb.RangeDescriptor,
+	getQueueLastProcessed func(ctx context.Context) (hlc.Timestamp, error),
+	isNodeLive func(nodeID roachpb.NodeID) (bool, error),
+	disableLastProcessedCheck bool,
+	interval time.Duration,
 ) (bool, float64) {
-	return s.consistencyQueue.shouldQueue(ctx, now, r, cfg)
+	return consistencyQueueShouldQueueImpl(ctx, now, consistencyShouldQueueData{
+		desc, getQueueLastProcessed, isNodeLive,
+		disableLastProcessedCheck, interval})
 }

 // LogReplicaChangeTest adds a fake replica change event to the log for the