diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index b4ab6989c3ab..9cdcbd700b73 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -58,6 +58,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/opentracing/opentracing-go"
+	"go.etcd.io/etcd/raft"
 )
 
 var leaseStatusLogLimiter = log.Every(5 * time.Second)
@@ -583,6 +584,15 @@ func (r *Replica) leaseStatus(
 	return status
 }
 
+// currentLeaseStatus returns the status of the current lease for a current
+// timestamp.
+func (r *Replica) currentLeaseStatus(ctx context.Context) kvserverpb.LeaseStatus {
+	timestamp := r.store.Clock().Now()
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	return r.leaseStatus(ctx, *r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS)
+}
+
 // requiresExpiringLeaseRLocked returns whether this range uses an
 // expiration-based lease; false if epoch-based. Ranges located before or
 // including the node liveness table must use expiration leases to avoid
@@ -616,8 +626,14 @@ func (r *Replica) requestLeaseLocked(
 		return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
 			newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc)))
 	}
-	if r.store.IsDraining() {
-		// We've retired from active duty.
+	// If we're draining, we'd rather not take any new leases (since we're also
+	// trying to move leases away elsewhere). But if we're the leader, we don't
+	// really have a choice and we take the lease - there might not be any other
+	// replica available to take this lease (perhaps they're all draining).
+	if r.store.IsDraining() && (r.raftBasicStatusRLocked().RaftState != raft.StateLeader) {
+		// TODO(andrei): If we start refusing to take leases on followers elsewhere,
+		// this code can go away.
+		log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
 		return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
 			newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc)))
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index df9b0d969cc7..6a4b177351b0 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -1428,36 +1428,108 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	}
 }
 
-// TestReplicaDrainLease makes sure that no new leases are granted when
-// the Store is draining.
+// Test that draining nodes only take the lease if they're the leader.
 func TestReplicaDrainLease(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	tc := testContext{}
-	stopper := stop.NewStopper()
-	defer stopper.Stop(context.Background())
-	tc.Start(t, stopper)
-
-	// Acquire initial lease.
 	ctx := context.Background()
-	status, pErr := tc.repl.redirectOnOrAcquireLease(ctx)
-	if pErr != nil {
-		t.Fatal(pErr)
+	clusterArgs := base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				NodeLiveness: NodeLivenessTestingKnobs{
+					// This test waits for an epoch-based lease to expire, so we're setting the
+					// liveness duration as low as possible while still keeping the test stable.
+					LivenessDuration: 2000 * time.Millisecond,
+					RenewalDuration:  1000 * time.Millisecond,
+				},
+				Store: &StoreTestingKnobs{
+					// We eliminate clock offsets in order to eliminate the stasis period of
+					// leases. Otherwise we'd need to make leases longer.
+					MaxOffset: time.Nanosecond,
+				},
+			},
+		},
 	}
+	tc := serverutils.StartNewTestCluster(t, 2, clusterArgs)
+	defer tc.Stopper().Stop(ctx)
+	rngKey := tc.ScratchRange(t)
+	tc.AddReplicasOrFatal(t, rngKey, tc.Target(1))
 
-	tc.store.SetDraining(true, nil /* reporter */)
-	tc.repl.mu.Lock()
-	pErr = <-tc.repl.requestLeaseLocked(ctx, status).C()
-	tc.repl.mu.Unlock()
-	_, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
-	if !ok {
-		t.Fatalf("expected NotLeaseHolderError, not %v", pErr)
-	}
-	tc.store.SetDraining(false, nil /* reporter */)
-	// Newly undrained, leases work again.
-	if _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil {
-		t.Fatal(pErr)
-	}
+	s1 := tc.Server(0)
+	s2 := tc.Server(1)
+	store1, err := s1.GetStores().(*Stores).GetStore(s1.GetFirstStoreID())
+	require.NoError(t, err)
+	store2, err := s2.GetStores().(*Stores).GetStore(s2.GetFirstStoreID())
+	require.NoError(t, err)
+
+	rd := tc.LookupRangeOrFatal(t, rngKey)
+	r1, err := store1.GetReplica(rd.RangeID)
+	require.NoError(t, err)
+	status := r1.currentLeaseStatus(ctx)
+	require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
+	// We expect the lease to be valid, but don't check that because, under race, it might have
+	// expired already.
+
+	// Stop n1's heartbeats and wait for the lease to expire.
+
+	log.Infof(ctx, "test: suspending heartbeats for n1")
+	cleanup := s1.NodeLiveness().(*NodeLiveness).PauseAllHeartbeatsForTest()
+	defer cleanup()
+
+	require.NoError(t, err)
+	testutils.SucceedsSoon(t, func() error {
+		status := r1.currentLeaseStatus(ctx)
+		require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
+		if status.State == kvserverpb.LeaseState_VALID {
+			return errors.New("lease still valid")
+		}
+		// We need to wait for the stasis state to pass too; during stasis other
+		// replicas can't take the lease.
+		if status.State == kvserverpb.LeaseState_STASIS {
+			return errors.New("lease still in stasis")
+		}
+		return nil
+	})
+
+	require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()),
+		"expected leadership to still be on the first replica")
+
+	// Mark the stores as draining. We'll then start checking how acquiring leases
+	// behaves while draining.
+	store1.draining.Store(true)
+	store2.draining.Store(true)
+
+	r2, err := store2.GetReplica(rd.RangeID)
+	require.NoError(t, err)
+	// Check that a draining replica that's not the leader does NOT take the
+	// lease.
+	_, pErr := r2.redirectOnOrAcquireLease(ctx)
+	require.NotNil(t, pErr)
+	require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail())
+
+	// Now transfer the leadership from r1 to r2 and check that r2 can now acquire
+	// the lease.
+
+	// Initiate the leadership transfer.
+	r1.mu.Lock()
+	r1.mu.internalRaftGroup.TransferLeader(uint64(r2.ReplicaID()))
+	r1.mu.Unlock()
+	// Run the range through the Raft scheduler, otherwise the leadership message
+	// doesn't get sent because the range is quiesced.
+	store1.EnqueueRaftUpdateCheck(r1.RangeID)
+
+	// Wait for the leadership transfer to happen.
+	testutils.SucceedsSoon(t, func() error {
+		if r2.RaftStatus().SoftState.RaftState != raft.StateLeader {
+			return errors.Newf("r2 not yet leader")
+		}
+		return nil
+	})
+
+	// Check that r2 can now acquire the lease.
+	_, pErr = r2.redirectOnOrAcquireLease(ctx)
+	require.NoError(t, pErr.GoError())
 }
 
 // TestReplicaGossipFirstRange verifies that the first range gossips its
diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go
index 6b3ab1ec436d..12db416025ed 100644
--- a/pkg/testutils/serverutils/test_cluster_shim.go
+++ b/pkg/testutils/serverutils/test_cluster_shim.go
@@ -119,6 +119,11 @@ type TestClusterInterface interface {
 	// ReplicationMode returns the ReplicationMode that the test cluster was
 	// configured with.
 	ReplicationMode() base.TestClusterReplicationMode
+
+	// ScratchRange returns the start key of a span of keyspace suitable for use
+	// as kv scratch space (it doesn't overlap system spans or SQL tables). The
+	// range is lazily split off on the first call to ScratchRange.
+	ScratchRange(t testing.TB) roachpb.Key
 }
 
 // TestClusterFactory encompasses the actual implementation of the shim
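
Reviewer note, outside the patch itself: the behavior change in requestLeaseLocked reduces to a small decision rule — a draining store keeps refusing new leases unless it is currently the Raft leader, in which case it takes the lease anyway because no other replica may be able to. A minimal standalone sketch of that rule follows; shouldRefuseLease, isDraining, and isRaftLeader are illustrative names that do not exist in the codebase.

package main

import "fmt"

// shouldRefuseLease restates the check added to requestLeaseLocked: a draining
// replica declines to acquire a lease only while it is not the Raft leader.
func shouldRefuseLease(isDraining, isRaftLeader bool) bool {
	return isDraining && !isRaftLeader
}

func main() {
	// The two cases exercised by TestReplicaDrainLease:
	fmt.Println(shouldRefuseLease(true, false)) // true: draining follower is redirected with NotLeaseHolderError
	fmt.Println(shouldRefuseLease(true, true))  // false: draining leader still takes the lease
	// And the unchanged non-draining path:
	fmt.Println(shouldRefuseLease(false, false)) // false: lease acquisition proceeds as before
}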