kv: remove broken attempt to reject lease acquisitions on draining nodes #87885

Merged
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/replica_proposal.go
@@ -263,10 +263,13 @@ func (r *Replica) leasePostApplyLocked(
leaseChangingHands := prevLease.Replica.StoreID != newLease.Replica.StoreID || prevLease.Sequence != newLease.Sequence

if iAmTheLeaseHolder {
-// Log lease acquisition whenever an Epoch-based lease changes hands (or verbose
-// logging is enabled).
-if newLease.Type() == roachpb.LeaseEpoch && leaseChangingHands || log.V(1) {
-log.VEventf(ctx, 1, "new range lease %s following %s", newLease, prevLease)
+// Log lease acquisitions loudly when verbose logging is enabled or when the
+// new leaseholder is draining, in which case it should be shedding leases.
+// Otherwise, log a trace event.
+if log.V(1) || r.store.IsDraining() {
+log.Infof(ctx, "new range lease %s following %s", newLease, prevLease)
+} else {
+log.Eventf(ctx, "new range lease %s following %s", newLease, prevLease)
}
}

@@ -421,10 +424,6 @@ func (r *Replica) leasePostApplyLocked(
log.Errorf(ctx, "%v", err)
}
})
-if leaseChangingHands && log.V(1) {
-// This logging is useful to troubleshoot incomplete drains.
-log.Info(ctx, "is now leaseholder")
-}
}

// Inform the store of this lease.
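Read in isolation, the new branch in leasePostApplyLocked reduces to a simple decision: log at INFO level when verbose logging is enabled or the store is draining, otherwise only record a trace event. The following is a minimal standalone sketch of that decision; shouldLogLoudly and its boolean parameters are illustrative stand-ins rather than the kvserver API (the real code consults log.V(1) and the store's IsDraining method and then calls log.Infof or log.Eventf).

package main

import "fmt"

// shouldLogLoudly is an illustrative stand-in for the branch added above: a
// freshly applied lease is logged at INFO level when verbose logging is
// enabled or when the acquiring store is draining, because a draining store
// is supposed to be shedding leases and gaining one is noteworthy. In every
// other case the event only goes to the request's trace.
func shouldLogLoudly(verboseLogging, storeDraining bool) bool {
	return verboseLogging || storeDraining
}

func main() {
	// A draining store acquiring a lease is logged loudly even without -v.
	fmt.Println(shouldLogLoudly(false, true)) // true
	// The common case stays quiet and only leaves a trace event.
	fmt.Println(shouldLogLoudly(false, false)) // false
}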
25 changes: 0 additions & 25 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -63,7 +63,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"go.etcd.io/etcd/raft/v3"
)

var leaseStatusLogLimiter = func() *log.EveryN {
@@ -800,30 +799,6 @@ func (r *Replica) requestLeaseLocked(
return r.mu.pendingLeaseRequest.newResolvedHandle(pErr)
}

-// If we're draining, we'd rather not take any new leases (since we're also
-// trying to move leases away elsewhere). But if we're the leader, we don't
-// really have a choice and we take the lease - there might not be any other
-// replica available to take this lease (perhaps they're all draining).
-if r.store.IsDraining() {
-// NB: Replicas that are not the Raft leader will not take leases anyway
-// (see the check inside propBuf.FlushLockedWithRaftGroup()), so we don't
-// really need any special behavior for draining nodes here. This check
-// serves mostly as a means to get more granular logging and as a defensive
-// precaution.
-if r.raftBasicStatusRLocked().RaftState != raft.StateLeader {
-log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
-return r.mu.pendingLeaseRequest.newResolvedHandle(
-roachpb.NewError(
-newNotLeaseHolderError(
-roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc,
-"refusing to take the lease; node is draining",
-),
-),
-)
-}
-log.Info(ctx, "trying to take the lease while we're draining since we're the raft leader")
-}

// Propose a Raft command to get a lease for this replica.
repDesc, err := r.getReplicaDescriptorRLocked()
if err != nil {
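The block removed above is easier to follow outside diff form. The sketch below restates the guard as a standalone decision function; the behavior and error message are taken from the removed code and its NB comment, but the error value is a plain stand-in for the roachpb.NotLeaseHolderError that requestLeaseLocked actually returned through a pendingLeaseRequest handle.

package main

import (
	"errors"
	"fmt"
)

// errNotLeaseHolder stands in for the NotLeaseHolderError that the removed
// code returned to callers of requestLeaseLocked.
var errNotLeaseHolder = errors.New("refusing to take the lease; node is draining")

// maybeRejectLeaseWhileDraining mirrors the removed guard: a draining replica
// refused new leases unless it was the Raft leader, in which case it took the
// lease anyway because every other replica might be draining too. The NB
// comment in the removed code notes that non-leaders do not take leases
// anyway (the propBuf check), so the guard was mostly defensive.
func maybeRejectLeaseWhileDraining(draining, isRaftLeader bool) error {
	if draining && !isRaftLeader {
		return errNotLeaseHolder
	}
	return nil
}

func main() {
	fmt.Println(maybeRejectLeaseWhileDraining(true, false)) // lease request rejected
	fmt.Println(maybeRejectLeaseWhileDraining(true, true))  // <nil>: the leader takes the lease anyway
}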
121 changes: 0 additions & 121 deletions pkg/kv/kvserver/replica_test.go
@@ -1351,127 +1351,6 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
}
}

// Test that draining nodes only take the lease if they're the leader.
func TestReplicaDrainLease(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
NodeLiveness: NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're setting the
// liveness duration as low as possible while still keeping the test stable.
LivenessDuration: 2000 * time.Millisecond,
RenewalDuration: 1000 * time.Millisecond,
},
Store: &StoreTestingKnobs{
// We eliminate clock offsets in order to eliminate the stasis period of
// leases. Otherwise we'd need to make leases longer.
MaxOffset: time.Nanosecond,
},
},
},
}
tc := serverutils.StartNewTestCluster(t, 2, clusterArgs)
defer tc.Stopper().Stop(ctx)
rngKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, rngKey, tc.Target(1))

s1 := tc.Server(0)
s2 := tc.Server(1)
store1, err := s1.GetStores().(*Stores).GetStore(s1.GetFirstStoreID())
require.NoError(t, err)
store2, err := s2.GetStores().(*Stores).GetStore(s2.GetFirstStoreID())
require.NoError(t, err)

rd := tc.LookupRangeOrFatal(t, rngKey)
r1, err := store1.GetReplica(rd.RangeID)
require.NoError(t, err)
status := r1.CurrentLeaseStatus(ctx)
require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
// We expect the lease to be valid, but don't check that because, under race, it might have
// expired already.

// Stop n1's heartbeats and wait for the lease to expire.
log.Infof(ctx, "test: suspending heartbeats for n1")
cleanup := s1.NodeLiveness().(*liveness.NodeLiveness).PauseAllHeartbeatsForTest()
defer cleanup()

testutils.SucceedsSoon(t, func() error {
status := r1.CurrentLeaseStatus(ctx)
require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
if status.State == kvserverpb.LeaseState_VALID {
return errors.New("lease still valid")
}
// We need to wait for the stasis state to pass too; during stasis other
// replicas can't take the lease.
if status.State == kvserverpb.LeaseState_UNUSABLE {
return errors.New("lease still in stasis")
}
return nil
})

require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()),
"expected leadership to still be on the first replica")

// Wait until n1 has heartbeat its liveness record (epoch >= 1) and n2
// knows about it. Otherwise, the following could occur:
//
// - n1 heartbeats its liveness to epoch 1 and acquires the lease
// - n2 doesn't receive this yet (gossip)
// - when n2 is asked to acquire the lease, it uses a lease with epoch 1
// but the liveness record with epoch zero
// - lease status is ERROR, lease acquisition (and thus test) fails.
testutils.SucceedsSoon(t, func() error {
nl, ok := s2.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(s1.NodeID())
if !ok {
return errors.New("no liveness record for n1")
}
if nl.Epoch < 1 {
return errors.New("epoch for n1 still zero")
}
return nil
})

// Mark the stores as draining. We'll then start checking how acquiring leases
// behaves while draining.
store1.draining.Store(true)
store2.draining.Store(true)

r2, err := store2.GetReplica(rd.RangeID)
require.NoError(t, err)
// Check that a draining replica that's not the leader does NOT take the
// lease.
_, pErr := r2.redirectOnOrAcquireLease(ctx)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail())

// Now transfer the leadership from r1 to r2 and check that r2 can now acquire
// the lease.

// Initiate the leadership transfer.
r1.mu.Lock()
r1.mu.internalRaftGroup.TransferLeader(uint64(r2.ReplicaID()))
r1.mu.Unlock()
// Run the range through the Raft scheduler, otherwise the leadership messages
// don't get sent because the range is quiesced.
store1.EnqueueRaftUpdateCheck(r1.RangeID)

// Wait for the leadership transfer to happen.
testutils.SucceedsSoon(t, func() error {
if r2.RaftStatus().SoftState.RaftState != raft.StateLeader {
return errors.Newf("r1 not yet leader")
}
return nil
})

// Check that r2 can now acquire the lease.
_, pErr = r2.redirectOnOrAcquireLease(ctx)
require.NoError(t, pErr.GoError())
}

// TestReplicaGossipFirstRange verifies that the first range gossips its
// location and the cluster ID.
func TestReplicaGossipFirstRange(t *testing.T) {
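The race described in the removed test's comment comes down to how an epoch-based lease is evaluated against the locally known liveness record. The sketch below models only the epoch comparison and is an assumption-laden simplification: the real lease status computation in kvserver also accounts for expirations, stasis, and timestamps, and the function and state names here are illustrative.

package main

import "fmt"

// leaseStateForEpochs models only the epoch comparison behind the race in the
// removed test's comment. Assumed, simplified semantics:
//   - epochs match: the lease can be valid (ignoring expiration and stasis);
//   - the known liveness epoch is ahead of the lease: the lease is expired;
//   - the lease epoch is ahead of the locally known liveness epoch: the lease
//     cannot be evaluated, which surfaces as an error. This is the state n2
//     hits while its gossiped view of n1's liveness still shows epoch zero.
func leaseStateForEpochs(leaseEpoch, knownLivenessEpoch int64) string {
	switch {
	case leaseEpoch == knownLivenessEpoch:
		return "VALID"
	case leaseEpoch < knownLivenessEpoch:
		return "EXPIRED"
	default:
		return "ERROR"
	}
}

func main() {
	fmt.Println(leaseStateForEpochs(1, 0)) // ERROR: the race the removed test waited out
	fmt.Println(leaseStateForEpochs(1, 1)) // VALID (modulo expiration)
	fmt.Println(leaseStateForEpochs(1, 2)) // EXPIRED: the epoch moved past the lease
}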