From 78078b85c62fdf3d960df7377df582bcdf4176cb Mon Sep 17 00:00:00 2001
From: Alex Lunev
Date: Tue, 26 Jan 2021 18:23:33 -0800
Subject: [PATCH] kvserver: replace multiTestContext with TestCluster/TestServer
 in client split/merge/status tests

Makes progress on #8299

multiTestContext is a legacy construct that is deprecated in favor of
running tests via TestCluster. This is one PR out of many to remove the
usage of multiTestContext in client_status_test, client_merge_test, and
client_split_test.

To support these changes, the following enhancements are introduced:
- Allow a test to provide a ProtectedTimestampCache to the
  TestServer/TestCluster.
- Refactor TestServer.ScratchRangeWithExpirationLease to expose the
  descriptors for the split.
- Add a convenience method to TestCluster to manually heartbeat node
  liveness.

Release note: None
---
 pkg/kv/kvserver/client_merge_test.go     | 516 ++++++++++--------
 pkg/kv/kvserver/client_raft_test.go      |   2 +-
 pkg/kv/kvserver/client_split_test.go     | 265 +++++----
 pkg/kv/kvserver/client_status_test.go    |  31 +-
 pkg/kv/kvserver/client_test.go           |  48 --
 pkg/kv/kvserver/helpers_test.go          |   2 +-
 .../protectedts/ptprovider/provider.go   |  18 +-
 pkg/server/server.go                     |   1 -
 pkg/server/testserver.go                 |  22 +-
 pkg/testutils/testcluster/BUILD.bazel    |   1 +
 pkg/testutils/testcluster/testcluster.go |  24 +-
 11 files changed, 518 insertions(+), 412 deletions(-)

diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index e55b4898c55c..fd76ec8eb4a7 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -19,7 +19,6 @@ import (
 	"reflect"
 	"regexp"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -34,6 +33,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
@@ -618,7 +618,7 @@ func TestStoreRangeMergeTimestampCacheCausality(t *testing.T) {

 	// Wait for all relevant stores to have the same value. This indirectly
 	// ensures the lease transfers have applied on all relevant stores.
-	tc.WaitForValues(t, rhsKey, []int64{1, 1, 1})
+	tc.WaitForValues(t, rhsKey, []int64{0, 1, 1, 1})

 	// Merge [a, b) and [b, Max). Our request filter above will intercept the
 	// merge and execute a read with a large timestamp immediately before the
@@ -1351,12 +1351,6 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

-	ctx := context.Background()
-	storeCfg := kvserver.TestStoreConfig(nil)
-	storeCfg.TestingKnobs.DisableReplicateQueue = true
-	storeCfg.TestingKnobs.DisableMergeQueue = true
-	storeCfg.Clock = nil // manual clock
-
 	// The synchronization in this test is tricky. The merge transaction is
 	// controlled by the AdminMerge function and normally commits quite quickly,
 	// but we need to ensure an expiration of the RHS's lease occurs while the
@@ -1367,7 +1361,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {

 	// Install a hook to control when the merge transaction commits.
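+	// The hook signals mergeEndTxnReceived when it sees the merge's EndTxn
+	// request and then holds the commit until the test closes finishMerge.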
mergeEndTxnReceived := make(chan *roachpb.Transaction, 10) // headroom in case the merge transaction retries finishMerge := make(chan struct{}) - storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { for _, r := range ba.Requests { if et := r.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { mergeEndTxnReceived <- ba.Txn @@ -1380,9 +1374,9 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { // Install a hook to observe when a get or a put request for a special key, // rhsSentinel, acquires latches and begins evaluating. const reqConcurrency = 10 - rhsSentinel := roachpb.Key("rhs-sentinel") + var rhsSentinel roachpb.Key reqAcquiredLatch := make(chan struct{}, reqConcurrency) - storeCfg.TestingKnobs.TestingLatchFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + testingLatchFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { for _, r := range ba.Requests { req := r.GetInner() switch req.Method() { @@ -1395,34 +1389,42 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { return nil } - mtc := &multiTestContext{ - storeConfig: &storeCfg, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - - mtc.Start(t, 2) - defer mtc.Stop() + manualClock := hlc.NewHybridManualClock() + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: testingRequestFilter, + TestingLatchFilter: testingLatchFilter, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) // Create the ranges to be merged. Put both ranges on both stores, but give // the second store the lease on the RHS. The LHS is largely irrelevant. What // matters is that the RHS exists on two stores so we can transfer its lease // during the merge. - lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.stores[0]) - if err != nil { - t.Fatal(err) - } - mtc.replicateRange(lhsDesc.RangeID, 1) - mtc.replicateRange(rhsDesc.RangeID, 1) - mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1) + lhsDesc, rhsDesc, err := tc.Servers[0].ScratchRangeWithExpirationLeaseEx() + require.NoError(t, err) + + tc.AddVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(1)) + tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Target(1)) + tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1)) // Launch the merge. mergeErr := make(chan error) go func() { args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) - _, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), args) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), args) mergeErr <- pErr.GoError() }() @@ -1442,19 +1444,19 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { // the transaction being inadvertently aborted during its first attempt, // which this test is not designed to handle. If the merge transaction did // abort then the get requests could complete on r2 before the merge retried. 
- hb, hbH := heartbeatArgs(mergeTxn, mtc.clock().Now()) - if _, pErr := kv.SendWrappedWith(ctx, mtc.stores[0].TestSender(), hbH, hb); pErr != nil { + hb, hbH := heartbeatArgs(mergeTxn, tc.Servers[0].Clock().Now()) + if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), hbH, hb); pErr != nil { t.Fatal(pErr) } // Turn off liveness heartbeats on the second store, then advance the clock // past the liveness expiration time. This expires all leases on all stores. - mtc.nodeLivenesses[1].PauseHeartbeatLoopForTest() - mtc.advanceClock(ctx) + tc.Servers[1].NodeLiveness().(*liveness.NodeLiveness).PauseHeartbeatLoopForTest() + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) // Manually heartbeat the liveness on the first store to ensure it's // considered live. The automatic heartbeat might not come for a while. - require.NoError(t, mtc.heartbeatLiveness(ctx, 0)) + require.NoError(t, tc.HeartbeatLiveness(ctx, 0)) // Send several get and put requests to the RHS. The first of these to // arrive will acquire the lease; the remaining requests will wait for that @@ -1502,7 +1504,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { } else { req = putArgs(rhsSentinel, []byte(fmt.Sprintf("val%d", i))) } - _, pErr := kv.SendWrappedWith(ctx, mtc.stores[0].TestSender(), roachpb.Header{ + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{ RangeID: rhsDesc.RangeID, }, req) reqErrs <- pErr @@ -1638,15 +1640,9 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.DisableReplicateQueue = true - storeCfg.Clock = nil // manual clock - - var mtc *multiTestContext - storeCfg.TestingKnobs.TestingResponseFilter = func( + var store *kvserver.Store + manualClock := hlc.NewHybridManualClock() + testingResponseFilter := func( ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse, ) *roachpb.Error { del := ba.Requests[0].GetDelete() @@ -1672,21 +1668,42 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) { // complete until the Subsume request completed, but the Subsume request // was unable to acquire latches until the Get request finished, which // was itself waiting for the merge to complete. Whoops! - mtc.advanceClock(ctx) + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) } return nil } - mtc = &multiTestContext{storeConfig: &storeCfg} - mtc.Start(t, 1) - defer mtc.Stop() - store := mtc.Store(0) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + DisableReplicateQueue: true, + TestingResponseFilter: testingResponseFilter, + }, + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + + key, err := s.ScratchRangeWithExpirationLease() + require.NoError(t, err) - keys := []roachpb.Key{ - roachpb.Key("a1"), roachpb.Key("a2"), roachpb.Key("a3"), - roachpb.Key("c1"), roachpb.Key("c2"), roachpb.Key("c3"), + // We need to use a range with expiration based leases, so set up some test keys + // in the expiration based scratch space. 
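+	// Consecutive test keys are two Next() steps apart so that a split key
+	// can fall strictly between any two of them.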
+	keys := make([]roachpb.Key, 6)
+	keys[0] = key.Next()
+	for i := 1; i < 6; i++ {
+		keys[i] = keys[i-1].Next().Next()
 	}
-
+	// We choose a split key between keys[2] and keys[3].
+	splitKey := keys[2].Next()
 	for _, k := range keys {
 		if err := store.DB().Put(ctx, k, "val"); err != nil {
 			t.Fatal(err)
@@ -1726,12 +1743,11 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
 	}
 
 	for i := 0; i < numMerges; i++ {
-		lhsDesc, _, err := createSplitRanges(ctx, store)
+		lhsDesc, _, err := s.SplitRange(splitKey)
 		if err != nil {
 			t.Fatal(err)
 		}
-		args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
-		if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil {
+		if _, pErr := s.MergeRanges(lhsDesc.StartKey.AsRawKey()); pErr != nil {
 			t.Fatal(pErr)
 		}
 	}
@@ -2072,15 +2088,16 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	storeCfg := kvserver.TestStoreConfig(nil)
-	storeCfg.TestingKnobs.DisableReplicateQueue = true
-	mtc := &multiTestContext{storeConfig: &storeCfg}
-	mtc.Start(t, 3)
-	defer mtc.Stop()
-	store0, store2 := mtc.Store(0), mtc.Store(2)
-
-	rngID := store0.LookupReplica(roachpb.RKey("a")).Desc().RangeID
-	mtc.replicateRange(rngID, 1, 2)
+	tc := testcluster.StartTestCluster(t, 3,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	store0, store2 := tc.GetFirstStoreFromServer(t, 0), tc.GetFirstStoreFromServer(t, 2)
+	repl0 := store0.LookupReplica(roachpb.RKey("a"))
+
+	tc.AddVotersOrFatal(t, repl0.Desc().StartKey.AsRawKey(), tc.Targets(1, 2)...)
 	lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
 	if err != nil {
 		t.Fatal(err)
@@ -2094,9 +2111,14 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
 
 	// Start dropping all Raft traffic to the LHS on store2 so that it won't be
 	// aware that there is a merge in progress.
-	mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{
+	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
 		rangeID:            lhsDesc.RangeID,
 		RaftMessageHandler: store2,
+		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
+			dropReq: func(req *kvserver.RaftMessageRequest) bool {
+				return true
+			},
+		},
 	})
 
 	args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
@@ -2115,19 +2137,19 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
 	}
 
 	// Remove the LHS replica from store2.
-	mtc.unreplicateRange(lhsDesc.RangeID, 2)
+	tc.RemoveVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(2))
 
 	// Transfer the lease on the new RHS to store2 and wait for it to apply. This
 	// will force its replica to of the new RHS to become up to date, which
 	// indirectly tests that the replica GC queue cleans up both the LHS replica
 	// and the old RHS replica.
- mtc.transferLease(ctx, newRHSDesc.RangeID, 0, 2) + tc.TransferRangeLeaseOrFatal(t, *newRHSDesc, tc.Target(2)) testutils.SucceedsSoon(t, func() error { rhsRepl, err := store2.GetReplica(newRHSDesc.RangeID) if err != nil { return err } - if !rhsRepl.OwnsValidLease(ctx, mtc.clock().NowAsClockTimestamp()) { + if !rhsRepl.OwnsValidLease(ctx, tc.Servers[2].Clock().NowAsClockTimestamp()) { return errors.New("rhs store does not own valid lease for rhs range") } return nil @@ -2337,15 +2359,15 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi defer log.Scope(t).Close(t) ctx := context.Background() - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableReplicateQueue = true - mtc := &multiTestContext{storeConfig: &storeCfg} - mtc.Start(t, 3) - defer mtc.Stop() - store0, store2 := mtc.Store(0), mtc.Store(2) - - rngID := store0.LookupReplica(roachpb.RKey("a")).Desc().RangeID - mtc.replicateRange(rngID, 1, 2) + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + store0, store2 := tc.GetFirstStoreFromServer(t, 0), tc.GetFirstStoreFromServer(t, 2) + + repl0 := store0.LookupReplica(roachpb.RKey("a")) + tc.AddVotersOrFatal(t, repl0.Desc().StartKey.AsRawKey(), tc.Targets(1, 2)...) lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0) if err != nil { t.Fatal(err) @@ -2353,13 +2375,13 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi // Make store2 the leaseholder for the RHS and wait for the lease transfer to // apply. - mtc.transferLease(ctx, rhsDesc.RangeID, 0, 2) + tc.TransferRangeLeaseOrFatal(t, *rhsDesc, tc.Target(2)) testutils.SucceedsSoon(t, func() error { rhsRepl, err := store2.GetReplica(rhsDesc.RangeID) if err != nil { return err } - if !rhsRepl.OwnsValidLease(ctx, mtc.clock().NowAsClockTimestamp()) { + if !rhsRepl.OwnsValidLease(ctx, tc.Servers[2].Clock().NowAsClockTimestamp()) { return errors.New("store2 does not own valid lease for rhs range") } return nil @@ -2367,9 +2389,14 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi // Start dropping all Raft traffic to the LHS replica on store2 so that it // won't be aware that there is a merge in progress. - mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ rangeID: lhsDesc.RangeID, RaftMessageHandler: store2, + unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ + dropReq: func(*kvserver.RaftMessageRequest) bool { + return true + }, + }, }) // Perform the merge. The LHS replica on store2 whon't hear about this merge @@ -2384,7 +2411,7 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi // Remove the merged range from store2. Its replicas of both the LHS and RHS // are now eligible for GC. 
- mtc.unreplicateRange(lhsDesc.RangeID, 2) + tc.RemoveVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(2)) // Note that we purposely do not call store.ManualReplicaGC here, as that // calls replicaGCQueue.process directly, bypassing the logic in @@ -2436,8 +2463,6 @@ func TestStoreRangeMergeDeadFollowerDuringTxn(t *testing.T) { ctx := context.Background() var tc *testcluster.TestCluster - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableMergeQueue = true testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { if ba.IsSingleSubsumeRequest() { tc.StopServer(2) // This is safe to call multiple times, it will only stop once @@ -2621,13 +2646,13 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableReplicateQueue = true - mtc := &multiTestContext{storeConfig: &storeCfg} - mtc.Start(t, 3) - defer mtc.Stop() - store0, store2 := mtc.Store(0), mtc.Store(2) - distSender := mtc.distSenders[0] + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + store0, store2 := tc.GetFirstStoreFromServer(t, 0), tc.GetFirstStoreFromServer(t, 2) + distSender := tc.Servers[0].DistSender() split := func(key roachpb.RKey) roachpb.RangeID { t.Helper() @@ -2642,8 +2667,8 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { aKey, bKey := roachpb.RKey("a"), roachpb.RKey("b") // Put range 1 on all three stores. - rngID := store0.LookupReplica(aKey).Desc().RangeID - mtc.replicateRange(rngID, 1, 2) + desc := store0.LookupReplica(aKey).Desc() + tc.AddVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Targets(1, 2)...) // Create range B and wait for store2 to process the split. bRangeID := split(bKey) @@ -2661,10 +2686,15 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { // of range 1 never processes the split trigger, which would create an // initialized replica of A. unreliableHandler := &unreliableRaftHandler{ - rangeID: rngID, + rangeID: desc.RangeID, RaftMessageHandler: store2, + unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ + dropReq: func(request *kvserver.RaftMessageRequest) bool { + return true + }, + }, } - mtc.transport.Listen(store2.Ident.StoreID, unreliableHandler) + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, unreliableHandler) // Perform the split of A, now that store2 won't be able to initialize its // replica of A. @@ -2678,7 +2708,7 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { RaftMessageHandler: unreliableHandler, } defer slowSnapHandler.unblock() - mtc.transport.Listen(store2.Ident.StoreID, slowSnapHandler) + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, slowSnapHandler) // Remove the replica of range 1 on store2. If we were to leave it in place, // store2 would refuse to GC its replica of C after the merge commits, because @@ -2686,16 +2716,16 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { // (Remember that we refused to let it process the split of A.) So we need to // remove it so that C has no left neighbor and thus will be eligible for GC. 
 	{
-		r1Repl2, err := store2.GetReplica(rngID)
+		r1Repl2, err := store2.GetReplica(desc.RangeID)
 		if err != nil {
 			t.Fatal(err)
 		}
-		mtc.unreplicateRange(rngID, 2)
+		tc.RemoveVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(2))
 		testutils.SucceedsSoon(t, func() error {
 			if err := store2.ManualReplicaGC(r1Repl2); err != nil {
 				return err
 			}
-			if _, err := store2.GetReplica(rngID); err == nil {
+			if _, err := store2.GetReplica(desc.RangeID); err == nil {
 				return errors.New("r1Repl2 still exists")
 			}
 			return nil
@@ -2745,9 +2775,11 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	repl, err := store0.GetReplica(aRangeID)
+	require.NoError(t, err)
 	// Give store2 the lease on the merged range to force all commands to be
 	// applied, including the merge trigger.
-	mtc.transferLease(ctx, aRangeID, 0, 2)
+	tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(2))
 }
 
 // TestStoreRangeMergeWatcher verifies that the watcher goroutine for a merge's
@@ -3034,12 +3066,10 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 
 	// We will be testing the SSTs written on store2's engine.
 	var receivingEng, sendingEng storage.Engine
-	ctx := context.Background()
-	storeCfg := kvserver.TestStoreConfig(nil)
-	storeCfg.TestingKnobs.DisableReplicateQueue = true
-	storeCfg.TestingKnobs.DisableReplicaGCQueue = true
-	storeCfg.Clock = nil // manual clock
-	storeCfg.TestingKnobs.BeforeSnapshotSSTIngestion = func(
+	// All of these variables will be populated later, after starting the cluster.
+	var keyStart, keyA, keyB, keyC, keyD, keyEnd roachpb.Key
+	rangeIds := make(map[string]roachpb.RangeID, 4)
+	beforeSnapshotSSTIngestion := func(
 		inSnap kvserver.IncomingSnapshot,
 		snapType kvserver.SnapshotRequest_Type,
 		sstNames []string,
@@ -3051,7 +3081,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		// on in the test. This function verifies that the subsumed replicas have
 		// been handled properly.
 		if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
-			inSnap.State.Desc.RangeID != roachpb.RangeID(2) {
+			inSnap.State.Desc.RangeID != rangeIds[string(keyA)] {
 			return nil
 		}
 		// TODO(sumeer): fix this test (and others in this file) when
@@ -3148,7 +3178,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 
 		// Construct SSTs for the range-id local keys of the subsumed replicas.
 		// with RangeIDs 3 and 4.
-		for _, rangeID := range []roachpb.RangeID{roachpb.RangeID(3), roachpb.RangeID(4)} {
+		for _, k := range []roachpb.Key{keyB, keyC} {
+			rangeID := rangeIds[string(k)]
 			sstFile := &storage.MemFile{}
 			sst := storage.MakeIngestionSSTWriter(sstFile)
 			defer sst.Close()
@@ -3173,8 +3204,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		sst := storage.MakeIngestionSSTWriter(sstFile)
 		defer sst.Close()
 		desc := roachpb.RangeDescriptor{
-			StartKey: roachpb.RKey("d"),
-			EndKey:   roachpb.RKeyMax,
+			StartKey: roachpb.RKey(keyD),
+			EndKey:   roachpb.RKey(keyEnd),
 		}
 		r := rditer.MakeUserKeyRange(&desc)
 		if err := storage.ClearRangeWithHeuristic(receivingEng, &sst, r.Start.Key, r.End.Key); err != nil {
@@ -3202,68 +3233,92 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		}
 		return nil
 	}
-	mtc := &multiTestContext{
-		storeConfig: &storeCfg,
-		// This test was written before the multiTestContext started creating many
-		// system ranges at startup, and hasn't been update to take that into
-		// account.
- startWithSingleRange: true, - } - mtc.Start(t, 3) - defer mtc.Stop() - store0, store2 := mtc.Store(0), mtc.Store(2) + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + BeforeSnapshotSSTIngestion: beforeSnapshotSSTIngestion, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + store0, store2 := tc.GetFirstStoreFromServer(t, 0), tc.GetFirstStoreFromServer(t, 2) sendingEng = store0.Engine() receivingEng = store2.Engine() - distSender := mtc.distSenders[0] + distSender := tc.Servers[0].DistSender() + + // This test works across 5 ranges in total. We start with a scratch range(1) + // [Start, End). We then split this range as follows: + // range(1) = [Start, a) + // range(2) = [a, b) + // range(3) = [b, c) + // range(4) = [c, End). + keyStart = tc.ScratchRange(t) + repl := store0.LookupReplica(roachpb.RKey(keyStart)) + keyEnd = repl.Desc().EndKey.AsRawKey() + keyA = keyStart.Next().Next() + keyB = keyA.Next().Next() + keyC = keyB.Next().Next() + keyD = keyC.Next().Next() + rangeIds[string(keyStart)] = repl.RangeID // Create three fully-caught-up, adjacent ranges on all three stores. - mtc.replicateRange(roachpb.RangeID(1), 1, 2) - for _, key := range []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")} { - if _, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(key)); pErr != nil { - t.Fatal(pErr) - } + tc.AddVotersOrFatal(t, keyStart, tc.Targets(1, 2)...) + for _, key := range []roachpb.Key{keyA, keyB, keyC} { + _, rhsDesc := tc.SplitRangeOrFatal(t, key) + rangeIds[string(key)] = rhsDesc.RangeID if _, pErr := kv.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(key, []int64{1, 1, 1}) + tc.WaitForValues(t, key, []int64{1, 1, 1}) } // Put some keys in [d, /Max) so the subsumed replica of [c, /Max) with range // ID 4 has tombstones. We will clear uncontained key range of subsumed // replicas, so when we are receiving a snapshot for [a, d), we expect to // clear the keys in [d, /Max). + key := keyD for i := 0; i < 10; i++ { - key := roachpb.Key("d" + strconv.Itoa(i)) + key = key.Next() if _, pErr := kv.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(key, []int64{1, 1, 1}) + tc.WaitForValues(t, key, []int64{1, 1, 1}) } - aRepl0 := store0.LookupReplica(roachpb.RKey("a")) + aRepl0 := store0.LookupReplica(roachpb.RKey(keyA)) // Start dropping all Raft traffic to the first range on store2. - mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ rangeID: aRepl0.RangeID, RaftMessageHandler: store2, + unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ + dropReq: func(request *kvserver.RaftMessageRequest) bool { + return true + }, + }, }) // Merge [a, b) into [b, c), then [a, c) into [c, /Max). for i := 0; i < 2; i++ { - if _, pErr := kv.SendWrapped(ctx, distSender, adminMergeArgs(roachpb.Key("a"))); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, distSender, adminMergeArgs(keyA)); pErr != nil { t.Fatal(pErr) } } // Split [a, /Max) into [a, d) and [d, /Max). This means the Raft snapshot // will span both a merge and a split. 
- if _, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("d"))); pErr != nil { + if _, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(keyD)); pErr != nil { t.Fatal(pErr) } // Truncate the logs of the LHS. index := func() uint64 { - repl := store0.LookupReplica(roachpb.RKey("a")) + repl := store0.LookupReplica(roachpb.RKey(keyA)) index, err := repl.GetLastIndex() if err != nil { t.Fatal(err) @@ -3271,11 +3326,11 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // Truncate the log at index+1 (log entries < N are removed, so this // includes the merge). truncArgs := &roachpb.TruncateLogRequest{ - RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}, + RequestHeader: roachpb.RequestHeader{Key: keyA}, Index: index, RangeID: repl.RangeID, } - if _, err := kv.SendWrapped(ctx, mtc.distSenders[0], truncArgs); err != nil { + if _, err := kv.SendWrapped(ctx, distSender, truncArgs); err != nil { t.Fatal(err) } return index @@ -3285,7 +3340,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // Restore Raft traffic to the LHS on store2. log.Infof(ctx, "restored traffic to store 2") - mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ rangeID: aRepl0.RangeID, RaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ @@ -3313,32 +3368,28 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { if afterRaftSnaps <= beforeRaftSnaps { return errors.New("expected store2 to apply at least 1 additional raft snapshot") } - - // Verify that the sets of keys in store0 and store2 are identical. - storeKeys0 := getEngineKeySet(t, store0.Engine()) - storeKeys2 := getEngineKeySet(t, store2.Engine()) - dRepl0 := store0.LookupReplica(roachpb.RKey("d")) - ignoreKey := func(k string) bool { - // Unreplicated keys for the remaining ranges are allowed to differ. - for _, id := range []roachpb.RangeID{1, aRepl0.RangeID, dRepl0.RangeID} { - if strings.HasPrefix(k, string(keys.MakeRangeIDUnreplicatedPrefix(id))) { - return true - } + // We only look at the range of keys the test has been manipulating. + getKeySet := func(engine storage.Engine) map[string]struct{} { + kvs, err := storage.Scan(engine, keyStart, keyEnd, 0 /* max */) + if err != nil { + t.Fatal(err) } - return false + out := map[string]struct{}{} + for _, kv := range kvs { + out[string(kv.Key.Key)] = struct{}{} + } + return out } + + // Verify that the sets of keys in store0 and store2 are identical. + storeKeys0 := getKeySet(store0.Engine()) + storeKeys2 := getKeySet(store2.Engine()) for k := range storeKeys0 { - if ignoreKey(k) { - continue - } if _, ok := storeKeys2[k]; !ok { return fmt.Errorf("store2 missing key %s", roachpb.Key(k)) } } for k := range storeKeys2 { - if ignoreKey(k) { - continue - } if _, ok := storeKeys0[k]; !ok { return fmt.Errorf("store2 has extra key %s", roachpb.Key(k)) } @@ -3355,21 +3406,15 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.DisableSplitQueue = true - storeCfg.TestingKnobs.DisableMergeQueue = true - storeCfg.TestingKnobs.DisableReplicateQueue = true - storeCfg.Clock = nil // manual clock - // Install a filter that triggers a shutdown when stop is non-zero and the // rhsDesc requests a new lease. 
- var mtc *multiTestContext + var s *server.TestServer var state struct { syncutil.Mutex rhsDesc *roachpb.RangeDescriptor stop, stopping bool } - storeCfg.TestingKnobs.TestingPostApplyFilter = func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) { + testingPostApplyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) { state.Lock() if state.stop && !state.stopping && args.RangeID == state.rhsDesc.RangeID && args.IsLeaseRequest { // Shut down the store. The lease acquisition will notice that a merge is @@ -3379,7 +3424,7 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { // incorrectly (#27552). state.stopping = true state.Unlock() - go mtc.Stop() + go s.Stopper().Stop(ctx) // Sleep to give the shutdown time to propagate. The test appeared to work // without this sleep, but best to be somewhat robust to different // goroutine schedules. @@ -3390,22 +3435,28 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { return 0, nil } - mtc = &multiTestContext{ - storeConfig: &storeCfg, - // This test was written before the multiTestContext started creating many - // system ranges at startup, and hasn't been update to take that into - // account. - startWithSingleRange: true, - } - mtc.Start(t, 1) - defer mtc.Stop() - store := mtc.Store(0) - stopper := mtc.engineStoppers[0] + manualClock := hlc.NewHybridManualClock() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + DisableReplicateQueue: true, + TestingPostApplyFilter: testingPostApplyFilter, + }, + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }) + s = serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) - _, rhsDesc, err := createSplitRanges(ctx, store) - if err != nil { - t.Fatal(err) - } + key, err := s.ScratchRangeWithExpirationLease() + require.NoError(t, err) + rhsDesc := store.LookupReplica(roachpb.RKey(key)).Desc() state.Lock() state.rhsDesc = rhsDesc state.Unlock() @@ -3427,12 +3478,12 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { state.Unlock() // Expire all leases. - mtc.advanceClock(ctx) + manualClock.Increment(store.GetStoreConfig().LeaseExpiration()) // Send a dummy get request on the RHS to force a lease acquisition. We expect // this to fail, as quiescing stores cannot acquire leases. 
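+	// The Get is wrapped in a stopper task so that it fails cleanly if the
+	// shutdown triggered by the filter above has already quiesced the stopper.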
-	err = stopper.RunTaskWithErr(ctx, "test-get-rhs-key", func(ctx context.Context) error {
-		_, err := store.DB().Get(ctx, "dummy-rhs-key")
+	err = s.Stopper().RunTaskWithErr(ctx, "test-get-rhs-key", func(ctx context.Context) error {
+		_, err := store.DB().Get(ctx, key.Next())
 		return err
 	})
 	if exp := "not lease holder"; !testutils.IsError(err, exp) {
@@ -3444,7 +3495,7 @@ func verifyMerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey
 	t.Helper()
 	repl := store.LookupReplica(rhsStartKey)
 	if !repl.Desc().StartKey.Equal(lhsStartKey) {
-		t.Fatalf("ranges unexpectedly unmerged")
+		t.Fatalf("ranges unexpectedly unmerged; expected startKey %s, but got %s", lhsStartKey, repl.Desc().StartKey)
 	}
 }
 
@@ -3461,29 +3512,36 @@ func TestMergeQueue(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	manualClock := hlc.NewManualClock(123)
-	clock := hlc.NewClock(manualClock.UnixNano, time.Nanosecond)
-	storeCfg := kvserver.TestStoreConfig(nil)
-	storeCfg.TestingKnobs.DisableSplitQueue = true
-	storeCfg.TestingKnobs.DisableReplicateQueue = true
-	storeCfg.TestingKnobs.DisableScanner = true
+	manualClock := hlc.NewHybridManualClock()
+	zoneConfig := zonepb.DefaultZoneConfig()
 	rangeMinBytes := int64(1 << 10) // 1KB
-	storeCfg.DefaultZoneConfig.RangeMinBytes = &rangeMinBytes
-	sv := &storeCfg.Settings.SV
+	zoneConfig.RangeMinBytes = &rangeMinBytes
+	settings := cluster.MakeTestingClusterSettings()
+	sv := &settings.SV
 	kvserverbase.MergeQueueEnabled.Override(sv, true)
 	kvserver.MergeQueueInterval.Override(sv, 0) // process greedily
-	var mtc multiTestContext
-	// This test was written before the multiTestContext started creating many
-	// system ranges at startup, and hasn't been update to take that into account.
-	mtc.startWithSingleRange = true
-
-	mtc.storeConfig = &storeCfg
-	// Inject clock for manipulation in tests.
-	mtc.storeConfig.Clock = clock
-	mtc.Start(t, 2)
-	defer mtc.Stop()
-	mtc.initGossipNetwork() // needed for the non-collocated case's rebalancing to work
-	store := mtc.Store(0)
+
+	tc := testcluster.StartTestCluster(t, 2,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				Settings: settings,
+				Knobs: base.TestingKnobs{
+					Server: &server.TestingKnobs{
+						ClockSource:               manualClock.UnixNano,
+						DefaultZoneConfigOverride: &zoneConfig,
+					},
+					Store: &kvserver.StoreTestingKnobs{
+						DisableScanner: true,
+					},
+				},
+			},
+		})
+	defer tc.Stopper().Stop(ctx)
+	store := tc.GetFirstStoreFromServer(t, 0)
+	// The cluster with manual replication disables the merge queue,
+	// so we need to re-enable it.
+	kvserverbase.MergeQueueEnabled.Override(sv, true)
 	store.SetMergeQueueActive(true)
 
 	split := func(t *testing.T, key roachpb.Key, expirationTime hlc.Timestamp) {
@@ -3502,11 +3560,13 @@ func TestMergeQueue(t *testing.T) {
 			t.Fatal(pErr)
 		}
 	}
+	rng, _ := randutil.NewPseudoRand()
+	randBytes := randutil.RandBytes(rng, int(*zoneConfig.RangeMinBytes))
+
+	lhsStartKey := roachpb.RKey(tc.ScratchRange(t))
+	rhsStartKey := lhsStartKey.Next().Next()
+	rhsEndKey := rhsStartKey.Next().Next()
 
-	// Create two empty ranges, a - b and b - c, by splitting at a, b, and c.
- lhsStartKey := roachpb.RKey("a") - rhsStartKey := roachpb.RKey("b") - rhsEndKey := roachpb.RKey("c") for _, k := range []roachpb.RKey{lhsStartKey, rhsStartKey, rhsEndKey} { split(t, k.AsRawKey(), hlc.Timestamp{} /* expirationTime */) } @@ -3520,9 +3580,6 @@ func TestMergeQueue(t *testing.T) { rhs().SetZoneConfig(&zone) } - rng, _ := randutil.NewPseudoRand() - randBytes := randutil.RandBytes(rng, int(*storeCfg.DefaultZoneConfig.RangeMinBytes)) - reset := func(t *testing.T) { t.Helper() clearRange(t, lhsStartKey, rhsEndKey) @@ -3531,9 +3588,9 @@ func TestMergeQueue(t *testing.T) { t.Fatal(err) } } - setZones(*storeCfg.DefaultZoneConfig) + setZones(zoneConfig) store.MustForceMergeScanAndProcess() // drain any merges that might already be queued - split(t, roachpb.Key("b"), hlc.Timestamp{} /* expirationTime */) + split(t, rhsStartKey.AsRawKey(), hlc.Timestamp{} /* expirationTime */) } t.Run("sanity", func(t *testing.T) { @@ -3555,7 +3612,7 @@ func TestMergeQueue(t *testing.T) { t.Run("lhs-undersize", func(t *testing.T) { reset(t) - zone := protoutil.Clone(storeCfg.DefaultZoneConfig).(*zonepb.ZoneConfig) + zone := protoutil.Clone(&zoneConfig).(*zonepb.ZoneConfig) *zone.RangeMinBytes *= 2 lhs().SetZoneConfig(zone) store.MustForceMergeScanAndProcess() @@ -3567,9 +3624,9 @@ func TestMergeQueue(t *testing.T) { // The ranges are individually beneath the minimum size threshold, but // together they'll exceed the maximum size threshold. - zone := protoutil.Clone(storeCfg.DefaultZoneConfig).(*zonepb.ZoneConfig) - zone.RangeMinBytes = proto.Int64(lhs().GetMVCCStats().Total() + 1) - zone.RangeMaxBytes = proto.Int64(lhs().GetMVCCStats().Total()*2 - 1) + zone := protoutil.Clone(&zoneConfig).(*zonepb.ZoneConfig) + zone.RangeMinBytes = proto.Int64(rhs().GetMVCCStats().Total() + 1) + zone.RangeMaxBytes = proto.Int64(lhs().GetMVCCStats().Total() + rhs().GetMVCCStats().Total() - 1) setZones(*zone) store.MustForceMergeScanAndProcess() verifyUnmerged(t, store, lhsStartKey, rhsStartKey) @@ -3577,18 +3634,29 @@ func TestMergeQueue(t *testing.T) { // Once the maximum size threshold is increased, the merge can occur. zone.RangeMaxBytes = proto.Int64(*zone.RangeMaxBytes + 1) setZones(*zone) + l := lhs().RangeID + r := rhs().RangeID + log.Infof(ctx, "Left=%s, Right=%s", l, r) store.MustForceMergeScanAndProcess() verifyMerged(t, store, lhsStartKey, rhsStartKey) }) t.Run("non-collocated", func(t *testing.T) { reset(t) + rangeID := rhs().RangeID verifyUnmerged(t, store, lhsStartKey, rhsStartKey) - rhsRangeID := rhs().RangeID - mtc.replicateRange(rhsRangeID, 1) - mtc.transferLease(ctx, rhsRangeID, 0, 1) - mtc.unreplicateRange(rhsRangeID, 0) - require.NoError(t, mtc.waitForUnreplicated(rhsRangeID, 0)) + tc.AddVotersOrFatal(t, rhs().Desc().StartKey.AsRawKey(), tc.Target(1)) + tc.TransferRangeLeaseOrFatal(t, *rhs().Desc(), tc.Target(1)) + tc.RemoveVotersOrFatal(t, rhs().Desc().StartKey.AsRawKey(), tc.Target(0)) + testutils.SucceedsSoon(t, func() error { + _, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeID) + if err == nil { + return fmt.Errorf("replica still exists on dest %d", tc.Target(0)) + } else if errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)) { + return nil + } + return err + }) clearRange(t, lhsStartKey, rhsEndKey) store.MustForceMergeScanAndProcess() @@ -3629,18 +3697,18 @@ func TestMergeQueue(t *testing.T) { verifyUnmerged(t, store, lhsStartKey, rhsStartKey) // Perform manual merge and verify that no merge occurred. 
- split(t, rhsStartKey.AsRawKey(), clock.Now().Add(manualSplitTTL.Nanoseconds(), 0) /* expirationTime */) + split(t, rhsStartKey.AsRawKey(), tc.Servers[0].Clock().Now().Add(manualSplitTTL.Nanoseconds(), 0) /* expirationTime */) clearRange(t, lhsStartKey, rhsEndKey) store.MustForceMergeScanAndProcess() verifyUnmerged(t, store, lhsStartKey, rhsStartKey) // Sticky bit is not expired yet. - manualClock.Set(manualSplitTTL.Nanoseconds()) + manualClock.Increment(manualSplitTTL.Nanoseconds() / 2) store.MustForceMergeScanAndProcess() verifyUnmerged(t, store, lhsStartKey, rhsStartKey) // Sticky bit is expired. - manualClock.Set(manualSplitTTL.Nanoseconds() * 2) + manualClock.Increment(manualSplitTTL.Nanoseconds()) store.MustForceMergeScanAndProcess() verifyMerged(t, store, lhsStartKey, rhsStartKey) }) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 75ac8933827c..b758412e4905 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3432,7 +3432,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { // 2. Wait for all nodes to process the increment (and therefore the // new lease). - tc.WaitForValues(t, key, []int64{value, value, value}) + tc.WaitForValues(t, key, []int64{0, value, value, value}) // 3. Wait for the lease holder to obtain raft leadership too. testutils.SucceedsSoon(t, func() error { diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index c3be633b487d..85ff57b40213 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -1498,24 +1498,48 @@ func TestStoreRangeSystemSplits(t *testing.T) { // // See https://github.com/cockroachdb/cockroach/issues/1644. func runSetupSplitSnapshotRace( - t *testing.T, testFn func(*multiTestContext, roachpb.Key, roachpb.Key), + t *testing.T, + stickyEnginesRegistry server.StickyInMemEnginesRegistry, + testFn func(*testcluster.TestCluster, roachpb.Key, roachpb.Key), ) { - sc := kvserver.TestStoreConfig(nil) - // We'll control replication by hand. - sc.TestingKnobs.DisableReplicateQueue = true - // Async intent resolution can sometimes lead to hangs when we stop - // most of the stores at the end of this function. - sc.TestingKnobs.IntentResolverKnobs.DisableAsyncIntentResolution = true - // Avoid fighting with the merge queue while trying to reproduce this race. - sc.TestingKnobs.DisableMergeQueue = true - sc.TestingKnobs.DisableGCQueue = true - // Disable the split delay mechanism, or it'll spend 10s going in circles. - // (We can't set it to zero as otherwise the default overrides us). - sc.RaftDelaySplitToSuppressSnapshotTicks = -1 - sc.Clock = nil // manual clock - mtc := &multiTestContext{storeConfig: &sc} - defer mtc.Stop() - mtc.Start(t, 6) + const numServers int = 6 + stickyServerArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEnginesRegistry, + }, + Store: &kvserver.StoreTestingKnobs{ + DisableGCQueue: true, + // Async intent resolution can sometimes lead to hangs when we stop + // most of the stores at the end of this function. 
+ IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{ + DisableAsyncIntentResolution: true, + }, + }, + }, + RaftConfig: base.RaftConfig{ + // Disable the split delay mechanism, or it'll spend 10s going in circles. + // (We can't set it to zero as otherwise the default overrides us). + RaftDelaySplitToSuppressSnapshotTicks: -1, + }, + } + } + ctx := context.Background() + tc := testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + store := tc.GetFirstStoreFromServer(t, 0) leftKey := roachpb.Key("a") rightKey := roachpb.Key("z") @@ -1523,62 +1547,59 @@ func runSetupSplitSnapshotRace( // First, do a couple of writes; we'll use these to determine when // the dust has settled. incArgs := incrementArgs(leftKey, 1) - if _, pErr := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); pErr != nil { t.Fatal(pErr) } incArgs = incrementArgs(rightKey, 2) - if _, pErr := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), incArgs); pErr != nil { t.Fatal(pErr) } // Split the system range from the rest of the keyspace. splitArgs := adminSplitArgs(keys.SystemMax) - if _, pErr := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), splitArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), splitArgs); pErr != nil { t.Fatal(pErr) } - // Get the left range's ID. This is currently 2, but using - // LookupReplica is more future-proof (and see below for - // rightRangeID). - leftRangeID := mtc.stores[0].LookupReplica(roachpb.RKey("a")).RangeID + // Get the left range replica. + lhsRepl := store.LookupReplica(roachpb.RKey("a")) // Replicate the left range onto nodes 1-3 and remove it from node 0. We have - // to transfer the lease before unreplicating from range 0 because it isn't + // to transfer the lease before unreplicating from store 0 because it isn't // safe (or allowed) for a leaseholder to remove itself from a cluster // without first giving up its lease. - mtc.replicateRange(leftRangeID, 1, 2, 3) - mtc.transferLease(context.Background(), leftRangeID, 0, 1) - mtc.unreplicateRange(leftRangeID, 0) + desc := tc.AddVotersOrFatal(t, lhsRepl.Desc().StartKey.AsRawKey(), tc.Targets(1, 2, 3)...) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + tc.RemoveVotersOrFatal(t, lhsRepl.Desc().StartKey.AsRawKey(), tc.Target(0)) - mtc.waitForValues(leftKey, []int64{0, 1, 1, 1, 0, 0}) - mtc.waitForValues(rightKey, []int64{0, 2, 2, 2, 0, 0}) + tc.WaitForValues(t, leftKey, []int64{0, 1, 1, 1, 0, 0}) + tc.WaitForValues(t, rightKey, []int64{0, 2, 2, 2, 0, 0}) // Stop node 3 so it doesn't hear about the split. - mtc.stopStore(3) - mtc.advanceClock(context.Background()) + tc.StopServer(3) // Split the data range. splitArgs = adminSplitArgs(roachpb.Key("m")) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], splitArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), splitArgs); pErr != nil { t.Fatal(pErr) } // Get the right range's ID. Since the split was performed on node // 1, it is currently 11 and not 3 as might be expected. 
- var rightRangeID roachpb.RangeID + var rhsRepl *kvserver.Replica testutils.SucceedsSoon(t, func() error { - rightRangeID = mtc.stores[1].LookupReplica(roachpb.RKey("z")).RangeID - if rightRangeID == leftRangeID { + rhsRepl = tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey("z")) + if rhsRepl.RangeID == lhsRepl.RangeID { return errors.Errorf("store 1 hasn't processed split yet") } return nil }) // Relocate the right range onto nodes 3-5. - mtc.replicateRange(rightRangeID, 4, 5) - mtc.unreplicateRange(rightRangeID, 2) - mtc.transferLease(context.Background(), rightRangeID, 1, 4) - mtc.unreplicateRange(rightRangeID, 1) + tc.AddVotersOrFatal(t, rhsRepl.Desc().StartKey.AsRawKey(), tc.Targets(4, 5)...) + tc.RemoveVotersOrFatal(t, rhsRepl.Desc().StartKey.AsRawKey(), tc.Target(2)) + tc.TransferRangeLeaseOrFatal(t, *rhsRepl.Desc(), tc.Target(4)) + tc.RemoveVotersOrFatal(t, rhsRepl.Desc().StartKey.AsRawKey(), tc.Target(1)) // Perform another increment after all the replication changes. This // lets us ensure that all the replication changes have been @@ -1592,15 +1613,15 @@ func runSetupSplitSnapshotRace( // failure and render the range unable to achieve quorum after // restart (in the SnapshotWins branch). incArgs = incrementArgs(rightKey, 3) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } // Store 3 still has the old value, but 4 and 5 are up to date. - mtc.waitForValues(rightKey, []int64{0, 0, 0, 2, 5, 5}) + tc.WaitForValues(t, rightKey, []int64{0, 0, 0, 2, 5, 5}) // Scan the meta ranges to resolve all intents - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), &roachpb.ScanRequest{ RequestHeader: roachpb.RequestHeader{ Key: keys.MetaMin, @@ -1611,13 +1632,13 @@ func runSetupSplitSnapshotRace( } // Stop the remaining data stores. - mtc.stopStore(1) - mtc.stopStore(2) + tc.StopServer(1) + tc.StopServer(2) // 3 is already stopped. - mtc.stopStore(4) - mtc.stopStore(5) + tc.StopServer(4) + tc.StopServer(5) - testFn(mtc, leftKey, rightKey) + testFn(tc, leftKey, rightKey) } // TestSplitSnapshotRace_SplitWins exercises one outcome of the @@ -1627,29 +1648,33 @@ func runSetupSplitSnapshotRace( func TestSplitSnapshotRace_SplitWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - runSetupSplitSnapshotRace(t, func(mtc *multiTestContext, leftKey, rightKey roachpb.Key) { + + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + + runSetupSplitSnapshotRace(t, stickyEngineRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the left range up first so that the split happens before it sees a snapshot. for i := 1; i <= 3; i++ { - mtc.restartStore(i) + require.NoError(t, tc.RestartServer(i)) } // Perform a write on the left range and wait for it to propagate. incArgs := incrementArgs(leftKey, 10) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(leftKey, []int64{0, 11, 11, 11, 0, 0}) + tc.WaitForValues(t, leftKey, []int64{0, 11, 11, 11, 0, 0}) // Now wake the other stores up. 
- mtc.restartStore(4) - mtc.restartStore(5) + require.NoError(t, tc.RestartServer(4)) + require.NoError(t, tc.RestartServer(5)) // Write to the right range. incArgs = incrementArgs(rightKey, 20) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(rightKey, []int64{0, 0, 0, 25, 25, 25}) + tc.WaitForValues(t, rightKey, []int64{0, 0, 0, 25, 25, 25}) }) } @@ -1660,15 +1685,19 @@ func TestSplitSnapshotRace_SplitWins(t *testing.T) { func TestSplitSnapshotRace_SnapshotWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - runSetupSplitSnapshotRace(t, func(mtc *multiTestContext, leftKey, rightKey roachpb.Key) { + + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + + runSetupSplitSnapshotRace(t, stickyEngineRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the right range up first. for i := 3; i <= 5; i++ { - mtc.restartStore(i) + require.NoError(t, tc.RestartServer(i)) } // Perform a write on the right range. incArgs := incrementArgs(rightKey, 20) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } @@ -1681,27 +1710,27 @@ func TestSplitSnapshotRace_SnapshotWins(t *testing.T) { // for. There is a high probability that the message will have been // received by the time that nodes 4 and 5 have processed their // update. - mtc.waitForValues(rightKey, []int64{0, 0, 0, 2, 25, 25}) + tc.WaitForValues(t, rightKey, []int64{0, 0, 0, 2, 25, 25}) // Wake up the left-hand range. This will allow the left-hand // range's split to complete and unblock the right-hand range. - mtc.restartStore(1) - mtc.restartStore(2) + require.NoError(t, tc.RestartServer(1)) + require.NoError(t, tc.RestartServer(2)) // Perform writes on both sides. This is not strictly necessary but // it helps wake up dormant ranges that would otherwise have to wait // for retry timeouts. 
incArgs = incrementArgs(leftKey, 10) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(leftKey, []int64{0, 11, 11, 11, 0, 0}) + tc.WaitForValues(t, leftKey, []int64{0, 11, 11, 11, 0, 0}) incArgs = incrementArgs(rightKey, 200) - if _, pErr := kv.SendWrapped(context.Background(), mtc.distSenders[0], incArgs); pErr != nil { + if _, pErr := kv.SendWrapped(context.Background(), tc.Servers[0].DistSender(), incArgs); pErr != nil { t.Fatal(pErr) } - mtc.waitForValues(rightKey, []int64{0, 0, 0, 225, 225, 225}) + tc.WaitForValues(t, rightKey, []int64{0, 0, 0, 225, 225, 225}) }) } @@ -2566,24 +2595,36 @@ func TestUnsplittableRange(t *testing.T) { ctx := context.Background() ttl := 1 * time.Hour const maxBytes = 1 << 16 - - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - - manual := hlc.NewManualClock(123) - splitQueuePurgatoryChan := make(chan time.Time, 1) - cfg := kvserver.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond)) - cfg.DefaultZoneConfig.RangeMaxBytes = proto.Int64(maxBytes) - cfg.DefaultZoneConfig.GC = &zonepb.GCPolicy{ + manualClock := hlc.NewHybridManualClock() + zoneConfig := zonepb.DefaultZoneConfig() + zoneConfig.RangeMaxBytes = proto.Int64(maxBytes) + zoneConfig.GC = &zonepb.GCPolicy{ TTLSeconds: int32(ttl.Seconds()), } - cfg.DefaultSystemZoneConfig.RangeMaxBytes = proto.Int64(maxBytes) - cfg.DefaultSystemZoneConfig.GC = &zonepb.GCPolicy{ + zoneSystemConfig := zonepb.DefaultSystemZoneConfig() + zoneSystemConfig.RangeMaxBytes = proto.Int64(maxBytes) + zoneSystemConfig.GC = &zonepb.GCPolicy{ TTLSeconds: int32(ttl.Seconds()), } - cfg.TestingKnobs.SplitQueuePurgatoryChan = splitQueuePurgatoryChan - cfg.TestingKnobs.DisableMergeQueue = true - store := createTestStoreWithConfig(t, stopper, cfg) + splitQueuePurgatoryChan := make(chan time.Time, 1) + + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + SplitQueuePurgatoryChan: splitQueuePurgatoryChan, + }, + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + DefaultZoneConfigOverride: &zoneConfig, + DefaultSystemZoneConfigOverride: &zoneSystemConfig, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Add a single large row to /Table/14. tableKey := roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.UITableID)) @@ -2597,13 +2638,12 @@ func TestUnsplittableRange(t *testing.T) { // Wait for half of the ttl and add another large value in the same row. // Together, these two values bump the range over the max range size. - manual.Increment(ttl.Nanoseconds() / 2) + manualClock.Increment(ttl.Nanoseconds() / 2) value2Len := 0.2 * maxBytes value2 := bytes.Repeat([]byte("y"), int(value2Len)) if err := store.DB().Put(ctx, col1Key, value2); err != nil { t.Fatal(err) } - // Ensure that an attempt to split the range will hit an // unsplittableRangeError and place the range in purgatory. if err := store.ForceSplitScanAndProcess(); err != nil { @@ -2621,10 +2661,12 @@ func TestUnsplittableRange(t *testing.T) { } // Wait for much longer than the ttl to accumulate GCByteAge. 
- manual.Increment(10 * ttl.Nanoseconds()) + manualClock.Increment(10 * ttl.Nanoseconds()) // Trigger the GC queue, which should clean up the earlier version of the // row. Once the first version of the row is cleaned up, the range should - // exit the split queue purgatory. + // exit the split queue purgatory. We need to tickle the protected timestamp + // subsystem to release a timestamp at which we get to actually remove the data. + require.NoError(t, store.GetStoreConfig().ProtectedTimestampCache.Refresh(ctx, s.Clock().Now())) repl := store.LookupReplica(tableKey) if err := store.ManualGC(repl); err != nil { t.Fatal(err) @@ -2770,20 +2812,37 @@ func TestTxnWaitQueueDependencyCycleWithRangeSplit(t *testing.T) { func TestStoreCapacityAfterSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - manualClock := hlc.NewManualClock(123) - cfg := kvserver.TestStoreConfig(hlc.NewClock(manualClock.UnixNano, time.Nanosecond)) - cfg.TestingKnobs.DisableSplitQueue = true - cfg.TestingKnobs.DisableMergeQueue = true - s := createTestStoreWithOpts( - t, - testStoreOpts{ - // This test was written before the test stores were able to start with - // more than one range and is not prepared to handle many ranges. - dontCreateSystemRanges: true, - cfg: &cfg}, - stopper) + + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + // We conduct the test on the second server, because we can keep it clean + // and control exactly which ranges end up on it. + s := tc.GetFirstStoreFromServer(t, 1) + key := tc.ScratchRange(t) + desc := tc.AddVotersOrFatal(t, key, tc.Target(1)) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + testutils.SucceedsSoon(t, func() error { + repl, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID) + if err != nil { + return err + } + if !repl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) { + return errors.New("s2 does not own valid lease for this range") + } + return nil + }) cap, err := s.Capacity(context.Background(), false /* useCached */) if err != nil { @@ -2793,8 +2852,8 @@ func TestStoreCapacityAfterSplit(t *testing.T) { t.Errorf("expected cap.RangeCount=%d, got %d", e, a) } bpr1 := cap.BytesPerReplica - if bpr1.P10 <= 0 { - t.Errorf("expected all bytes-per-replica to be positive, got %+v", bpr1) + if bpr1.P10 != 0 { + t.Errorf("expected all bytes-per-replica to be 0, got %+v", bpr1) } if bpr1.P10 != bpr1.P25 || bpr1.P10 != bpr1.P50 || bpr1.P10 != bpr1.P75 || bpr1.P10 != bpr1.P90 { t.Errorf("expected all bytes-per-replica percentiles to be identical, got %+v", bpr1) @@ -2806,11 +2865,11 @@ func TestStoreCapacityAfterSplit(t *testing.T) { // Increment the manual clock and do a write to increase the qps above zero. 
manualClock.Increment(int64(kvserver.MinStatsDuration)) - key := roachpb.Key("a") - pArgs := putArgs(key, []byte("aaa")) + pArgs := incrementArgs(key, 10) if _, pErr := kv.SendWrapped(context.Background(), s.TestSender(), pArgs); pErr != nil { t.Fatal(pErr) } + tc.WaitForValues(t, key, []int64{10, 10}) cap, err = s.Capacity(context.Background(), false /* useCached */) if err != nil { @@ -2845,7 +2904,7 @@ func TestStoreCapacityAfterSplit(t *testing.T) { } // Split the range to verify stats work properly with more than one range. - sArgs := adminSplitArgs(key) + sArgs := adminSplitArgs(key.Next().Next()) if _, pErr := kv.SendWrapped(context.Background(), s.TestSender(), sArgs); pErr != nil { t.Fatal(pErr) } diff --git a/pkg/kv/kvserver/client_status_test.go b/pkg/kv/kvserver/client_status_test.go index de994011e8ee..00607a04fb34 100644 --- a/pkg/kv/kvserver/client_status_test.go +++ b/pkg/kv/kvserver/client_status_test.go @@ -14,38 +14,37 @@ import ( "context" "testing" - "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) func TestComputeStatsForKeySpan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) // Create a number of ranges using splits. splitKeys := []string{"a", "c", "e", "g", "i"} for _, k := range splitKeys { - key := roachpb.Key(k) - repl := mtc.stores[0].LookupReplica(roachpb.RKey(key)) - args := adminSplitArgs(key) - header := roachpb.Header{ - RangeID: repl.RangeID, - } - if _, err := kv.SendWrappedWith(context.Background(), mtc.stores[0], header, args); err != nil { - t.Fatal(err) - } + _, _, err := s.SplitRange(roachpb.Key(k)) + require.NoError(t, err) } // Wait for splits to finish. testutils.SucceedsSoon(t, func() error { - repl := mtc.stores[0].LookupReplica(roachpb.RKey("z")) + repl := store.LookupReplica(roachpb.RKey("z")) if actualRSpan := repl.Desc().RSpan(); !actualRSpan.Key.Equal(roachpb.RKey("i")) { return errors.Errorf("expected range %s to begin at key 'i'", repl) } @@ -55,7 +54,7 @@ func TestComputeStatsForKeySpan(t *testing.T) { // Create some keys across the ranges. 
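+	// "b", "bb", and "bbb" land in [a,c); "d" and "dd" in [c,e); "h" lands
+	// in [g,i). The per-span range and key counts asserted below follow
+	// from this placement.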
incKeys := []string{"b", "bb", "bbb", "d", "dd", "h"} for _, k := range incKeys { - if _, err := mtc.dbs[0].Inc(context.Background(), []byte(k), 5); err != nil { + if _, err := store.DB().Inc(context.Background(), []byte(k), 5); err != nil { t.Fatal(err) } } @@ -73,7 +72,7 @@ func TestComputeStatsForKeySpan(t *testing.T) { {"e", "i", 2, 1}, } { start, end := tcase.startKey, tcase.endKey - result, err := mtc.stores[0].ComputeStatsForKeySpan( + result, err := store.ComputeStatsForKeySpan( roachpb.RKey(start), roachpb.RKey(end)) if err != nil { t.Fatal(err) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 8d3331c332d1..224c87e00026 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -498,54 +498,6 @@ func (m *multiTestContext) Stop() { } } -// gossipStores forces each store to gossip its store descriptor and then -// blocks until all nodes have received these updated descriptors. -func (m *multiTestContext) gossipStores() { - timestamps := make(map[string]int64) - for i := 0; i < len(m.stores); i++ { - <-m.gossips[i].Connected - if err := m.stores[i].GossipStore(context.Background(), false /* useCached */); err != nil { - m.t.Fatal(err) - } - infoStatus := m.gossips[i].GetInfoStatus() - storeKey := gossip.MakeStoreKey(m.stores[i].Ident.StoreID) - timestamps[storeKey] = infoStatus.Infos[storeKey].OrigStamp - } - // Wait until all stores know about each other. - testutils.SucceedsSoon(m.t, func() error { - for i := 0; i < len(m.stores); i++ { - nodeID := m.stores[i].Ident.NodeID - infoStatus := m.gossips[i].GetInfoStatus() - for storeKey, timestamp := range timestamps { - info, ok := infoStatus.Infos[storeKey] - if !ok { - return errors.Errorf("node %d does not have a storeDesc for %s yet", nodeID, storeKey) - } - if info.OrigStamp < timestamp { - return errors.Errorf("node %d's storeDesc for %s is not up to date", nodeID, storeKey) - } - } - } - return nil - }) -} - -// initGossipNetwork gossips all store descriptors and waits until all -// storePools have received those descriptors. -func (m *multiTestContext) initGossipNetwork() { - m.gossipStores() - testutils.SucceedsSoon(m.t, func() error { - for i := 0; i < len(m.stores); i++ { - if _, alive, _ := m.storePools[i].GetStoreList(); alive != len(m.stores) { - return errors.Errorf("node %d's store pool only has %d alive stores, expected %d", - m.stores[i].Ident.NodeID, alive, len(m.stores)) - } - } - return nil - }) - log.Info(context.Background(), "gossip network initialized") -} - type multiTestContextKVTransport struct { mtc *multiTestContext idx int diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a71b4f0c981e..843a1456f331 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -147,7 +147,7 @@ func (s *Store) SetSplitQueueActive(active bool) { s.setSplitQueueActive(active) } -// SetMergeQueueActive enables or disables the split queue. +// SetMergeQueueActive enables or disables the merge queue. 
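+// Like SetSplitQueueActive above, this lets tests force merge-queue
+// processing deterministically.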
 func (s *Store) SetMergeQueueActive(active bool) {
 	s.setMergeQueueActive(active)
 }
diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go
index 25423b5e37ef..3137b2ad3412 100644
--- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go
+++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go
@@ -50,14 +50,13 @@ func New(cfg Config) (protectedts.Provider, error) {
 	}
 	storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor)
 	verifier := ptverifier.New(cfg.DB, storage)
-	cache := ptcache.New(ptcache.Config{
-		DB:       cfg.DB,
-		Storage:  storage,
-		Settings: cfg.Settings,
-	})
 	return &provider{
-		Storage:  storage,
-		Cache:    cache,
+		Storage: storage,
+		Cache: ptcache.New(ptcache.Config{
+			DB:       cfg.DB,
+			Storage:  storage,
+			Settings: cfg.Settings,
+		}),
 		Verifier: verifier,
 	}, nil
 }
@@ -76,5 +75,8 @@ func validateConfig(cfg Config) error {
 }
 
 func (p *provider) Start(ctx context.Context, stopper *stop.Stopper) error {
-	return p.Cache.(*ptcache.Cache).Start(ctx, stopper)
+	if cache, ok := p.Cache.(*ptcache.Cache); ok {
+		return cache.Start(ctx, stopper)
+	}
+	return nil
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 316d3d0943ee..56b154925dfb 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -486,7 +486,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		user security.SQLUsername) (cloud.ExternalStorage, error) {
 		return externalStorageBuilder.makeExternalStorageFromURI(ctx, uri, user)
 	}
-
 	protectedtsProvider, err := ptprovider.New(ptprovider.Config{
 		DB:               db,
 		InternalExecutor: internalExecutor,
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index 86929babea67..d4ec668657e9 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -1386,16 +1386,26 @@ func (ts *TestServer) ScratchRange() (roachpb.Key, error) {
 	return desc.StartKey.AsRawKey(), nil
 }
 
-// ScratchRangeWithExpirationLease is like ScratchRange but creates a range with
-// an expiration based lease.
+// ScratchRangeWithExpirationLease is like ScratchRangeWithExpirationLeaseEx but
+// returns only the start key of the RHS range instead of both descriptors from
+// the split.
 func (ts *TestServer) ScratchRangeWithExpirationLease() (roachpb.Key, error) {
-	scratchKey := roachpb.Key(bytes.Join([][]byte{keys.SystemPrefix,
-		roachpb.RKey("\x00aaa-testing")}, nil))
-	_, _, err := ts.SplitRange(scratchKey)
+	_, desc, err := ts.ScratchRangeWithExpirationLeaseEx()
 	if err != nil {
 		return nil, err
 	}
-	return scratchKey, nil
+	return desc.StartKey.AsRawKey(), nil
+}
+
+// ScratchRangeWithExpirationLeaseEx is like ScratchRange but creates a range
+// with an expiration-based lease and returns both descriptors resulting from
+// the split.
+func (ts *TestServer) ScratchRangeWithExpirationLeaseEx() (
+	roachpb.RangeDescriptor,
+	roachpb.RangeDescriptor,
+	error,
+) {
+	scratchKey := roachpb.Key(bytes.Join([][]byte{keys.SystemPrefix,
+		roachpb.RKey("\x00aaa-testing")}, nil))
+	return ts.SplitRange(scratchKey)
+}
 
 // MetricsRecorder periodically records node-level and store-level metrics.
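A minimal sketch of the two call patterns the refactored scratch-range helpers
support; ts stands for a *server.TestServer, error handling is elided, and the
variable names are illustrative:

	// Key-only variant: sufficient when the test never inspects the split.
	key, _ := ts.ScratchRangeWithExpirationLease()

	// Descriptor variant: both sides of the split, so a test can replicate
	// or transfer the lease on the right-hand range explicitly.
	lhsDesc, rhsDesc, _ := ts.ScratchRangeWithExpirationLeaseEx()
	_, _, _ = key, lhsDesc, rhsDesc // silence unused variables in this sketch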
diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel
index 6206d13a1873..a004bd8d5ef2 100644
--- a/pkg/testutils/testcluster/BUILD.bazel
+++ b/pkg/testutils/testcluster/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
         "//pkg/gossip",
         "//pkg/keys",
         "//pkg/kv/kvserver",
+        "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
         "//pkg/roachpb",
         "//pkg/rpc",
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index a9613d7635b0..71d87934ba1c 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -1065,8 +1066,8 @@ func (tc *TestCluster) ToggleReplicateQueues(active bool) {
 // from all configured engines, filling in zeros when the value is not
 // found.
 func (tc *TestCluster) readIntFromStores(key roachpb.Key) []int64 {
-	results := make([]int64, 0, len(tc.Servers))
-	for _, server := range tc.Servers {
+	results := make([]int64, len(tc.Servers))
+	for i, server := range tc.Servers {
 		err := server.Stores().VisitStores(func(s *kvserver.Store) error {
 			val, _, err := storage.MVCCGet(context.Background(), s.Engine(), key,
 				server.Clock().Now(), storage.MVCCGetOptions{})
@@ -1075,11 +1076,10 @@
 			} else if val == nil {
 				log.VEventf(context.Background(), 1, "store %d: missing key %s", s.StoreID(), key)
 			} else {
-				result, err := val.GetInt()
+				results[i], err = val.GetInt()
 				if err != nil {
 					log.Errorf(context.Background(), "store %d: error decoding %s from key %s: %+v",
 						s.StoreID(), val, key, err)
 				}
-				results = append(results, result)
 			}
 			return nil
 		})
@@ -1265,6 +1265,22 @@ func (tc *TestCluster) GetStatusClient(
 	return serverpb.NewStatusClient(cc)
 }
 
+// HeartbeatLiveness manually heartbeats the node liveness record of the
+// server at the given index, retrying a few times if the heartbeat loses
+// the race with a concurrent epoch increment.
+func (tc *TestCluster) HeartbeatLiveness(ctx context.Context, storeIdx int) error {
+	nl := tc.Servers[storeIdx].NodeLiveness().(*liveness.NodeLiveness)
+	l, ok := nl.Self()
+	if !ok {
+		return errors.New("liveness not found")
+	}
+	var err error
+	for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); {
+		if err = nl.Heartbeat(ctx, l); !errors.Is(err, liveness.ErrEpochIncremented) {
+			break
+		}
+	}
+	return err
+}
+
 type testClusterFactoryImpl struct{}
 
 // TestClusterFactory can be passed to serverutils.InitTestClusterFactory