From 01757dc8cbafade5954116f91c76d59c2f984bbf Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 23 Jul 2019 21:35:53 -0400 Subject: [PATCH] storage: prevent command reproposal with new lease index after application Fixes #39018. Fixes #37810. May fix other tests. This commit fixes a bug introduced in e4ce717 which allowed a single Raft proposal to be applied multiple times at multiple applied indexes. The bug was possible if a raft proposal was reproposed twice, once with the same max lease index and once with a new max lease index. Because there are two entries for the same proposal, one necessarily has to have an invalid max lease applied index (see the invariant in https://github.com/cockroachdb/cockroach/blob/5cbc4b50712546465de75dba69a1c0cdd1fe2f87/pkg/storage/replica_raft.go#L1430) If these two entries happened to land in the same application batch on the leaseholder then the first entry would be rejected and the second would apply. The replicas LeaseAppliedIndex would then be bumped all the way to the max lease index of the entry that applied. The bug occurred when the first entry (who must have hit a proposalIllegalLeaseIndex), called into tryReproposeWithNewLeaseIndex. The ProposalData's MaxLeaseIndex would be equal to the Replica's LeaseAppliedIndex because it would match the index in the successful entry. We would then repropose the proposal with a larger lease applied index. This new entry could then apply and result in duplicate entry application. Luckily, rangefeed's intent reference counting was sensitive enough that it caught this duplicate entry application and panicked loudly. Other tests might also be failing because of it but might not have as obvious of symptoms when they hit the bug. In addition to this primary bug fix, this commit has a secondary effect of fixing an issue where two entries for the same command could be in the same batch and only one would be linked to its ProposalData struct and be considered locally proposed (see the change in retrieveLocalProposals). I believe that this would prevent the command from being properly acknowledged if the first entry was rejected due to its max lease index and the second entry had a larger max lease index and applied. I think the first entry would have eventually hit the check in tryReproposeWithNewLeaseIndex and observed that the linked ProposalData had a new MaxLeaseIndex so it would have added it back to the proposal map, but then it would have had to wait for refreshProposalsLocked to refresh the proposal, at which point this refresh would have hit a lease index error and would be reproposed at a higher index. Not only could this cause duplicate versions of the same command to apply (described above), but I think this could even loop forever without acknowledging the client. It seems like there should be a way for this to cause #39022, but it doesn't exactly line up. Again, these kinds of cases will be easier to test once we properly mock out these interfaces in #38954. I'm working on that, but I don't think it should hold up the next alpha (#39036). However, this commit does address a TODO to properly handle errors during tryReproposeWithNewLeaseIndex reproposals and that is properly tested. Release note: None --- pkg/storage/replica_application.go | 47 ++++++-- pkg/storage/replica_application_result.go | 129 +++++++++++++++------- pkg/storage/replica_proposal.go | 12 +- pkg/storage/replica_raft.go | 50 --------- pkg/storage/replica_test.go | 86 ++++++++++++++- 5 files changed, 223 insertions(+), 101 deletions(-) diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go index 1685fcb2abb3..728a6d5b46df 100644 --- a/pkg/storage/replica_application.go +++ b/pkg/storage/replica_application.go @@ -154,25 +154,54 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) { // Copy stats as it gets updated in-place in applyRaftCommandToBatch. b.replicaState.Stats = &b.stats *b.replicaState.Stats = *r.mu.state.Stats + // Assign all the local proposals first then delete all of them from the map + // in a second pass. This ensures that we retrieve all proposals correctly + // even if the batch has multiple entries for the same proposal. var it cmdAppCtxBufIterator - haveProposalQuota := r.mu.proposalQuota != nil for ok := it.init(&b.cmdBuf); ok; ok = it.next() { cmd := it.cur() cmd.proposal = r.mu.proposals[cmd.idKey] if cmd.proposedLocally() { // We initiated this command, so use the caller-supplied context. cmd.ctx = cmd.proposal.ctx - delete(r.mu.proposals, cmd.idKey) - // At this point we're not guaranteed to have proposalQuota initialized, - // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized. - if haveProposalQuota { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize) - } } else { cmd.ctx = ctx } } + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + if !cmd.proposedLocally() { + continue + } + if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { + // If this entry does not have the most up-to-date view of the + // corresponding proposal's maximum lease index then the proposal + // must have been reproposed (see tryReproposeWithNewLeaseIndex). + // In that case, there's a newer version of the proposal in the + // pipeline, so don't remove the proposal from the map. We expect + // this entry to be rejected by checkForcedErr. + continue + } + // Delete the proposal from the proposals map. There may be reproposals + // of the proposal in the pipeline, but those will all have the same max + // lease index, meaning that they will all be rejected after this entry + // applies (successfully or otherwise). + // + // While here, add the proposal's quota size to the quota release queue. + // We check the proposal map again first to avoid double free-ing quota + // when reproposals from the same proposal end up in the same entry + // application batch. + if _, ok := r.mu.proposals[cmd.idKey]; !ok { + continue + } + delete(r.mu.proposals, cmd.idKey) + // At this point we're not guaranteed to have proposalQuota initialized, + // the same is true for quotaReleaseQueues. Only queue the proposal's + // quota for release if the proposalQuota is initialized. + if r.mu.proposalQuota != nil { + r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize) + } + } } // stageRaftCommand handles the first phase of applying a command to the @@ -853,7 +882,7 @@ func (r *Replica) applyCmdAppBatch( } cmd.replicatedResult().SuggestedCompactions = nil isNonTrivial := batchIsNonTrivial && it.isLast() - if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial, + if errExpl, err = r.handleRaftCommandResult(cmd.ctx, cmd, isNonTrivial, b.replicaState.UsingAppliedStateKey); err != nil { return errExpl, err } diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index 8257243626e5..3cf0159e777a 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -160,7 +160,11 @@ func (r *Replica) handleRaftCommandResult( ) (errExpl string, err error) { // Set up the local result prior to handling the ReplicatedEvalResult to // give testing knobs an opportunity to inspect it. - r.prepareLocalResult(cmd.ctx, cmd) + r.prepareLocalResult(ctx, cmd) + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEvent(ctx, 2, cmd.localResult.String()) + } + // Handle the ReplicatedEvalResult, executing any side effects of the last // state machine transition. // @@ -168,7 +172,7 @@ func (r *Replica) handleRaftCommandResult( // before notifying a potentially waiting client. clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey) if isNonTrivial { - r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult()) + r.handleComplexReplicatedEvalResult(ctx, *cmd.replicatedResult()) } else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v", cmd.replicatedResult()) @@ -186,13 +190,13 @@ func (r *Replica) handleRaftCommandResult( } if cmd.localResult != nil { - r.handleLocalEvalResult(cmd.ctx, *cmd.localResult) + r.handleLocalEvalResult(ctx, *cmd.localResult) } - r.finishRaftCommand(cmd.ctx, cmd) + r.finishRaftCommand(ctx, cmd) switch cmd.e.Type { case raftpb.EntryNormal: if cmd.replicatedResult().ChangeReplicas != nil { - log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd) + log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd) } case raftpb.EntryConfChange: if cmd.replicatedResult().ChangeReplicas == nil { @@ -307,6 +311,10 @@ func (r *Replica) handleComplexReplicatedEvalResult( // result will zero-out the struct to ensure that is has properly performed all // of the implied side-effects. func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { + if !cmd.proposedLocally() { + return + } + var pErr *roachpb.Error if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil { var newPropRetry int @@ -328,36 +336,90 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { pErr = cmd.forcedErr } - if cmd.proposedLocally() { - if cmd.proposalRetry != proposalNoReevaluation && pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal) - } - if pErr != nil { - // A forced error was set (i.e. we did not apply the proposal, - // for instance due to its log position) or the Replica is now - // corrupted. - // If proposalRetry is set, we don't also return an error, as per the - // proposalResult contract. - if cmd.proposalRetry == proposalNoReevaluation { + if cmd.proposalRetry != proposalNoReevaluation && pErr == nil { + log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal) + } + if pErr != nil { + // A forced error was set (i.e. we did not apply the proposal, + // for instance due to its log position) or the Replica is now + // corrupted. + switch cmd.proposalRetry { + case proposalNoReevaluation: + cmd.response.Err = pErr + case proposalIllegalLeaseIndex: + // If we failed to apply at the right lease index, try again with + // a new one. This is important for pipelined writes, since they + // don't have a client watching to retry, so a failure to + // eventually apply the proposal would be a uservisible error. + pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd) + if pErr != nil { cmd.response.Err = pErr + } else { + // Unbind the entry's local proposal because we just succeeded + // in reproposing it and we don't want to acknowledge the client + // yet. + cmd.proposal = nil + return } - } else if cmd.proposal.Local.Reply != nil { - cmd.response.Reply = cmd.proposal.Local.Reply - } else { - log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal) - } - cmd.response.Intents = cmd.proposal.Local.DetachIntents() - cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) - if pErr == nil { - cmd.localResult = cmd.proposal.Local + default: + panic("unexpected") } + } else if cmd.proposal.Local.Reply != nil { + cmd.response.Reply = cmd.proposal.Local.Reply + } else { + log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal) } - if pErr != nil && cmd.localResult != nil { + cmd.response.Intents = cmd.proposal.Local.DetachIntents() + cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + if pErr == nil { + cmd.localResult = cmd.proposal.Local + } else if cmd.localResult != nil { log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr) } - if log.ExpensiveLogEnabled(ctx, 2) { - log.VEvent(ctx, 2, cmd.localResult.String()) +} + +// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose +// commands that have gotten an illegal lease index error, and that we know +// could not have applied while their lease index was valid (that is, we +// observed all applied entries between proposal and the lease index becoming +// invalid, as opposed to skipping some of them by applying a snapshot). +// +// It is not intended for use elsewhere and is only a top-level function so that +// it can avoid the below_raft_protos check. Returns a nil error if the command +// has already been successfully applied or has been reproposed here or by a +// different entry for the same proposal that hit an illegal lease index error. +func (r *Replica) tryReproposeWithNewLeaseIndex( + ctx context.Context, cmd *cmdAppCtx, +) *roachpb.Error { + // Note that we don't need to validate anything about the proposal's + // lease here - if we got this far, we know that everything but the + // index is valid at this point in the log. + p := cmd.proposal + if p.applied || p.command.MaxLeaseIndex != cmd.raftCmd.MaxLeaseIndex { + // If the command associated with this rejected raft entry already + // applied then we don't want to repropose it. Doing so could lead + // to duplicate application of the same proposal. + // + // Similarly, if the command associated with this rejected raft + // entry has a different (larger) MaxLeaseIndex than the one we + // decoded from the entry itself, the command must have already + // been reproposed (this can happen if there are multiple copies + // of the command in the logs; see TestReplicaRefreshMultiple). + // We must not create multiple copies with multiple lease indexes, + // so don't repropose it again. This ensures that at any time, + // there is only up to a single lease index that has a chance of + // succeeding in the Raft log for a given command. + return nil } + // Some tests check for this log message in the trace. + log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") + maxLeaseIndex, pErr := r.propose(ctx, p) + if pErr != nil { + log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr) + return pErr + } + log.VEventf(ctx, 2, "reproposed command %x at maxLeaseIndex=%d", cmd.idKey, maxLeaseIndex) + return nil } // finishRaftCommand is called after a command's side effects have been applied @@ -395,17 +457,6 @@ func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) { } if cmd.proposedLocally() { - // If we failed to apply at the right lease index, try again with - // a new one. This is important for pipelined writes, since they - // don't have a client watching to retry, so a failure to - // eventually apply the proposal would be a uservisible error. - // TODO(nvanbenschoten): This reproposal is not tracked by the - // quota pool. We should fix that. - if cmd.proposalRetry == proposalIllegalLeaseIndex && - r.tryReproposeWithNewLeaseIndex(cmd.proposal) { - return - } - // Otherwise, signal the command's status to the client. cmd.proposal.finishApplication(cmd.response) } else if cmd.response.Err != nil { log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err) diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 3ff7e608700f..1343a9e0a17c 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -87,6 +87,11 @@ type ProposalData struct { // cache and release latches. ec endCmds + // applied is set when the a command finishes application. It is used to + // avoid reproposing a failed proposal if an earlier version of the same + // proposal succeeded in applying. + applied bool + // doneCh is used to signal the waiting RPC handler (the contents of // proposalResult come from LocalEvalResult). // @@ -122,11 +127,16 @@ type ProposalData struct { // is canceled, it won't be listening to this done channel, and so it can't be // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { + if proposal.applied { + log.Fatalf(proposal.ctx, "proposal already applied: %+v", proposal) + } + proposal.applied = true proposal.ec.done(proposal.Request, pr.Reply, pr.Err) + proposal.signalProposalResult(pr) if proposal.sp != nil { tracing.FinishSpan(proposal.sp) + proposal.sp = nil } - proposal.signalProposalResult(pr) } // returnProposalResult signals proposal.doneCh with the proposal result if it diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index e11ae2529640..068d609ed7b6 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1404,56 +1404,6 @@ func (m lastUpdateTimesMap) isFollowerActive( return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration } -// tryReproposeWithNewLeaseIndex is used by processRaftCommand to -// repropose commands that have gotten an illegal lease index error, -// and that we know could not have applied while their lease index was -// valid (that is, we observed all applied entries between proposal -// and the lease index becoming invalid, as opposed to skipping some -// of them by applying a snapshot). -// -// It is not intended for use elsewhere and is only a top-level -// function so that it can avoid the below_raft_protos check. Returns -// true if the command has been successfully reproposed (not -// necessarily by this method! But if this method returns true, the -// command will be in the local proposals map). -func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { - // Note that we don't need to validate anything about the proposal's - // lease here - if we got this far, we know that everything but the - // index is valid at this point in the log. - r.mu.Lock() - if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex { - // If the command's MaxLeaseIndex is greater than the - // LeaseAppliedIndex, it must have already been reproposed (this - // can happen if there are multiple copies of the command in the - // logs; see TestReplicaRefreshMultiple). We must not create - // multiple copies with multiple lease indexes, so don't repropose - // it again. This ensures that at any time, there is only up to a - // single lease index that has a chance of succeeding in the Raft - // log for a given command. - // - // Note that the caller has already removed the current version of - // the proposal from the pending proposals map. We must re-add it - // since it's still pending. - log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d", - proposal.command.MaxLeaseIndex) - r.mu.proposals[proposal.idKey] = proposal - r.mu.Unlock() - return true - } - r.mu.Unlock() - - // Some tests check for this log message in the trace. - log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") - if _, pErr := r.propose(proposal.ctx, proposal); pErr != nil { - // TODO(nvanbenschoten): Returning false here isn't ok. It will result - // in a proposal returning without a response or an error, which - // triggers a panic higher up in the stack. We need to fix this. - log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr) - return false - } - return true -} - // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes // any replicas and, if so, locks them for subsumption. See acquireMergeLock // for details about the lock itself. diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 4e2d28c0e225..a05d8b8593cf 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7849,6 +7849,88 @@ func TestReplicaRefreshMultiple(t *testing.T) { } } +// TestReplicaReproposalWithNewLeaseIndexError tests an interaction where a +// proposal is rejected beneath raft due an illegal lease index error and then +// hits an error when being reproposed. The expectation is that this error +// manages to make its way back to the client. +func TestReplicaReproposalWithNewLeaseIndexError(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + var tc testContext + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(t, stopper) + + type magicKey struct{} + magicCtx := context.WithValue(ctx, magicKey{}, "foo") + + var c int32 // updated atomically + tc.repl.mu.Lock() + tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) { + if v := p.ctx.Value(magicKey{}); v != nil { + curAttempt := atomic.AddInt32(&c, 1) + switch curAttempt { + case 1: + // This is the first time the command is being given a max lease + // applied index. Set the index to that of the recently applied + // write. Two requests can't have the same lease applied index, + // so this will cause it to be rejected beneath raft with an + // illegal lease index error. + wrongLeaseIndex := uint64(1) + return wrongLeaseIndex, nil + case 2: + // This is the second time the command is being given a max + // lease applied index, which should be after the command was + // rejected beneath raft. Return an error. We expect this error + // to propagate up through tryReproposeWithNewLeaseIndex and + // make it back to the client. + return 0, errors.New("boom") + default: + // Unexpected. Asserted against below. + return 0, nil + } + } + return 0, nil + } + tc.repl.mu.Unlock() + + // Perform a few writes to advance the lease applied index. + const initCount = 3 + key := roachpb.Key("a") + for i := 0; i < initCount; i++ { + iArg := incrementArgs(key, 1) + if _, pErr := tc.SendWrapped(&iArg); pErr != nil { + t.Fatal(pErr) + } + } + + // Perform a write that will first hit an illegal lease index error and + // will then hit the injected error when we attempt to repropose it. + var ba roachpb.BatchRequest + iArg := incrementArgs(key, 10) + ba.Add(&iArg) + if _, pErr := tc.Sender().Send(magicCtx, ba); pErr == nil { + t.Fatal("expected a non-nil error") + } else if !testutils.IsPError(pErr, "boom") { + t.Fatalf("unexpected error: %v", pErr) + } + // The command should have picked a new max lease index exactly twice. + if exp, act := int32(2), atomic.LoadInt32(&c); exp != act { + t.Fatalf("expected %d proposals, got %d", exp, act) + } + + // The command should not have applied. + gArgs := getArgs(key) + if reply, pErr := tc.SendWrapped(&gArgs); pErr != nil { + t.Fatal(pErr) + } else if v, err := reply.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatal(err) + } else if v != initCount { + t.Fatalf("expected value of %d, found %d", initCount, v) + } +} + // TestGCWithoutThreshold validates that GCRequest only declares the threshold // key if it is subject to change, and that it does not access this key if it // does not declare them. @@ -7958,10 +8040,10 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) { if err := testutils.MatchInOrder(formatted, // The first proposal is rejected. "retry proposal.*applied at lease index.*but required", - // The LocalResult is nil. This is the important part for this test. - "LocalResult: nil", // The request will be re-evaluated. "retry: proposalIllegalLeaseIndex", + // The LocalResult is nil. This is the important part for this test. + "LocalResult: nil", // Re-evaluation succeeds and one txn is to be updated. "LocalResult \\(reply.*#updated txns: 1", ); err != nil {