Skip to content

Commit

Permalink
storage: prevent command reproposal with new lease index after applic…
Browse files Browse the repository at this point in the history
…ation

Fixes #39018.
Fixes #37810.
May fix other tests.

This commit fixes a bug introduced in e4ce717 which allowed a single Raft
proposal to be applied multiple times at multiple applied indexes. The
bug was possible if a raft proposal was reproposed twice, once with the
same max lease index and once with a new max lease index. Because there
are two entries for the same proposal, one necessarily has to have an
invalid max lease applied index (see the invariant in https://github.com/cockroachdb/cockroach/blob/5cbc4b50712546465de75dba69a1c0cdd1fe2f87/pkg/storage/replica_raft.go#L1430)
If these two entries happened to land in the same application batch on
the leaseholder then the first entry would be rejected and the second
would apply. The replicas LeaseAppliedIndex would then be bumped all the
way to the max lease index of the entry that applied. The bug occurred when
the first entry (who must have hit a proposalIllegalLeaseIndex), called into
tryReproposeWithNewLeaseIndex. The ProposalData's MaxLeaseIndex would be
equal to the Replica's LeaseAppliedIndex because it would match the index
in the successful entry. We would then repropose the proposal with a larger
lease applied index. This new entry could then apply and result in duplicate
entry application.

Luckily, rangefeed's intent reference counting was sensitive enough that it
caught this duplicate entry application and panicked loudly. Other tests might
also be failing because of it but might not have as obvious of symptoms when
they hit the bug.

In addition to this primary bug fix, this commit has a secondary effect of
fixing an issue where two entries for the same command could be in the same
batch and only one would be linked to its ProposalData struct and be considered
locally proposed (see the change in retrieveLocalProposals). I believe that this
would prevent the command from being properly acknowledged if the first entry
was rejected due to its max lease index and the second entry had a larger max
lease index and applied. I think the first entry would have eventually hit the
check in tryReproposeWithNewLeaseIndex and observed that the linked ProposalData
had a new MaxLeaseIndex so it would have added it back to the proposal map, but
then it would have had to wait for refreshProposalsLocked to refresh the
proposal, at which point this refresh would have hit a lease index error and
would be reproposed at a higher index. Not only could this cause duplicate
versions of the same command to apply (described above), but I think this could
even loop forever without acknowledging the client. It seems like there should
be a way for this to cause #39022, but it doesn't exactly line up.

Again, these kinds of cases will be easier to test once we properly mock out
these interfaces in #38954. I'm working on that, but I don't think it should
hold up the next alpha (#39036). However, this commit does address a TODO to
properly handle errors during tryReproposeWithNewLeaseIndex reproposals and
that is properly tested.

My debugging process to track this down was to repeatedly run a set of 10
`cdc/ledger/rangefeed=true` roachtests after reducing its duration down to
5m. Usually, at least one of these tests would hit the `negative refcount`
assertion. I then incrementally added more and more logging around entry
application until I painted a full picture of which logical ops were being
consumed by the rangefeed processor and why the same raft command was being
applied twice (once it became clear that one was). After a few more rounds
of fine-tuning the logging, the interaction with reproposals with a new max
lease index became clear.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 24, 2019
1 parent 2d9630a commit aada4fc
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 101 deletions.
58 changes: 49 additions & 9 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,65 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
// Copy stats as it gets updated in-place in applyRaftCommandToBatch.
b.replicaState.Stats = &b.stats
*b.replicaState.Stats = *r.mu.state.Stats
// Assign all the local proposals first then delete all of them from the map
// in a second pass. This ensures that we retrieve all proposals correctly
// even if the batch has multiple entries for the same proposal, in which
// case the proposal was reproposed (either under its original or a new
// MaxLeaseIndex) which we handle in a second pass below.
var anyLocal bool
var it cmdAppCtxBufIterator
haveProposalQuota := r.mu.proposalQuota != nil
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
cmd.proposal = r.mu.proposals[cmd.idKey]
if cmd.proposedLocally() {
// We initiated this command, so use the caller-supplied context.
cmd.ctx = cmd.proposal.ctx
delete(r.mu.proposals, cmd.idKey)
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
if haveProposalQuota {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
}
anyLocal = true
} else {
cmd.ctx = ctx
}
}
if !anyLocal {
// Fast-path.
return
}
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
if !cmd.proposedLocally() {
continue
}
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
continue
}
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect anything in this cmdAppBatch.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
if _, ok := r.mu.proposals[cmd.idKey]; !ok {
continue
}
delete(r.mu.proposals, cmd.idKey)
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
if r.mu.proposalQuota != nil {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
}
}
}

