diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index dfdcde67809d..b782f7c04201 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -16,7 +16,6 @@ import (
 	"math"
 	"runtime"
 	"strconv"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -909,15 +908,24 @@ func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
 }
 
 // This test replicates the behavior observed in
-// https://github.com/cockroachdb/cockroach/issues/62485. We verify that
-// when a dc with the leaseholder is lost, a node in a dc that does not have the
-// lease preference can steal the lease, upreplicate the range and then give up the
-// lease in a single cycle of the replicate_queue.
+// https://github.com/cockroachdb/cockroach/issues/62485. We verify that when a
+// dc with the leaseholder is lost, a node in a dc that does not have the lease
+// preference can steal the lease, upreplicate the range, and then give up the
+// lease in a short period of time. Previously, the replicate queue would
+// reprocess, rather than requeue, replicas. This behavior changed in #85219 to
+// prevent queue priority inversion. Consequently, this test only asserts that
+// the lease preferences are satisfied quickly, rather than in a single
+// replicate queue process() call.
 func TestLeasePreferencesDuringOutage(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	skip.WithIssue(t, 88769, "flaky test")
 	defer log.Scope(t).Close(t)
+	// This is a hefty test, so we skip it under short.
+	skip.UnderShort(t)
+	// The test has 5 nodes. It's possible under stress-race for nodes to be
+	// starved out heartbeating their liveness.
+	skip.UnderStressRace(t)
+
 	stickyRegistry := server.NewStickyVFSRegistry()
 	ctx := context.Background()
 	manualClock := hlc.NewHybridManualClock()
@@ -947,15 +955,34 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
 		locality("us", "mi"),
 		locality("us", "mi"),
 	}
-	// Disable expiration based lease transfers. It is possible that a (pseudo)
-	// dead node acquires the lease and we are forced to wait out the expiration
-	// timer, if this were not set.
+
+	// This test disables the replicate queue. We wish to enable the replicate
+	// queue only for the range we are testing, after marking some servers as
+	// dead. We also wait until the expected live stores are considered live
+	// from n1; if we didn't do this, it would be possible for the range to be
+	// processed and seen as unavailable due to manual clock jumps.
+	var testRangeID int64
+	var clockJumpMu syncutil.Mutex
+	atomic.StoreInt64(&testRangeID, -1)
+	disabledQueueBypassFn := func(rangeID roachpb.RangeID) bool {
+		if rangeID == roachpb.RangeID(atomic.LoadInt64(&testRangeID)) {
+			clockJumpMu.Lock()
+			defer clockJumpMu.Unlock()
+			return true
+		}
+		return false
+	}
 	settings := cluster.MakeTestingClusterSettings()
 	sv := &settings.SV
-	kvserver.TransferExpirationLeasesFirstEnabled.Override(ctx, sv, false)
-	kvserver.ExpirationLeasesOnly.Override(ctx, sv, false)
+	// The remaining live stores (n1, n4, n5) may become suspect due to manual
+	// clock jumps. Disable the suspect timer to prevent them from becoming
+	// suspect when we bump the clocks.
+	liveness.TimeAfterNodeSuspect.Override(ctx, sv, 0)
+	timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(sv)
+
 	for i := 0; i < numNodes; i++ {
 		serverArgs[i] = base.TestServerArgs{
+			Settings: settings,
 			Locality: localities[i],
 			Knobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
@@ -967,6 +994,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
 					// The Raft leadership may not end up on the eu node, but it needs to
 					// be able to acquire the lease anyway.
 					AllowLeaseRequestProposalsWhenNotLeader: true,
+					BaseQueueDisabledBypassFilter:           disabledQueueBypassFn,
 				},
 			},
 			StoreSpecs: []base.StoreSpec{
@@ -992,105 +1020,106 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
 	require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 3)...))
 	tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(1))
 
-	// Shutdown the sf datacenter, which is going to kill the node with the lease.
-	tc.StopServer(1)
-	tc.StopServer(2)
-
-	wait := func(duration time.Duration) {
-		manualClock.Increment(duration.Nanoseconds())
-		// Gossip and heartbeat all the live stores, we do this manually otherwise the
-		// allocator on server 0 may see everyone as temporarily dead due to the
-		// clock move above.
-		for _, i := range []int{0, 3, 4} {
-			require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
-			require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
+	func() {
+		// Lock clockJumpMu to prevent processing the test range before the
+		// intended stores are considered live from n1. If we didn't do this, it
+		// would be possible for n1 to process the test range and find it
+		// unavailable (unactionable).
+		clockJumpMu.Lock()
+		defer clockJumpMu.Unlock()
+
+		// Enable queue processing of the test range right before we stop the sf
+		// datacenter. We expect the test range to be enqueued into the replicate
+		// queue shortly after.
+		rangeID := repl.GetRangeID()
+		atomic.StoreInt64(&testRangeID, int64(rangeID))
+
+		// Shutdown the sf datacenter, which is going to kill the node with the lease.
+		tc.StopServer(1)
+		tc.StopServer(2)
+
+		wait := func(duration time.Duration) {
+			manualClock.Increment(duration.Nanoseconds())
+			// Gossip and heartbeat all the live stores, we do this manually otherwise the
+			// allocator on server 0 may see everyone as temporarily dead due to the
+			// clock move above.
+			for _, i := range []int{0, 3, 4} {
+				require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
+				require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
+			}
 		}
-	}
-	// We need to wait until 2 and 3 are considered to be dead.
-	timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(&tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().Settings.SV)
-	wait(timeUntilNodeDead)
-
-	checkDead := func(store *kvserver.Store, storeIdx int) error {
-		if dead, timetoDie, err := store.GetStoreConfig().StorePool.IsDead(
-			tc.GetFirstStoreFromServer(t, storeIdx).StoreID()); err != nil || !dead {
-			// Sometimes a gossip update arrives right after server shutdown and
-			// after we manually moved the time, so move it again.
-			if err == nil {
-				wait(timetoDie)
+		wait(timeUntilNodeDead)
+
+		checkDead := func(store *kvserver.Store, storeIdx int) error {
+			if dead, timetoDie, err := store.GetStoreConfig().StorePool.IsDead(
+				tc.GetFirstStoreFromServer(t, storeIdx).StoreID()); err != nil || !dead {
+				// Sometimes a gossip update arrives right after server shutdown and
+				// after we manually moved the time, so move it again.
+				if err == nil {
+					wait(timetoDie)
+				}
+				// NB: errors.Wrapf(nil, ...) returns nil.
+				// nolint:errwrap
+				return errors.Errorf("expected server %d to be dead, instead err=%v, dead=%v", storeIdx, err, dead)
 			}
-			// NB: errors.Wrapf(nil, ...) returns nil.
-			// nolint:errwrap
-			return errors.Errorf("expected server %d to be dead, instead err=%v, dead=%v", storeIdx, err, dead)
+			return nil
 		}
-		return nil
-	}
+
+		testutils.SucceedsSoon(t, func() error {
+			store := tc.GetFirstStoreFromServer(t, 0)
+			sl, available, _ := store.GetStoreConfig().StorePool.TestingGetStoreList()
+			if available != 3 {
+				return errors.Errorf(
+					"expected all 3 remaining stores to be live, but only got %d, stores=%v",
+					available, sl)
+			}
+			if err := checkDead(store, 1); err != nil {
+				return err
+			}
+			if err := checkDead(store, 2); err != nil {
+				return err
+			}
+			return nil
+		})
+	}()
+
+	// Send a request to force lease acquisition on _some_ remaining live node.
+	// Note: we expect this to be n1 (server 0).
+	ba := &kvpb.BatchRequest{}
+	ba.Add(getArgs(key))
+	_, pErr := tc.Servers[0].DistSenderI().(kv.Sender).Send(ctx, ba)
+	require.Nil(t, pErr)
 
 	testutils.SucceedsSoon(t, func() error {
-		store := tc.GetFirstStoreFromServer(t, 0)
-		sl, _, _ := store.GetStoreConfig().StorePool.TestingGetStoreList()
-		if len(sl.TestingStores()) != 3 {
-			return errors.Errorf("expected all 3 remaining stores to be live, but only got %v",
-				sl.TestingStores())
-		}
-		if err := checkDead(store, 1); err != nil {
-			return err
+		// Validate that we upreplicated outside of SF. NB: This will occur prior
+		// to the lease preference being satisfied.
+		require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
+		for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
+			serv, err := tc.FindMemberServer(replDesc.StoreID)
+			require.NoError(t, err)
+			servLocality := serv.Locality()
+			dc, ok := servLocality.Find("dc")
+			require.True(t, ok)
+			if dc == "sf" {
+				return errors.Errorf(
+					"expected no replicas in dc=sf, but found replica in "+
+						"dc=%s node_id=%v desc=%v",
+					dc, replDesc.NodeID, repl.Desc())
+			}
 		}
-		if err := checkDead(store, 2); err != nil {
-			return err
+		// Validate that the lease also transferred to a preferred locality. n4
+		// (us) and n5 (us) are the only valid leaseholders during the outage.
+		// n1 acquired the lease after the outage; expect it to no longer be the
+		// leaseholder.
+		if !repl.OwnsValidLease(ctx, tc.Servers[0].Clock().NowAsClockTimestamp()) {
+			return nil
 		}
-		return nil
-	})
-	_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
-		Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
-	require.NoError(t, enqueueError, "failed to enqueue replica for replication")
-
-	var newLeaseHolder roachpb.ReplicationTarget
-	testutils.SucceedsSoon(t, func() error {
-		var err error
-		newLeaseHolder, err = tc.FindRangeLeaseHolder(*repl.Desc(), nil)
-		return err
+		return errors.Errorf(
+			"expected the leaseholder to be in region=us, but found %v",
+			repl.CurrentLeaseStatus(ctx),
+		)
 	})
-
-	srv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
-	require.NoError(t, err)
-	loc := srv.Locality()
-	region, ok := loc.Find("region")
-	require.True(t, ok)
-	require.Equal(t, "us", region)
-	require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
-	// Validate that we upreplicated outside of SF.
-	for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
-		serv, err := tc.FindMemberServer(replDesc.StoreID)
-		require.NoError(t, err)
-		memberLoc := serv.Locality()
-		dc, ok := memberLoc.Find("dc")
-		require.True(t, ok)
-		require.NotEqual(t, "sf", dc)
-	}
-	history := repl.GetLeaseHistory()
-	// Make sure we see the eu node as a lease holder in the second to last
-	// leaseholder change.
-	// Since we can have expiration and epoch based leases at the tail of the
-	// history, we need to ignore them together if they originate from the same
-	// leaseholder.
-	nextNodeID := history[len(history)-1].Replica.NodeID
-	lastMove := len(history) - 2
-	for ; lastMove >= 0; lastMove-- {
-		if history[lastMove].Replica.NodeID != nextNodeID {
-			break
-		}
-	}
-	lastMove++
-	var leasesMsg []string
-	for _, h := range history {
-		leasesMsg = append(leasesMsg, h.String())
-	}
-	leaseHistory := strings.Join(leasesMsg, ", ")
-	require.Greater(t, lastMove, 0,
-		"must have at least one leaseholder change in history (lease history: %s)", leaseHistory)
-	require.Equal(t, tc.Target(0).NodeID, history[lastMove-1].Replica.NodeID,
-		"node id prior to last lease move (lease history: %s)", leaseHistory)
 }
 
 // This test verifies that when a node starts flapping its liveness, all leases
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index c3ba4a1fe672..5c7234024ae9 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -655,13 +655,24 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
 	}
 
 	bq.mu.Lock()
-	stopped := bq.mu.stopped || bq.mu.disabled
+	stopped := bq.mu.stopped
+	disabled := bq.mu.disabled
 	bq.mu.Unlock()
 
 	if stopped {
 		return
 	}
 
+	if disabled {
+		// The disabled queue bypass is used in tests that enable manual
+		// replication but still require specific range(s) to be processed
+		// through the queue.
+		bypassDisabled := bq.store.TestingKnobs().BaseQueueDisabledBypassFilter
+		if bypassDisabled == nil || !bypassDisabled(repl.GetRangeID()) {
+			return
+		}
+	}
+
 	if !repl.IsInitialized() {
 		return
 	}
@@ -729,10 +740,16 @@ func (bq *baseQueue) addInternal(
 	}
 
 	if bq.mu.disabled {
-		if log.V(3) {
-			log.Infof(ctx, "queue disabled")
+		// The disabled queue bypass is used in tests that enable manual
+		// replication but still require specific range(s) to be processed
+		// through the queue.
+		bypassDisabled := bq.store.TestingKnobs().BaseQueueDisabledBypassFilter
+		if bypassDisabled == nil || !bypassDisabled(desc.RangeID) {
+			if log.V(3) {
+				log.Infof(ctx, "queue disabled")
+			}
+			return false, errQueueDisabled
 		}
-		return false, errQueueDisabled
 	}
 
 	// If the replica is currently in purgatory, don't re-add it.
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index bcc9c98210ab..e735de1f61a0 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -493,6 +493,10 @@ type StoreTestingKnobs struct {
 	// required.
 	BaseQueueInterceptor func(ctx context.Context, bq *baseQueue)
 
+	// BaseQueueDisabledBypassFilter checks whether the replica for the given
+	// rangeID should ignore the queue being disabled and be processed anyway.
+	BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool
+
 	// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndex.
 	// If nil is returned, reproposal will be attempted.
 	InjectReproposalError func(p *ProposalData) error
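
Usage sketch for the new knob (illustrative, not part of the patch): a test that disables queues via manual replication can gate a single range through them with an atomic RangeID gate, the same pattern TestLeasePreferencesDuringOutage uses above. The helper exampleBypassKnobs and the gate variable below are hypothetical names; base.TestingKnobs, kvserver.StoreTestingKnobs, and roachpb.RangeID are the real types this diff touches.

package kvserver_test

import (
	"sync/atomic"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// exampleBypassKnobs returns testing knobs whose BaseQueueDisabledBypassFilter
// admits exactly one range: the one whose ID is stored in the returned gate.
// A gate value of -1 matches no range, so every queue stays fully disabled
// until the test publishes a real range ID via atomic.StoreInt64.
func exampleBypassKnobs() (base.TestingKnobs, *int64) {
	gate := new(int64)
	atomic.StoreInt64(gate, -1)
	return base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
				// Consulted by maybeAdd/addInternal when the queue is disabled;
				// returning true lets this range through anyway.
				return rangeID == roachpb.RangeID(atomic.LoadInt64(gate))
			},
		},
	}, gate
}

A test would pass these knobs in its TestServerArgs and, once the target range is known, store its ID into the gate so that only that range is processed by the otherwise-disabled queues.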