kvserver: enqueue replica on invalid lease
This patch adds a check that enqueues replicas that do not have a
valid lease into the replicate queue. The replicate queue and base
queue will already attempt to acquire the lease when processing these
replicas.

This ensures that, following a node restart, every range has a replica
with a valid lease installed within the replica scanner interval.

resolves cockroachdb#83444

Release note: None
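
As an illustration of the mechanism described above, here is a minimal, self-contained Go sketch of the two halves of the change: enqueue any replica whose lease status is invalid, then acquire the lease when the queue processes it. The names (LeaseStatus, Replica, shouldQueue, process) are hypothetical stand-ins, not CockroachDB's actual API.

package main

import (
	"errors"
	"fmt"
)

// LeaseStatus is a hypothetical stand-in for the lease status a replica
// reports; it is not CockroachDB's kvserverpb.LeaseStatus.
type LeaseStatus struct{ Valid bool }

func (s LeaseStatus) IsValid() bool { return s.Valid }

// Replica is a hypothetical stand-in for a range replica.
type Replica struct {
	LeaseStatus  LeaseStatus
	IsRaftLeader bool
}

// shouldQueue mirrors the added check: enqueue whenever the lease is
// invalid, regardless of whether this replica is the leader or a follower.
func shouldQueue(r *Replica) bool {
	return !r.LeaseStatus.IsValid()
}

// process sketches what the replicate/base queue already does when it picks
// the replica up: only the replica that is the raft leader at processing
// time succeeds in acquiring the lease.
func process(r *Replica) error {
	if !r.LeaseStatus.IsValid() {
		if !r.IsRaftLeader {
			return errors.New("not the raft leader; cannot acquire the lease")
		}
		r.LeaseStatus = LeaseStatus{Valid: true} // lease acquired
	}
	return nil
}

func main() {
	leader := &Replica{IsRaftLeader: true}
	follower := &Replica{}
	fmt.Println(shouldQueue(leader), shouldQueue(follower))    // true true
	fmt.Println(process(leader), leader.LeaseStatus.IsValid()) // <nil> true
	fmt.Println(process(follower))                             // not the raft leader; cannot acquire the lease
}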
kvoli committed Oct 18, 2022
1 parent 06fb481 commit f439a77
Showing 2 changed files with 118 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
@@ -282,6 +282,17 @@ func (rq *replicateQueue) shouldQueue(
log.VEventf(ctx, 2, "lease transfer needed, enqueuing")
return true, 0
}
if !status.IsValid() {
// The range's lease is currently invalid. If this replica is the raft
// leader, then it must acquire the lease. We enqueue the replica
// regardless of whether it is the leader or a follower; whichever
// replica is the leader at processing time will succeed in acquiring
// the lease. The expired lease need not have belonged to this replica:
// regardless of lease history, the current leader should hold the
// lease.
log.VEventf(ctx, 2, "invalid lease, enqueuing")
return true, 0
}

return false, 0
}
107 changes: 107 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -1280,3 +1280,110 @@ func TestTransferLeaseToLaggingNode(t *testing.T) {
return errors.Errorf("Repeat check for correct leaseholder")
})
}

// TestReplicateQueueAcquiresInvalidLeases asserts that following a restart,
// leases are invalidated and that the replicate queue acquires invalid leases
// when enabled.
func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyEngineRegistry.CloseAllStickyInMemEngines()

zcfg := zonepb.DefaultZoneConfig()
zcfg.NumReplicas = proto.Int32(1)
tc := testcluster.StartTestCluster(t, 1,
base.TestClusterArgs{
// Disable the replication queue initially, to assert on the lease
// statuses before and after enabling the replicate queue.
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: "1",
},
},
ScanMinIdleTime: time.Millisecond,
ScanMaxIdleTime: time.Millisecond,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: stickyEngineRegistry,
DefaultZoneConfigOverride: &zcfg,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)
db := tc.Conns[0]
// Disable the consistency checker and SQL stats collection, which may
// acquire a lease by querying a range.
_, err := db.Exec("set cluster setting server.consistency_check.interval = '0s'")
require.NoError(t, err)
_, err = db.Exec("set cluster setting sql.stats.automatic_collection.enabled = false")
require.NoError(t, err)

// Create ranges to assert on their lease status after restart and after
// replicate queue processing.
ranges := 30
scratchRangeKeys := make([]roachpb.Key, ranges)
splitKey := tc.ScratchRange(t)
for i := range scratchRangeKeys {
_, _ = tc.SplitRangeOrFatal(t, splitKey)
scratchRangeKeys[i] = splitKey
splitKey = splitKey.Next()
}

invalidLeases := func() []kvserverpb.LeaseStatus {
invalid := []kvserverpb.LeaseStatus{}
for _, key := range scratchRangeKeys {
// Collect the lease status for this range if it is invalid.
repl := tc.GetRaftLeader(t, roachpb.RKey(key))
if leaseStatus := repl.CurrentLeaseStatus(ctx); !leaseStatus.IsValid() {
invalid = append(invalid, leaseStatus)
}
}
return invalid
}

// Assert that the leases are valid initially.
require.Len(t, invalidLeases(), 0)

// Restart the servers to invalidate the leases.
for i := range tc.Servers {
tc.StopServer(i)
err = tc.RestartServerWithInspect(i, nil)
require.NoError(t, err)
}

forceProcess := func() {
// Speed up the queue processing.
for _, s := range tc.Servers {
err := s.Stores().VisitStores(func(store *kvserver.Store) error {
return store.ForceReplicationScanAndProcess()
})
require.NoError(t, err)
}
}

// NB: The consistency checker and SQL stats collector will both attempt a
// lease acquisition when processing a range if its lease is currently
// invalid. They are disabled in this test. We do not assert on the number
// of invalid leases prior to enabling the replicate queue, to avoid test
// flakiness if this changes in the future or for some other reason.
// Instead, we are only concerned that no invalid leases remain.
toggleReplicationQueues(tc, true /* active */)
testutils.SucceedsSoon(t, func() error {
forceProcess()
// Assert that there are now no invalid leases.
invalid := invalidLeases()
if len(invalid) > 0 {
return errors.Newf("the number of invalid leases is greater than 0: %+v", invalid)
}
return nil
})
}
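
As a side note on the polling used above: testutils.SucceedsSoon retries the supplied function until it returns nil or a deadline passes. A minimal, self-contained sketch of that idea (a hypothetical succeedsSoon helper, not the actual testutils implementation) looks like this:

package main

import (
	"errors"
	"fmt"
	"time"
)

// succeedsSoon retries fn until it returns nil or the timeout elapses,
// sleeping briefly between attempts and returning the last error on timeout.
func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	var err error
	for {
		if err = fn(); err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition not met before deadline: %w", err)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	attempts := 0
	err := succeedsSoon(time.Second, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("not yet")
		}
		return nil
	})
	fmt.Println(err, attempts) // <nil> 3
}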
