Skip to content

Commit

Permalink
kv: pull lease request rejection into method on propBuf
Browse files Browse the repository at this point in the history
Pure refactor in preparation of the next commit.
  • Loading branch information
nvanbenschoten committed Jun 12, 2022
1 parent 78ed26c commit 8efa949
Showing 1 changed file with 105 additions and 66 deletions.
171 changes: 105 additions & 66 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,20 +389,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(
buf := b.arr.asSlice()[:used]
ents := make([]raftpb.Entry, 0, used)

// Figure out leadership info. We'll use it to conditionally drop some
// requests.
var leaderInfo rangeLeaderInfo
if raftGroup != nil {
leaderInfo = b.p.leaderStatusRLocked(raftGroup)
// Sanity check.
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
!leaderInfo.iAmTheLeader {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
raftGroup.BasicStatus().RaftState, leaderInfo.leader)
}
}

// Compute the closed timestamp target, which will be used to assign a closed
// timestamp to all proposals in this batch.
closedTSTarget := b.p.closedTimestampTarget()

// Remember the first error that we see when proposing the batch. We don't
Expand All @@ -418,58 +406,10 @@ func (b *propBuf) FlushLockedWithRaftGroup(
buf[i] = nil // clear buffer
reproposal := !p.tok.stillTracked()

// Handle an edge case about lease acquisitions: we don't want to forward
// lease acquisitions to another node (which is what happens when we're not
// the leader) because:
// a) if there is a different leader, that leader should acquire the lease
// itself and thus avoid a change of leadership caused by the leaseholder
// and leader being different (Raft leadership follows the lease), and
// b) being a follower, it's possible that this replica is behind in
// applying the log. Thus, there might be another lease in place that this
// follower doesn't know about, in which case the lease we're proposing here
// would be rejected. Not only would proposing such a lease be wasted work,
// but we're trying to protect against pathological cases where it takes a
// long time for this follower to catch up (for example because it's waiting
// for a snapshot, and the snapshot is queued behind many other snapshots).
// In such a case, we don't want all requests arriving at this node to be
// blocked on this lease acquisition (which is very likely to eventually
// fail anyway).
//
// Thus, we do one of two things:
// - if the leader is known, we reject this proposal and make sure the
// request that needed the lease is redirected to the leaseholder;
// - if the leader is not known, we don't do anything special here to
// terminate the proposal, but we know that Raft will reject it with a
// ErrProposalDropped. We'll eventually re-propose it once a leader is
// known, at which point it will either go through or be rejected based on
// whether or not it is this replica that became the leader.
//
// A special case is when the leader is known, but is ineligible to get the
// lease. In that case, we have no choice but to continue with the proposal.
//
// Lease extensions for a currently held lease always go through, to
// keep the lease alive until the normal lease transfer mechanism can
// colocate it with the leader.
if !leaderInfo.iAmTheLeader && p.Request.IsSingleRequestLeaseRequest() {
leaderKnownAndEligible := leaderInfo.leaderKnown && leaderInfo.leaderEligibleForLease
ownsCurrentLease := b.p.ownsValidLeaseRLocked(ctx, b.clock.NowAsClockTimestamp())
if leaderKnownAndEligible && !ownsCurrentLease && !b.testing.allowLeaseProposalWhenNotLeader {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
leaderInfo.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, leaderInfo.leader)
p.tok.doneIfNotMovedLocked(ctx)
continue
}
// If the leader is not known, or if it is known but it's ineligible
// for the lease, continue with the proposal as explained above. We
// also send lease extensions for an existing leaseholder.
if ownsCurrentLease {
log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader; we hold the current lease")
} else if !leaderInfo.leaderKnown {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
} else {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
}
// Conditionally reject the proposal based on the state of the raft group.
if b.maybeRejectUnsafeProposalLocked(ctx, raftGroup, p) {
p.tok.doneIfNotMovedLocked(ctx)
continue
}

// Raft processing bookkeeping.
Expand Down Expand Up @@ -585,6 +525,105 @@ func (b *propBuf) FlushLockedWithRaftGroup(
return used, proposeBatch(raftGroup, b.p.getReplicaID(), ents)
}

// maybeRejectUnsafeProposalLocked conditionally rejects proposals that are
// deemed unsafe, given the current state of the raft group. Requests that may
// be deemed unsafe and rejected at this level are those whose safety has some
// dependency on raft leadership, follower progress, leadership term, commit
// index, or other properties of raft. By rejecting these requests on the
// "flushing" side of the proposal buffer (i.e. while holding the raftMu), we
// can perform the safety checks without risk of the state of the raft group
// changing before the proposal is passed to etcd/raft.
//
// Currently, the request types which may be rejected by this function are:
// - RequestLease when the proposer is not the raft leader (with caveats).
//
// The function returns true if the proposal was rejected, and false if not.
// If the proposal was rejected and true is returned, it will have been cleaned
// up (passed to Replica.cleanupFailedProposalLocked) and finished
// (ProposalData.finishApplication called).
func (b *propBuf) maybeRejectUnsafeProposalLocked(
ctx context.Context, raftGroup proposerRaft, p *ProposalData,
) (rejected bool) {
switch {
case p.Request.IsSingleRequestLeaseRequest():
// Handle an edge case about lease acquisitions: we don't want to forward
// lease acquisitions to another node (which is what happens when we're not
// the leader) because:
// a) if there is a different leader, that leader should acquire the lease
// itself and thus avoid a change of leadership caused by the leaseholder
// and leader being different (Raft leadership follows the lease), and
// b) being a follower, it's possible that this replica is behind in
// applying the log. Thus, there might be another lease in place that this
// follower doesn't know about, in which case the lease we're proposing here
// would be rejected. Not only would proposing such a lease be wasted work,
// but we're trying to protect against pathological cases where it takes a
// long time for this follower to catch up (for example because it's waiting
// for a snapshot, and the snapshot is queued behind many other snapshots).
// In such a case, we don't want all requests arriving at this node to be
// blocked on this lease acquisition (which is very likely to eventually
// fail anyway).
//
// Thus, we do one of two things:
// - if the leader is known, we reject this proposal and make sure the
// request that needed the lease is redirected to the leaseholder;
// - if the leader is not known, we don't do anything special here to
// terminate the proposal, but we know that Raft will reject it with a
// ErrProposalDropped. We'll eventually re-propose it once a leader is
// known, at which point it will either go through or be rejected based on
// whether or not it is this replica that became the leader.
//
// A special case is when the leader is known, but is ineligible to get the
// lease. In that case, we have no choice but to continue with the proposal.
//
// Lease extensions for a currently held lease always go through, to
// keep the lease alive until the normal lease transfer mechanism can
// colocate it with the leader.
li := b.leaderStatusRLocked(ctx, raftGroup)
if li.iAmTheLeader {
return false
}
leaderKnownAndEligible := li.leaderKnown && li.leaderEligibleForLease
ownsCurrentLease := b.p.ownsValidLeaseRLocked(ctx, b.clock.NowAsClockTimestamp())
if leaderKnownAndEligible && !ownsCurrentLease && !b.testing.allowLeaseProposalWhenNotLeader {
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
li.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
return true
}
// If the leader is not known, or if it is known but it's ineligible
// for the lease, continue with the proposal as explained above. We
// also send lease extensions for an existing leaseholder.
if ownsCurrentLease {
log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader; we hold the current lease")
} else if !li.leaderKnown {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
} else {
log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
}
return false

default:
return false
}
}

// leaderStatusRLocked returns the rangeLeaderInfo for the provided raft group,
// or an empty rangeLeaderInfo if the raftGroup is nil.
func (b *propBuf) leaderStatusRLocked(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo {
if raftGroup == nil {
return rangeLeaderInfo{}
}
leaderInfo := b.p.leaderStatusRLocked(raftGroup)
// Sanity check.
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
!leaderInfo.iAmTheLeader {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
raftGroup.BasicStatus().RaftState, leaderInfo.leader)
}
return leaderInfo
}

// allocateLAIAndClosedTimestampLocked computes a LAI and closed timestamp to be
// carried by an outgoing proposal.
//
Expand Down

0 comments on commit 8efa949

Please sign in to comment.