diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 64cd92cf0f39..f83fbee07f23 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -282,6 +282,17 @@ func (rq *replicateQueue) shouldQueue(
 		log.VEventf(ctx, 2, "lease transfer needed, enqueuing")
 		return true, 0
 	}
+	if !status.IsValid() {
+		// The lease for this range is currently invalid. If this replica is
+		// the raft leader, then it must acquire the lease. We enqueue the
+		// replica regardless of whether it is the leader or a follower;
+		// whichever replica is the leader at processing time will succeed.
+		// The expired lease need not have belonged to this replica: whatever
+		// the lease history, the current leader should end up holding the
+		// lease.
+		log.VEventf(ctx, 2, "invalid lease, enqueuing")
+		return true, 0
+	}
 	return false, 0
 }
 
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index b44fb3463487..a50ee62ea68a 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -1280,3 +1280,110 @@ func TestTransferLeaseToLaggingNode(t *testing.T) {
 		return errors.Errorf("Repeat check for correct leaseholder")
 	})
 }
+
+// TestReplicateQueueAcquiresInvalidLeases asserts that, following a restart,
+// leases are invalid, and that the replicate queue acquires invalid leases
+// when enabled.
+func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
+	defer stickyEngineRegistry.CloseAllStickyInMemEngines()
+
+	zcfg := zonepb.DefaultZoneConfig()
+	zcfg.NumReplicas = proto.Int32(1)
+	tc := testcluster.StartTestCluster(t, 1,
+		base.TestClusterArgs{
+			// Disable the replication queue initially, to assert on the lease
+			// statuses before and after enabling the replicate queue.
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				StoreSpecs: []base.StoreSpec{
+					{
+						InMemory:               true,
+						StickyInMemoryEngineID: "1",
+					},
+				},
+				ScanMinIdleTime: time.Millisecond,
+				ScanMaxIdleTime: time.Millisecond,
+				Knobs: base.TestingKnobs{
+					Server: &server.TestingKnobs{
+						StickyEngineRegistry:      stickyEngineRegistry,
+						DefaultZoneConfigOverride: &zcfg,
+					},
+				},
+			},
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+	db := tc.Conns[0]
+	// Disable the consistency checker and SQL stats collection, which may
+	// acquire a lease by querying a range.
+	_, err := db.Exec("set cluster setting server.consistency_check.interval = '0s'")
+	require.NoError(t, err)
+	_, err = db.Exec("set cluster setting sql.stats.automatic_collection.enabled = false")
+	require.NoError(t, err)
+
+	// Create ranges to assert on their lease status after restart and after
+	// replicate queue processing.
+	ranges := 30
+	scratchRangeKeys := make([]roachpb.Key, ranges)
+	splitKey := tc.ScratchRange(t)
+	for i := range scratchRangeKeys {
+		_, _ = tc.SplitRangeOrFatal(t, splitKey)
+		scratchRangeKeys[i] = splitKey
+		splitKey = splitKey.Next()
+	}
+
+	invalidLeases := func() []kvserverpb.LeaseStatus {
+		invalid := []kvserverpb.LeaseStatus{}
+		for _, key := range scratchRangeKeys {
+			// Collect the lease statuses that are currently invalid.
+			repl := tc.GetRaftLeader(t, roachpb.RKey(key))
+			if leaseStatus := repl.CurrentLeaseStatus(ctx); !leaseStatus.IsValid() {
+				invalid = append(invalid, leaseStatus)
+			}
+		}
+		return invalid
+	}
+
+	// Assert that the leases are valid initially.
+	require.Len(t, invalidLeases(), 0)
+
+	// Restart the servers to invalidate the leases.
+	for i := range tc.Servers {
+		tc.StopServer(i)
+		err = tc.RestartServerWithInspect(i, nil)
+		require.NoError(t, err)
+	}
+
+	forceProcess := func() {
+		// Speed up the queue processing.
+		for _, s := range tc.Servers {
+			err := s.Stores().VisitStores(func(store *kvserver.Store) error {
+				return store.ForceReplicationScanAndProcess()
+			})
+			require.NoError(t, err)
+		}
+	}
+
+	// NB: The consistency checker and SQL stats collector will both attempt
+	// a lease acquisition when processing a range if the lease is currently
+	// invalid. They are disabled in this test. We do not assert on the number
+	// of invalid leases prior to enabling the replicate queue, to avoid
+	// flakiness should that behavior change; instead, we only require that
+	// no invalid leases remain.
+	toggleReplicationQueues(tc, true /* active */)
+	testutils.SucceedsSoon(t, func() error {
+		forceProcess()
+		// Assert that there are now no invalid leases.
+		invalid := invalidLeases()
+		if len(invalid) > 0 {
+			return errors.Newf("invalid leases remain: %+v", invalid)
+		}
+		return nil
+	})
+}
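
Below is a minimal, standalone sketch of the enqueue decision this change
introduces, for readers who want the logic in isolation. It is not the
kvserver implementation: leaseStatus and replica here are simplified
stand-ins for the real types, and leaseTransferNeeded is a hypothetical flag
collapsing the pre-existing lease-transfer check. The point is only the
ordering: a needed lease transfer enqueues first, and an invalid lease now
also enqueues, regardless of raft leadership.

package main

import "fmt"

// leaseStatus is a simplified stand-in for kvserverpb.LeaseStatus; only
// validity matters for this sketch.
type leaseStatus struct{ valid bool }

func (s leaseStatus) IsValid() bool { return s.valid }

// replica is a simplified stand-in carrying just the state the enqueue
// decision inspects. leaseTransferNeeded is a hypothetical flag standing in
// for the existing lease-transfer check in shouldQueue.
type replica struct {
	status              leaseStatus
	leaseTransferNeeded bool
	isRaftLeader        bool
}

// shouldQueue mirrors the decision order in the patched shouldQueue: a
// needed lease transfer enqueues, and an invalid lease now also enqueues,
// whether or not this replica is the raft leader. A follower's processing
// attempt is a no-op; whichever replica leads at processing time acquires
// the lease.
func shouldQueue(r replica) (bool, float64) {
	if r.leaseTransferNeeded {
		return true, 0
	}
	if !r.status.IsValid() {
		return true, 0
	}
	return false, 0
}

func main() {
	follower := replica{status: leaseStatus{valid: false}}
	leader := replica{status: leaseStatus{valid: false}, isRaftLeader: true}
	for _, r := range []replica{follower, leader} {
		enqueue, _ := shouldQueue(r)
		fmt.Printf("leader=%t enqueue=%t\n", r.isRaftLeader, enqueue)
	}
}

Running this prints enqueue=true for both replicas, illustrating that
followers with an invalid lease are enqueued too; only the replica that is
the raft leader at processing time actually acquires the lease.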