Skip to content

Commit

Permalink
kvserver: allow lease extensions from followers in propBuf
Browse files Browse the repository at this point in the history
The Raft proposal buffer currently rejects lease acquisition proposals
from followers if there is an eligible leader, to avoid interfering with
it trying to claim the lease. However, this logic unintentionally
rejected lease extension proposals from followers as well, which could
cause a leaseholder to lose its lease.

This patch changes the proposal buffer to allow lease extension
proposals from followers.

Release note: None
  • Loading branch information
erikgrinaker committed Mar 14, 2021
1 parent bdff533 commit 1239745
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 11 deletions.
14 changes: 10 additions & 4 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,18 +562,24 @@ func (b *propBuf) FlushLockedWithRaftGroup(
//
// A special case is when the leader is known, but is ineligible to get the
// lease. In that case, we have no choice but to continue with the proposal.
//
// Lease extensions always go through.
if !leaderInfo.iAmTheLeader && p.Request.IsLeaseRequest() {
leaderKnownAndEligible := leaderInfo.leaderKnown && leaderInfo.leaderEligibleForLease
if leaderKnownAndEligible && !b.testing.allowLeaseProposalWhenNotLeader {
isExtension := p.Request.Requests[0].GetRequestLease().IsExtension()
if leaderKnownAndEligible && !isExtension && !b.testing.allowLeaseProposalWhenNotLeader {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
leaderInfo.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, leaderInfo.leader)
p.tok.doneIfNotMovedLocked(ctx)
continue
}
// If the leader is not known, or if it is known but it's ineligible for
// the lease, continue with the proposal as explained above.
if !leaderInfo.leaderKnown {
// If the leader is not known, or if it is known but it's ineligible
// for the lease, continue with the proposal as explained above. We
// also send lease extensions for an existing leaseholder.
if isExtension {
log.VEventf(ctx, 4, "proposing lease extension even though we're not the leader")
} else if !leaderInfo.leaderKnown {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
} else {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
Expand Down
38 changes: 31 additions & 7 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,15 @@ func (pc proposalCreator) newPutProposal() (*ProposalData, []byte) {
return pc.newProposal(ba)
}

func (pc proposalCreator) newLeaseProposal(lease roachpb.Lease) (*ProposalData, []byte) {
func (pc proposalCreator) newLeaseProposal(
lease roachpb.Lease, extension bool,
) (*ProposalData, []byte) {
var ba roachpb.BatchRequest
ba.Add(&roachpb.RequestLeaseRequest{Lease: lease})
req := roachpb.RequestLeaseRequest{Lease: lease}
if extension {
req.PrevLease = req.Lease
}
ba.Add(&req)
return pc.newProposal(ba)
}

Expand Down Expand Up @@ -260,7 +266,7 @@ func TestProposalBuffer(t *testing.T) {
var pd *ProposalData
var data []byte
if leaseReq {
pd, data = pc.newLeaseProposal(roachpb.Lease{})
pd, data = pc.newLeaseProposal(roachpb.Lease{}, false)
} else {
pd, data = pc.newPutProposal()
}
Expand Down Expand Up @@ -466,7 +472,7 @@ func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) {
var pd *ProposalData
var data []byte
if i%2 == 0 {
pd, data = pc.newLeaseProposal(roachpb.Lease{})
pd, data = pc.newLeaseProposal(roachpb.Lease{}, false)
} else {
pd, data = pc.newPutProposal()
}
Expand All @@ -485,7 +491,7 @@ func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) {
var pd *ProposalData
var data []byte
if i%2 == 0 {
pd, data = pc.newLeaseProposal(roachpb.Lease{})
pd, data = pc.newLeaseProposal(roachpb.Lease{}, false)
} else {
pd, data = pc.newPutProposal()
}
Expand Down Expand Up @@ -560,6 +566,9 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
// Set to simulate situations where the local replica is so behind that the
// leader is not even part of the range descriptor.
leaderNotInRngDesc bool
// If true, the lease acquisition is instead a lease extension, which
// should be accepted from followers.
extension bool

expRejection bool
}{
Expand All @@ -578,6 +587,15 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
// Rejection - a follower can't request a lease.
expRejection: true,
},
{
name: "follower, lease extension to known eligible leader",
state: raft.StateFollower,
// Someone else is leader, but we're the leaseholder.
leader: self + 1,
extension: true,
// No rejection of lease extensions.
expRejection: false,
},
{
name: "follower, known ineligible leader",
state: raft.StateFollower,
Expand Down Expand Up @@ -649,7 +667,13 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
tracker := tracker.NewLockfreeTracker()
b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings())

pd, data := pc.newLeaseProposal(roachpb.Lease{})
pd, data := pc.newLeaseProposal(roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{
NodeID: roachpb.NodeID(self),
StoreID: roachpb.StoreID(self),
ReplicaID: roachpb.ReplicaID(self),
},
}, tc.extension)
_, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
_, err := b.Insert(ctx, pd, data, tok.Move(ctx))
require.NoError(t, err)
Expand Down Expand Up @@ -855,7 +879,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
case regularWrite:
pd, data = pc.newPutProposal()
case newLease:
pd, data = pc.newLeaseProposal(tc.lease)
pd, data = pc.newLeaseProposal(tc.lease, false)
case leaseTransfer:
var ba roachpb.BatchRequest
ba.Add(&roachpb.TransferLeaseRequest{
Expand Down
7 changes: 7 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ func (tlr *TransferLeaseRequest) prevLease() Lease {
return tlr.PrevLease
}

// IsExtension returns whether the lease request is an extension of an existing
// lease or whether it is claiming the lease away from a previous owner.
func (rlr *RequestLeaseRequest) IsExtension() bool {
return rlr.PrevLease.Replica.StoreID == rlr.Lease.Replica.StoreID &&
rlr.PrevLease.Equivalent(rlr.Lease)
}

// Response is an interface for RPC responses.
type Response interface {
protoutil.Message
Expand Down

0 comments on commit 1239745

Please sign in to comment.