Skip to content

Commit

Permalink
kvserver: allow lease extensions from followers in propBuf
Browse files Browse the repository at this point in the history
The Raft proposal buffer currently rejects lease acquisition proposals
from followers if there is an eligible leader, to avoid interfering with
it trying to claim the lease. However, this logic unintentionally
rejected lease extension proposals from followers as well, which could
cause a leaseholder to lose its lease.

This patch changes the proposal buffer to allow lease extension
proposals from followers as long as they still hold a valid lease.

Release note: None
  • Loading branch information
erikgrinaker committed Mar 16, 2021
1 parent bdff533 commit 1beb8c6
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
21 changes: 17 additions & 4 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ type proposer interface {
withGroupLocked(func(proposerRaft) error) error
registerProposalLocked(*ProposalData)
leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo
ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimestamp) bool
// rejectProposalWithRedirectLocked rejects a proposal and redirects the
// proposer to try it on another node. This is used to sometimes reject lease
// acquisitions when another replica is the leader; the intended consequence
Expand Down Expand Up @@ -562,18 +563,26 @@ func (b *propBuf) FlushLockedWithRaftGroup(
//
// A special case is when the leader is known, but is ineligible to get the
// lease. In that case, we have no choice but to continue with the proposal.
//
// Lease extensions for a currently held lease always go through.
if !leaderInfo.iAmTheLeader && p.Request.IsLeaseRequest() {
req := p.Request.Requests[0].GetRequestLease()
leaderKnownAndEligible := leaderInfo.leaderKnown && leaderInfo.leaderEligibleForLease
if leaderKnownAndEligible && !b.testing.allowLeaseProposalWhenNotLeader {
isValidExtension := req.PrevLease.Replica.StoreID == req.Lease.Replica.StoreID &&
b.p.ownsValidLeaseRLocked(ctx, b.clock.NowAsClockTimestamp())
if leaderKnownAndEligible && !isValidExtension && !b.testing.allowLeaseProposalWhenNotLeader {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
leaderInfo.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, leaderInfo.leader)
p.tok.doneIfNotMovedLocked(ctx)
continue
}
// If the leader is not known, or if it is known but it's ineligible for
// the lease, continue with the proposal as explained above.
if !leaderInfo.leaderKnown {
// If the leader is not known, or if it is known but it's ineligible
// for the lease, continue with the proposal as explained above. We
// also send lease extensions for an existing leaseholder.
if isValidExtension {
log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader")
} else if !leaderInfo.leaderKnown {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
} else {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
Expand Down Expand Up @@ -1017,6 +1026,10 @@ func (rp *replicaProposer) raftTransportClosedTimestampEnabled() bool {
return !(*Replica)(rp).mu.state.RaftClosedTimestamp.IsEmpty()
}

func (rp *replicaProposer) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimestamp) bool {
return (*Replica)(rp).ownsValidLeaseRLocked(ctx, now)
}

func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error {
// Pass true for mayCampaignOnWake because we're about to propose a command.
return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
Expand Down
58 changes: 51 additions & 7 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type testProposer struct {
// If not nil, this is called by RejectProposalWithRedirectLocked(). If nil,
// RejectProposalWithRedirectLocked() panics.
onRejectProposalWithRedirectLocked func(prop *ProposalData, redirectTo roachpb.ReplicaID)
// ownsValidLease is returned by ownsValidLeaseRLocked()
ownsValidLease bool

// leaderReplicaInDescriptor is set if the leader (as indicated by raftGroup)
// is known, and that leader is part of the range's descriptor (as seen by the
Expand Down Expand Up @@ -148,6 +150,10 @@ func (t *testProposer) registerProposalLocked(p *ProposalData) {
t.registered++
}

func (t *testProposer) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimestamp) bool {
return t.ownsValidLease
}

func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo {
leaderKnown := raftGroup.BasicStatus().Lead != raft.None
var leaderRep roachpb.ReplicaID
Expand Down Expand Up @@ -201,9 +207,15 @@ func (pc proposalCreator) newPutProposal() (*ProposalData, []byte) {
return pc.newProposal(ba)
}

func (pc proposalCreator) newLeaseProposal(lease roachpb.Lease) (*ProposalData, []byte) {
func (pc proposalCreator) newLeaseProposal(
lease roachpb.Lease, extension bool,
) (*ProposalData, []byte) {
var ba roachpb.BatchRequest
ba.Add(&roachpb.RequestLeaseRequest{Lease: lease})
req := roachpb.RequestLeaseRequest{Lease: lease}
if extension {
req.PrevLease = req.Lease
}
ba.Add(&req)
return pc.newProposal(ba)
}

Expand Down Expand Up @@ -260,7 +272,7 @@ func TestProposalBuffer(t *testing.T) {
var pd *ProposalData
var data []byte
if leaseReq {
pd, data = pc.newLeaseProposal(roachpb.Lease{})
pd, data = pc.newLeaseProposal(roachpb.Lease{}, false /* extension */)
} else {
pd, data = pc.newPutProposal()
}
Expand Down Expand Up @@ -466,7 +478,7 @@ func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) {
var pd *ProposalData
var data []byte
if i%2 == 0 {
pd, data = pc.newLeaseProposal(roachpb.Lease{})
pd, data = pc.newLeaseProposal(roachpb.Lease{}, false /* extension */)
} else {
pd, data = pc.newPutProposal()
}
Expand All @@ -485,7 +497,7 @@ func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) {
var pd *ProposalData
var data []byte
if i%2 == 0 {
pd, data = pc.newLeaseProposal(roachpb.Lease{})
pd, data = pc.newLeaseProposal(roachpb.Lease{}, false /* extension */)
} else {
pd, data = pc.newPutProposal()
}
Expand Down Expand Up @@ -560,6 +572,10 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
// Set to simulate situations where the local replica is so behind that the
// leader is not even part of the range descriptor.
leaderNotInRngDesc bool
// If true, the follower has a valid lease.
ownsValidLease bool
// If true, the lease acquisition is instead a lease extension.
extension bool

expRejection bool
}{
Expand All @@ -578,6 +594,27 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
// Rejection - a follower can't request a lease.
expRejection: true,
},
{
name: "follower, lease extension despite known eligible leader",
state: raft.StateFollower,
// Someone else is leader, but we're the leaseholder.
leader: self + 1,
extension: true,
ownsValidLease: true,
// No rejection of lease extensions.
expRejection: false,
},
{
name: "follower, stale lease extension rejected with known eligible leader",
state: raft.StateFollower,
// Someone else is leader, we're trying to extend a lease that
// we had but it's no longer valid.
leader: self + 1,
extension: true,
ownsValidLease: false,
// Extension of invalid lease is rejected.
expRejection: true,
},
{
name: "follower, known ineligible leader",
state: raft.StateFollower,
Expand Down Expand Up @@ -643,13 +680,20 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
p.raftGroup = r
p.leaderReplicaInDescriptor = !tc.leaderNotInRngDesc
p.leaderReplicaType = tc.leaderRepType
p.ownsValidLease = tc.ownsValidLease

var b propBuf
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
tracker := tracker.NewLockfreeTracker()
b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings())

pd, data := pc.newLeaseProposal(roachpb.Lease{})
pd, data := pc.newLeaseProposal(roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{
NodeID: roachpb.NodeID(self),
StoreID: roachpb.StoreID(self),
ReplicaID: roachpb.ReplicaID(self),
},
}, tc.extension)
_, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
_, err := b.Insert(ctx, pd, data, tok.Move(ctx))
require.NoError(t, err)
Expand Down Expand Up @@ -855,7 +899,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
case regularWrite:
pd, data = pc.newPutProposal()
case newLease:
pd, data = pc.newLeaseProposal(tc.lease)
pd, data = pc.newLeaseProposal(tc.lease, false /* extension */)
case leaseTransfer:
var ba roachpb.BatchRequest
ba.Add(&roachpb.TransferLeaseRequest{
Expand Down

0 comments on commit 1beb8c6

Please sign in to comment.