// stageRaftCommand handles the first phase of applying a command to the
Expand Down Expand Up @@ -856,7 +896,7 @@ func (r *Replica) applyCmdAppBatch(
}
cmd.replicatedResult().SuggestedCompactions = nil
isNonTrivial := batchIsNonTrivial && it.isLast()
if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial,
if errExpl, err = r.handleRaftCommandResult(cmd.ctx, cmd, isNonTrivial,
b.replicaState.UsingAppliedStateKey); err != nil {
return errExpl, err
}
Expand Down
129 changes: 90 additions & 39 deletions pkg/storage/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,19 @@ func (r *Replica) handleRaftCommandResult(
) (errExpl string, err error) {
// Set up the local result prior to handling the ReplicatedEvalResult to
// give testing knobs an opportunity to inspect it.
r.prepareLocalResult(cmd.ctx, cmd)
r.prepareLocalResult(ctx, cmd)
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, cmd.localResult.String())
}

// Handle the ReplicatedEvalResult, executing any side effects of the last
// state machine transition.
//
// Note that this must happen after committing (the engine.Batch), but
// before notifying a potentially waiting client.
clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey)
if isNonTrivial {
r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult())
r.handleComplexReplicatedEvalResult(ctx, *cmd.replicatedResult())
} else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) {
log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v",
cmd.replicatedResult())
Expand All @@ -186,13 +190,13 @@ func (r *Replica) handleRaftCommandResult(
}

