From 1beb8c63fd1980ca809206586b85bde8b6ce8388 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 10 Mar 2021 14:00:24 +0100 Subject: [PATCH] kvserver: allow lease extensions from followers in propBuf The Raft proposal buffer currently rejects lease acquisition proposals from followers if there is an eligible leader, to avoid interfering with it trying to claim the lease. However, this logic unintentionally rejected lease extension proposals from followers as well, which could cause a leaseholder to lose its lease. This patch changes the proposal buffer to allow lease extension proposals from followers as long as they still hold a valid lease. Release note: None --- pkg/kv/kvserver/replica_proposal_buf.go | 21 +++++-- pkg/kv/kvserver/replica_proposal_buf_test.go | 58 +++++++++++++++++--- 2 files changed, 68 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 2d4cb62846fb..fc6d930622fa 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -223,6 +223,7 @@ type proposer interface { withGroupLocked(func(proposerRaft) error) error registerProposalLocked(*ProposalData) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo + ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimestamp) bool // rejectProposalWithRedirectLocked rejects a proposal and redirects the // proposer to try it on another node. This is used to sometimes reject lease // acquisitions when another replica is the leader; the intended consequence @@ -562,18 +563,26 @@ func (b *propBuf) FlushLockedWithRaftGroup( // // A special case is when the leader is known, but is ineligible to get the // lease. In that case, we have no choice but to continue with the proposal. + // + // Lease extensions for a currently held lease always go through. if !leaderInfo.iAmTheLeader && p.Request.IsLeaseRequest() { + req := p.Request.Requests[0].GetRequestLease() leaderKnownAndEligible := leaderInfo.leaderKnown && leaderInfo.leaderEligibleForLease - if leaderKnownAndEligible && !b.testing.allowLeaseProposalWhenNotLeader { + isValidExtension := req.PrevLease.Replica.StoreID == req.Lease.Replica.StoreID && + b.p.ownsValidLeaseRLocked(ctx, b.clock.NowAsClockTimestamp()) + if leaderKnownAndEligible && !isValidExtension && !b.testing.allowLeaseProposalWhenNotLeader { log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is", leaderInfo.leader) b.p.rejectProposalWithRedirectLocked(ctx, p, leaderInfo.leader) p.tok.doneIfNotMovedLocked(ctx) continue } - // If the leader is not known, or if it is known but it's ineligible for - // the lease, continue with the proposal as explained above. - if !leaderInfo.leaderKnown { + // If the leader is not known, or if it is known but it's ineligible + // for the lease, continue with the proposal as explained above. We + // also send lease extensions for an existing leaseholder. + if isValidExtension { + log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader") + } else if !leaderInfo.leaderKnown { log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown") } else { log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible") @@ -1017,6 +1026,10 @@ func (rp *replicaProposer) raftTransportClosedTimestampEnabled() bool { return !(*Replica)(rp).mu.state.RaftClosedTimestamp.IsEmpty() } +func (rp *replicaProposer) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimestamp) bool { + return (*Replica)(rp).ownsValidLeaseRLocked(ctx, now) +} + func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error { // Pass true for mayCampaignOnWake because we're about to propose a command. return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 83460917ff6a..46a5bc235892 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -51,6 +51,8 @@ type testProposer struct { // If not nil, this is called by RejectProposalWithRedirectLocked(). If nil, // RejectProposalWithRedirectLocked() panics. onRejectProposalWithRedirectLocked func(prop *ProposalData, redirectTo roachpb.ReplicaID) + // ownsValidLease is returned by ownsValidLeaseRLocked() + ownsValidLease bool // leaderReplicaInDescriptor is set if the leader (as indicated by raftGroup) // is known, and that leader is part of the range's descriptor (as seen by the @@ -148,6 +150,10 @@ func (t *testProposer) registerProposalLocked(p *ProposalData) { t.registered++ } +func (t *testProposer) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimestamp) bool { + return t.ownsValidLease +} + func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo { leaderKnown := raftGroup.BasicStatus().Lead != raft.None var leaderRep roachpb.ReplicaID @@ -201,9 +207,15 @@ func (pc proposalCreator) newPutProposal() (*ProposalData, []byte) { return pc.newProposal(ba) } -func (pc proposalCreator) newLeaseProposal(lease roachpb.Lease) (*ProposalData, []byte) { +func (pc proposalCreator) newLeaseProposal( + lease roachpb.Lease, extension bool, +) (*ProposalData, []byte) { var ba roachpb.BatchRequest - ba.Add(&roachpb.RequestLeaseRequest{Lease: lease}) + req := roachpb.RequestLeaseRequest{Lease: lease} + if extension { + req.PrevLease = req.Lease + } + ba.Add(&req) return pc.newProposal(ba) } @@ -260,7 +272,7 @@ func TestProposalBuffer(t *testing.T) { var pd *ProposalData var data []byte if leaseReq { - pd, data = pc.newLeaseProposal(roachpb.Lease{}) + pd, data = pc.newLeaseProposal(roachpb.Lease{}, false /* extension */) } else { pd, data = pc.newPutProposal() } @@ -466,7 +478,7 @@ func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) { var pd *ProposalData var data []byte if i%2 == 0 { - pd, data = pc.newLeaseProposal(roachpb.Lease{}) + pd, data = pc.newLeaseProposal(roachpb.Lease{}, false /* extension */) } else { pd, data = pc.newPutProposal() } @@ -485,7 +497,7 @@ func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) { var pd *ProposalData var data []byte if i%2 == 0 { - pd, data = pc.newLeaseProposal(roachpb.Lease{}) + pd, data = pc.newLeaseProposal(roachpb.Lease{}, false /* extension */) } else { pd, data = pc.newPutProposal() } @@ -560,6 +572,10 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { // Set to simulate situations where the local replica is so behind that the // leader is not even part of the range descriptor. leaderNotInRngDesc bool + // If true, the follower has a valid lease. + ownsValidLease bool + // If true, the lease acquisition is instead a lease extension. + extension bool expRejection bool }{ @@ -578,6 +594,27 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { // Rejection - a follower can't request a lease. expRejection: true, }, + { + name: "follower, lease extension despite known eligible leader", + state: raft.StateFollower, + // Someone else is leader, but we're the leaseholder. + leader: self + 1, + extension: true, + ownsValidLease: true, + // No rejection of lease extensions. + expRejection: false, + }, + { + name: "follower, stale lease extension rejected with known eligible leader", + state: raft.StateFollower, + // Someone else is leader, we're trying to extend a lease that + // we had but it's no longer valid. + leader: self + 1, + extension: true, + ownsValidLease: false, + // Extension of invalid lease is rejected. + expRejection: true, + }, { name: "follower, known ineligible leader", state: raft.StateFollower, @@ -643,13 +680,20 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { p.raftGroup = r p.leaderReplicaInDescriptor = !tc.leaderNotInRngDesc p.leaderReplicaType = tc.leaderRepType + p.ownsValidLease = tc.ownsValidLease var b propBuf clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) tracker := tracker.NewLockfreeTracker() b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings()) - pd, data := pc.newLeaseProposal(roachpb.Lease{}) + pd, data := pc.newLeaseProposal(roachpb.Lease{ + Replica: roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(self), + StoreID: roachpb.StoreID(self), + ReplicaID: roachpb.ReplicaID(self), + }, + }, tc.extension) _, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) _, err := b.Insert(ctx, pd, data, tok.Move(ctx)) require.NoError(t, err) @@ -855,7 +899,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { case regularWrite: pd, data = pc.newPutProposal() case newLease: - pd, data = pc.newLeaseProposal(tc.lease) + pd, data = pc.newLeaseProposal(tc.lease, false /* extension */) case leaseTransfer: var ba roachpb.BatchRequest ba.Add(&roachpb.TransferLeaseRequest{