Skip to content

Commit

Permalink
storage: Remove proposalIllegalLeaseIndex result from refreshProposals
Browse files Browse the repository at this point in the history
Returning proposalIllegalLeaseIndex from refreshProposals adds considerable complexity for a rare edge case (not exercised
by our test suite). The normal case for illegal lease indexes is for a
retry to be triggered in processRaftCommand. Retrying in
refreshProposals is a slight optimization to allow the retry to start
earlier, but this is not common enough to optimize.

Release note: None
  • Loading branch information
bdarnell committed Mar 17, 2019
1 parent 92f70d7 commit c063211
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 82 deletions.
128 changes: 46 additions & 82 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,6 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
log.Fatalf(context.TODO(), "refreshAtDelta specified for reason %s != reasonTicks", reason)
}

numShouldRetry := 0
var reproposals pendingCmdSlice
for _, p := range r.mu.proposals {
if p.command.MaxLeaseIndex == 0 {
Expand All @@ -1019,75 +1018,43 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
roachpb.NewAmbiguousResultError(
fmt.Sprintf("unknown status for command without MaxLeaseIndex "+
"at refreshProposalsLocked time (refresh reason: %s)", reason)))})
} else if cannotApplyAnyMore := !p.command.ReplicatedEvalResult.IsLeaseRequest &&
p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex; cannotApplyAnyMore {
// The command's designated lease index slot was filled up. We got to
// LeaseAppliedIndex and p is still pending in r.mu.proposals; generally
// this means that proposal p didn't commit, and it will be sent back to
// the proposer for a retry - the request needs to be re-evaluated and the
// command re-proposed with a new MaxLeaseIndex. Note that this branch is not
// taken for leases as their MaxLeaseIndex plays no role in deciding whether
// they can apply or not -- the sequence number of the previous lease matters.
//
// An exception is the case when we're refreshing because of
// reasonSnapshotApplied - in that case we don't know if p or some other
// command filled the p.command.MaxLeaseIndex slot (i.e. p might have been
// applied, but we're not watching for every proposal when applying a
// snapshot, so nobody removed p from r.mu.proposals). In this
// ambiguous case, we return an ambiguous error.
//
// NOTE: We used to perform a re-evaluation here in order to avoid the
// ambiguity, but it was cumbersome because the higher layer needed to be
// aware that if the re-evaluation failed in any way, it must rewrite the
// error to reflect the ambiguity. This mechanism also proved very hard to
// test and also is of questionable utility since snapshots are only
// applied in the first place if the leaseholder is divorced from the Raft
// leader.
//
// Note that we can't use the commit index here (which is typically a
// little ahead), because a pending command is removed only as it
// applies. Thus we'd risk reproposing a command that has been committed
// but not yet applied.
r.cleanupFailedProposalLocked(p)
log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason)
if reason == reasonSnapshotApplied {
continue
}
switch reason {
case reasonSnapshotApplied:
// If we applied a snapshot, check the MaxLeaseIndexes of all
// pending commands to see if any are now prevented from
// applying, and if so make them return an ambiguous error. We
// can't tell at this point (which should be rare) whether they
// were included in the snapshot we received or not.
if p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex {
r.cleanupFailedProposalLocked(p)
log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason)
p.finishApplication(proposalResult{Err: roachpb.NewError(
roachpb.NewAmbiguousResultError(
fmt.Sprintf("unable to determine whether command was applied via snapshot")))})
} else {
p.finishApplication(proposalResult{ProposalRetry: proposalIllegalLeaseIndex})
}
numShouldRetry++
} else if reason == reasonTicks && p.proposedAtTicks > r.mu.ticks-refreshAtDelta {
// The command was proposed too recently, don't bother reproposing it
// yet.
} else {
// The proposal can still apply according to its MaxLeaseIndex. But it's
// likely that the proposal was dropped, so we're going to repropose it
// directly; it's cheaper than asking the proposer to re-evaluate and
// re-propose. Note that we don't need to worry about receiving ambiguous
// responses for this reproposal - we'll only be handling one response
// across the original proposal and the reproposal because we're
// reproposing with the same MaxLeaseIndex and the same idKey as the
// original (a possible second response for the same idKey would be
// ignored).
//
// This situation happens when we've proposed commands that never made it
// to the leader - usually because we didn't know who the leader is. When
// we do discover the leader we need to repropose the command. In local
// testing, by far the most common reason for these reproposals is the
// initial leader election after a split. Specifically, when a range is
// split when the next command is proposed to the RHS the leader is
// elected. After the election completes the command is reproposed for
// both reasonNewLeader and reasonNewLeaderOrConfigChange.
continue

case reasonTicks:
if p.proposedAtTicks <= r.mu.ticks-refreshAtDelta {
// The command was proposed a while ago and may have been dropped. Try it again.
reproposals = append(reproposals, p)
}

default:
// We have reason to believe that all pending proposals were
// dropped on the floor (e.g. because of a leader election), so
// repropose everything.
reproposals = append(reproposals, p)
}
}
if log.V(1) && (numShouldRetry > 0 || len(reproposals) > 0) {

if log.V(1) && len(reproposals) > 0 {
ctx := r.AnnotateCtx(context.TODO())
log.Infof(ctx,
"pending commands: sent %d back to client, reproposing %d (at %d.%d) %s",
numShouldRetry, len(reproposals), r.mu.state.RaftAppliedIndex,
"pending commands: reproposing %d (at %d.%d) %s",
len(reproposals), r.mu.state.RaftAppliedIndex,
r.mu.state.LeaseAppliedIndex, reason)
}

Expand Down Expand Up @@ -2076,41 +2043,38 @@ func (r *Replica) processRaftCommand(
}

if proposedLocally {
if response.ProposalRetry == proposalIllegalLeaseIndex {
// If we failed to apply at the right lease index, try again with a new one.
r.mu.Lock()
lease := *r.mu.state.Lease
r.mu.Unlock()
if lease.OwnedBy(r.StoreID()) {
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
// Use a goroutine so this doesn't count as "marshaling a proto downstream of raft".
go proposeAfterIllegalLeaseIndex(ctx, r, proposal, *r.mu.state.Lease)
return false
}
} else {
proposal.finishApplication(response)
// If we failed to apply at the right lease index, try again with a new one.
if response.ProposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) {
return false
}
// Otherwise, signal the command's status to the client.
proposal.finishApplication(response)
} else if response.Err != nil {
log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err)
}