if cmd.localResult != nil {
r.handleLocalEvalResult(cmd.ctx, *cmd.localResult)
r.handleLocalEvalResult(ctx, *cmd.localResult)
}
r.finishRaftCommand(cmd.ctx, cmd)
r.finishRaftCommand(ctx, cmd)
switch cmd.e.Type {
case raftpb.EntryNormal:
if cmd.replicatedResult().ChangeReplicas != nil {
log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd)
log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd)
}
case raftpb.EntryConfChange:
if cmd.replicatedResult().ChangeReplicas == nil {
Expand Down Expand Up @@ -307,6 +311,10 @@ func (r *Replica) handleComplexReplicatedEvalResult(
// result will zero-out the struct to ensure that is has properly performed all
// of the implied side-effects.
func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
if !cmd.proposedLocally() {
return
}

var pErr *roachpb.Error
if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil {
var newPropRetry int
Expand All @@ -328,36 +336,90 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
pErr = cmd.forcedErr
}

if cmd.proposedLocally() {
if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
}
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
// If proposalRetry is set, we don't also return an error, as per the
// proposalResult contract.
if cmd.proposalRetry == proposalNoReevaluation {
if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
}
if pErr != nil {
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
switch cmd.proposalRetry {
case proposalNoReevaluation:
cmd.response.Err = pErr
case proposalIllegalLeaseIndex:
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
if pErr != nil {
cmd.response.Err = pErr
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
cmd.proposal = nil
return
}
} else if cmd.proposal.Local.Reply != nil {
cmd.response.Reply = cmd.proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
}
cmd.response.Intents = cmd.proposal.Local.DetachIntents()
cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
if pErr == nil {
cmd.localResult = cmd.proposal.Local
default:
panic("unexpected")
}
} else if cmd.proposal.Local.Reply != nil {
cmd.response.Reply = cmd.proposal.Local.Reply
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
}
if pErr != nil && cmd.localResult != nil {
cmd.response.Intents = cmd.proposal.Local.DetachIntents()
cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
if pErr == nil {
cmd.localResult = cmd.proposal.Local
} else if cmd.localResult != nil {
log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEvent(ctx, 2, cmd.localResult.String())
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *cmdAppCtx,
) *roachpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.raftCmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
maxLeaseIndex, pErr := r.propose(ctx, p)
if pErr != nil {
log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr)
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x at maxLeaseIndex=%d", cmd.idKey, maxLeaseIndex)
return nil
}

// finishRaftCommand is called after a command's side effects have been applied
Expand Down Expand Up @@ -395,17 +457,6 @@ func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {
}

if cmd.proposedLocally() {
// If we failed to apply at the right lease index, try again with
// a new one. This is important for pipelined writes, since they
// don't have a client watching to retry, so a failure to
// eventually apply the proposal would be a uservisible error.
// TODO(nvanbenschoten): This reproposal is not tracked by the
// quota pool. We should fix that.
if cmd.proposalRetry == proposalIllegalLeaseIndex &&
r.tryReproposeWithNewLeaseIndex(cmd.proposal) {
return
}
// Otherwise, signal the command's status to the client.
cmd.proposal.finishApplication(cmd.response)
} else if cmd.response.Err != nil {
log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err)
Expand Down
21 changes: 20 additions & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type ProposalData struct {
// cache and release latches.
ec endCmds

// applied is set when the a command finishes application. It is used to
// avoid reproposing a failed proposal if an earlier version of the same
// proposal succeeded in applying.
applied bool

// doneCh is used to signal the waiting RPC handler (the contents of
// proposalResult come from LocalEvalResult).
//
Expand Down Expand Up @@ -122,11 +127,25 @@ type ProposalData struct {
// is canceled, it won't be listening to this done channel, and so it can't be
// counted on to invoke endCmds itself.)
func (proposal *ProposalData) finishApplication(pr proposalResult) {
if proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
if pr.Err != nil {
return
}
log.Fatalf(proposal.ctx,
"command already applied: %+v; unexpected successful result: %+v", proposal, pr)
}
proposal.applied = true
proposal.ec.done(proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
if proposal.sp != nil {
tracing.FinishSpan(proposal.sp)
proposal.sp = nil
}
proposal.signalProposalResult(pr)
}

// returnProposalResult signals proposal.doneCh with the proposal result if it
Expand Down
50 changes: 0 additions & 50 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,56 +1404,6 @@ func (m lastUpdateTimesMap) isFollowerActive(
return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration
}

// tryReproposeWithNewLeaseIndex is used by processRaftCommand to
// repropose commands that have gotten an illegal lease index error,
// and that we know could not have applied while their lease index was
// valid (that is, we observed all applied entries between proposal
// and the lease index becoming invalid, as opposed to skipping some
// of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level
// function so that it can avoid the below_raft_protos check. Returns
// true if the command has been successfully reproposed (not
// necessarily by this method! But if this method returns true, the
// command will be in the local proposals map).
func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
r.mu.Lock()
if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex {
// If the command's MaxLeaseIndex is greater than the
// LeaseAppliedIndex, it must have already been reproposed (this
// can happen if there are multiple copies of the command in the
// logs; see TestReplicaRefreshMultiple). We must not create
// multiple copies with multiple lease indexes, so don't repropose
// it again. This ensures that at any time, there is only up to a
// single lease index that has a chance of succeeding in the Raft
// log for a given command.
//
// Note that the caller has already removed the current version of
// the proposal from the pending proposals map. We must re-add it
// since it's still pending.
log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d",
proposal.command.MaxLeaseIndex)
r.mu.proposals[proposal.idKey] = proposal
r.mu.Unlock()
return true
}
r.mu.Unlock()

// Some tests check for this log message in the trace.
log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex")
if _, pErr := r.propose(proposal.ctx, proposal); pErr != nil {
// TODO(nvanbenschoten): Returning false here isn't ok. It will result
// in a proposal returning without a response or an error, which
// triggers a panic higher up in the stack. We need to fix this.
log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr)
return false
}
return true
}

// maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes
// any replicas and, if so, locks them for subsumption. See acquireMergeLock
// for details about the lock itself.
Expand Down
Loading

0 comments on commit aada4fc

Please sign in to comment.