diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index e9897343ef17..b5b0a0800b20 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -389,20 +389,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(
 	buf := b.arr.asSlice()[:used]
 	ents := make([]raftpb.Entry, 0, used)
 
-	// Figure out leadership info. We'll use it to conditionally drop some
-	// requests.
-	var leaderInfo rangeLeaderInfo
-	if raftGroup != nil {
-		leaderInfo = b.p.leaderStatusRLocked(raftGroup)
-		// Sanity check.
-		if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
-			!leaderInfo.iAmTheLeader {
-			log.Fatalf(ctx,
-				"inconsistent Raft state: state %s while the current replica is also the lead: %d",
-				raftGroup.BasicStatus().RaftState, leaderInfo.leader)
-		}
-	}
-
+	// Compute the closed timestamp target, which will be used to assign a closed
+	// timestamp to all proposals in this batch.
 	closedTSTarget := b.p.closedTimestampTarget()
 
 	// Remember the first error that we see when proposing the batch. We don't
@@ -418,58 +406,10 @@ func (b *propBuf) FlushLockedWithRaftGroup(
 		buf[i] = nil // clear buffer
 		reproposal := !p.tok.stillTracked()
 
-		// Handle an edge case about lease acquisitions: we don't want to forward
-		// lease acquisitions to another node (which is what happens when we're not
-		// the leader) because:
-		// a) if there is a different leader, that leader should acquire the lease
-		// itself and thus avoid a change of leadership caused by the leaseholder
-		// and leader being different (Raft leadership follows the lease), and
-		// b) being a follower, it's possible that this replica is behind in
-		// applying the log. Thus, there might be another lease in place that this
-		// follower doesn't know about, in which case the lease we're proposing here
-		// would be rejected. Not only would proposing such a lease be wasted work,
-		// but we're trying to protect against pathological cases where it takes a
-		// long time for this follower to catch up (for example because it's waiting
-		// for a snapshot, and the snapshot is queued behind many other snapshots).
-		// In such a case, we don't want all requests arriving at this node to be
-		// blocked on this lease acquisition (which is very likely to eventually
-		// fail anyway).
-		//
-		// Thus, we do one of two things:
-		// - if the leader is known, we reject this proposal and make sure the
-		// request that needed the lease is redirected to the leaseholder;
-		// - if the leader is not known, we don't do anything special here to
-		// terminate the proposal, but we know that Raft will reject it with a
-		// ErrProposalDropped. We'll eventually re-propose it once a leader is
-		// known, at which point it will either go through or be rejected based on
-		// whether or not it is this replica that became the leader.
-		//
-		// A special case is when the leader is known, but is ineligible to get the
-		// lease. In that case, we have no choice but to continue with the proposal.
-		//
-		// Lease extensions for a currently held lease always go through, to
-		// keep the lease alive until the normal lease transfer mechanism can
-		// colocate it with the leader.
-		if !leaderInfo.iAmTheLeader && p.Request.IsSingleRequestLeaseRequest() {
-			leaderKnownAndEligible := leaderInfo.leaderKnown && leaderInfo.leaderEligibleForLease
-			ownsCurrentLease := b.p.ownsValidLeaseRLocked(ctx, b.clock.NowAsClockTimestamp())
-			if leaderKnownAndEligible && !ownsCurrentLease && !b.testing.allowLeaseProposalWhenNotLeader {
-				log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
-					leaderInfo.leader)
-				b.p.rejectProposalWithRedirectLocked(ctx, p, leaderInfo.leader)
-				p.tok.doneIfNotMovedLocked(ctx)
-				continue
-			}
-			// If the leader is not known, or if it is known but it's ineligible
-			// for the lease, continue with the proposal as explained above. We
-			// also send lease extensions for an existing leaseholder.
-			if ownsCurrentLease {
-				log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader; we hold the current lease")
-			} else if !leaderInfo.leaderKnown {
-				log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
-			} else {
-				log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
-			}
+		// Conditionally reject the proposal based on the state of the raft group.
+		if b.maybeRejectUnsafeProposalLocked(ctx, raftGroup, p) {
+			p.tok.doneIfNotMovedLocked(ctx)
+			continue
 		}
 
 		// Raft processing bookkeeping.
@@ -585,6 +525,105 @@ func (b *propBuf) FlushLockedWithRaftGroup(
 	return used, proposeBatch(raftGroup, b.p.getReplicaID(), ents)
 }
 
+// maybeRejectUnsafeProposalLocked conditionally rejects proposals that are
+// deemed unsafe, given the current state of the raft group. Requests that may
+// be deemed unsafe and rejected at this level are those whose safety has some
+// dependency on raft leadership, follower progress, leadership term, commit
+// index, or other properties of raft. By rejecting these requests on the
+// "flushing" side of the proposal buffer (i.e. while holding the raftMu), we
+// can perform the safety checks without risk of the state of the raft group
+// changing before the proposal is passed to etcd/raft.
+//
+// Currently, the request types which may be rejected by this function are:
+//  - RequestLease when the proposer is not the raft leader (with caveats).
+//
+// The function returns true if the proposal was rejected, and false if not.
+// If the proposal was rejected and true is returned, it will have been cleaned
+// up (passed to Replica.cleanupFailedProposalLocked) and finished
+// (ProposalData.finishApplication called).
+func (b *propBuf) maybeRejectUnsafeProposalLocked(
+	ctx context.Context, raftGroup proposerRaft, p *ProposalData,
+) (rejected bool) {
+	switch {
+	case p.Request.IsSingleRequestLeaseRequest():
+		// Handle an edge case about lease acquisitions: we don't want to forward
+		// lease acquisitions to another node (which is what happens when we're not
+		// the leader) because:
+		// a) if there is a different leader, that leader should acquire the lease
+		// itself and thus avoid a change of leadership caused by the leaseholder
+		// and leader being different (Raft leadership follows the lease), and
+		// b) being a follower, it's possible that this replica is behind in
+		// applying the log. Thus, there might be another lease in place that this
+		// follower doesn't know about, in which case the lease we're proposing here
+		// would be rejected. Not only would proposing such a lease be wasted work,
+		// but we're trying to protect against pathological cases where it takes a
+		// long time for this follower to catch up (for example because it's waiting
+		// for a snapshot, and the snapshot is queued behind many other snapshots).
+		// In such a case, we don't want all requests arriving at this node to be
+		// blocked on this lease acquisition (which is very likely to eventually
+		// fail anyway).
+		//
+		// Thus, we do one of two things:
+		// - if the leader is known, we reject this proposal and make sure the
+		// request that needed the lease is redirected to the leaseholder;
+		// - if the leader is not known, we don't do anything special here to
+		// terminate the proposal, but we know that Raft will reject it with a
+		// ErrProposalDropped. We'll eventually re-propose it once a leader is
+		// known, at which point it will either go through or be rejected based on
+		// whether or not it is this replica that became the leader.
+		//
+		// A special case is when the leader is known, but is ineligible to get the
+		// lease. In that case, we have no choice but to continue with the proposal.
+		//
+		// Lease extensions for a currently held lease always go through, to
+		// keep the lease alive until the normal lease transfer mechanism can
+		// colocate it with the leader.
+		li := b.leaderStatusRLocked(ctx, raftGroup)
+		if li.iAmTheLeader {
+			return false
+		}
+		leaderKnownAndEligible := li.leaderKnown && li.leaderEligibleForLease
+		ownsCurrentLease := b.p.ownsValidLeaseRLocked(ctx, b.clock.NowAsClockTimestamp())
+		if leaderKnownAndEligible && !ownsCurrentLease && !b.testing.allowLeaseProposalWhenNotLeader {
+			log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
+				li.leader)
+			b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
+			return true
+		}
+		// If the leader is not known, or if it is known but it's ineligible
+		// for the lease, continue with the proposal as explained above. We
+		// also send lease extensions for an existing leaseholder.
+		if ownsCurrentLease {
+			log.VEventf(ctx, 2, "proposing lease extension even though we're not the leader; we hold the current lease")
+		} else if !li.leaderKnown {
+			log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is unknown")
+		} else {
+			log.VEventf(ctx, 2, "proposing lease acquisition even though we're not the leader; the leader is ineligible")
+		}
+		return false
+
+	default:
+		return false
+	}
+}
+
+// leaderStatusRLocked returns the rangeLeaderInfo for the provided raft group,
+// or an empty rangeLeaderInfo if the raftGroup is nil.
+func (b *propBuf) leaderStatusRLocked(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo {
+	if raftGroup == nil {
+		return rangeLeaderInfo{}
+	}
+	leaderInfo := b.p.leaderStatusRLocked(raftGroup)
+	// Sanity check.
+	if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
+		!leaderInfo.iAmTheLeader {
+		log.Fatalf(ctx,
+			"inconsistent Raft state: state %s while the current replica is also the lead: %d",
+			raftGroup.BasicStatus().RaftState, leaderInfo.leader)
+	}
+	return leaderInfo
+}
+
 // allocateLAIAndClosedTimestampLocked computes a LAI and closed timestamp to be
 // carried by an outgoing proposal.
 //
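Reviewer note: the RequestLease branch of the new maybeRejectUnsafeProposalLocked reduces to a small decision over the leadership and lease state. The standalone Go sketch below restates that decision outside the propBuf types to make the branches easy to audit; all identifiers in it (leaseProposalState, decideLeaseProposal, main) are illustrative only and do not appear in the patch.

// A minimal, self-contained sketch of the lease-proposal decision logic,
// assuming hypothetical names; it mirrors the branch structure of the diff
// above but is not part of the CockroachDB tree.
package main

import "fmt"

// leaseProposalState captures the inputs consulted when the proposal buffer
// flushes a RequestLease proposal.
type leaseProposalState struct {
	iAmTheLeader              bool // this replica is currently the raft leader
	leaderKnown               bool // the raft leader is known
	leaderEligibleForLease    bool // the known leader could hold the lease
	ownsCurrentLease          bool // this replica already holds a valid lease (extension)
	testingAllowWhenNotLeader bool // test-only override, normally false
}

// decideLeaseProposal returns true when the proposal should be rejected and
// the request redirected to the leader, false when it should go to raft.
func decideLeaseProposal(s leaseProposalState) (reject bool, reason string) {
	if s.iAmTheLeader {
		return false, "we are the leader; propose normally"
	}
	if s.leaderKnown && s.leaderEligibleForLease &&
		!s.ownsCurrentLease && !s.testingAllowWhenNotLeader {
		return true, "leader known and eligible; redirect to it"
	}
	switch {
	case s.ownsCurrentLease:
		return false, "extension of a lease we already hold"
	case !s.leaderKnown:
		return false, "leader unknown; raft drops it and we re-propose later"
	default:
		return false, "leader known but ineligible for the lease"
	}
}

func main() {
	// A follower that sees a known, eligible leader rejects and redirects.
	follower := leaseProposalState{leaderKnown: true, leaderEligibleForLease: true}
	reject, reason := decideLeaseProposal(follower)
	fmt.Printf("reject=%t (%s)\n", reject, reason)
}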