return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil
}

// proposeAfterIllegalLeaseIndex is used by processRaftCommand to
// tryReproposeWithNewLeaseIndex is used by processRaftCommand to
// repropose commands that have gotten an illegal lease index error.
// It is not intended for use elsewhere and is only a top-level
// function so that it can avoid the below_raft_protos check.
func proposeAfterIllegalLeaseIndex(
ctx context.Context, r *Replica, proposal *ProposalData, lease roachpb.Lease,
) {
// Returns true if the command was successfully reproposed.
func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, pErr := r.proposeLocked(ctx, proposal, lease)
if pErr != nil {
panic(pErr)
lease := *r.mu.state.Lease
if lease.OwnedBy(r.StoreID()) && lease.Sequence == proposal.command.ProposerLeaseSequence {
// Some tests check for this log message in the trace.
log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex")
if _, pErr := r.proposeLocked(proposal.ctx, proposal, lease); pErr != nil {
log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr)
return false
}
return true
}
return false
}

// maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/track_raft_protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func TrackRaftProtos() func() []reflect.Type {
// but tombstones are unreplicated and thus not subject to the strict
// consistency requirements.
funcName((*Replica).setTombstoneKey),
// tryReproposeWithNewLeaseIndex is only run on the replica that
// proposed the command.
funcName((*Replica).tryReproposeWithNewLeaseIndex),
}

belowRaftProtos := struct {
Expand Down

0 comments on commit c063211

Please sign in to comment.