diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go
index 989258346681..76560969c0a9 100644
--- a/pkg/kv/kvserver/apply/task.go
+++ b/pkg/kv/kvserver/apply/task.go
@@ -248,15 +248,30 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
 
 	iter := t.dec.NewCommandIter()
 	for iter.Valid() {
-		if err := t.applyOneBatch(ctx, iter); err != nil {
-			// If the batch threw an error, reject all remaining commands in the
-			// iterator to avoid leaking resources or leaving a proposer hanging.
-			//
-			// NOTE: forEachCmdIter closes iter.
-			if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
-				return cmd.AckErrAndFinish(ctx, err)
-			}); rejectErr != nil {
-				return rejectErr
+		err := t.applyOneBatch(ctx, iter)
+		if err != nil {
+			if errors.Is(err, ErrRemoved) {
+				// On ErrRemoved, we know that the replica has been destroyed and, in
+				// particular, that the Replica's proposals map has already been cleared
+				// out. But there may be unfinished proposals that are only known to the
+				// current Task (because we remove proposals we're about to apply from the
+				// map). To avoid leaking resources and/or leaving proposers hanging,
+				// finish them here. Note that it is important that we know that the
+				// proposals map is (and always will be, due to replicaGC setting the
+				// destroy status) empty at this point, since there is an invariant
+				// that all proposals in the map are unfinished, and the Task has only
+				// removed a subset[^1] of the proposals that might be finished below.
+				// But since the map is empty, we can finish them all without having to
+				// check which ones are no longer in the map.
+				//
+				// NOTE: forEachCmdIter closes iter.
+				//
+				// [^1]: (*replicaDecoder).retrieveLocalProposals
+				if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
+					return cmd.AckErrAndFinish(ctx, err)
+				}); rejectErr != nil {
+					return rejectErr
+				}
 			}
 			return err
 		}
diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index de57abf3085a..13272a7e3cf8 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1187,6 +1188,9 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
 	if p.createdAtTicks == 0 {
 		p.createdAtTicks = rp.mu.ticks
 	}
+	if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) {
+		log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p)
+	}
 	rp.mu.proposals[p.idKey] = p
 }
 
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 06185131b49c..a29a7ca4015f 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -742,6 +742,24 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
 		r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)
 
+		// NB: we need to have flushed the proposals before each application cycle
+		// because, due to reproposals, it is possible to have a proposal that
+		//
+		// - is going to be applied in this raft cycle, and
+		// - is not in the proposals map, and
+		// - is in the proposals buffer.
+		//
+		// The current structure of the code ensures that, by the time we apply an
+		// entry, the in-memory proposal has moved from the proposal buffer to the
+		// proposals map. Without this property, the following interleaving is possible:
+		//
+		// - the proposal is in the map (initial state)
+		// - refreshProposalsLocked adds it to the proposal buffer again
+		// - the proposal applies with an error: it is removed from the map and finished
+		// - the proposal buffer flushes and re-inserts the proposal into the map
+		// - we now have a finished proposal in the proposals map: an invariant violation
+		//
+		// See Replica.mu.proposals.
 		numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
 		if err != nil {
 			return false, err
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 02daab501997..8b7aac60d1c7 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -7990,6 +7990,24 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 			t.Fatal(pErr)
 		}
 
+		g, _, pErr := r.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
+			Txn:             ba.Txn,
+			Timestamp:       ba.Timestamp,
+			NonTxnPriority:  ba.UserPriority,
+			ReadConsistency: ba.ReadConsistency,
+			WaitPolicy:      ba.WaitPolicy,
+			LockTimeout:     ba.LockTimeout,
+			Requests:        ba.Requests,
+			LatchSpans:      spanset.New(),
+			LockSpans:       spanset.New(),
+		}, concurrency.PessimisticEval)
+		require.NoError(t, pErr.GoError())
+
+		cmd.ec = endCmds{
+			repl: r,
+			g:    g,
+		}
+
 		dropProposals.Lock()
 		dropProposals.m[cmd] = struct{}{} // silently drop proposals
 		dropProposals.Unlock()
@@ -8105,6 +8123,19 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 	ba.Add(inc)
 	ba.Timestamp = tc.Clock().Now()
 
+	g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
+		Txn:             ba.Txn,
+		Timestamp:       ba.Timestamp,
+		NonTxnPriority:  ba.UserPriority,
+		ReadConsistency: ba.ReadConsistency,
+		WaitPolicy:      ba.WaitPolicy,
+		LockTimeout:     ba.LockTimeout,
+		Requests:        ba.Requests,
+		LatchSpans:      spanset.New(),
+		LockSpans:       spanset.New(),
+	}, concurrency.PessimisticEval)
+	require.NoError(t, pErr.GoError())
+
 	st := repl.CurrentLeaseStatus(ctx)
 	proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{})
 	if pErr != nil {
@@ -8113,6 +8144,11 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 	// Save this channel; it may get reset to nil before we read from it.
 	proposalDoneCh := proposal.doneCh
 
+	proposal.ec = endCmds{
+		repl: repl,
+		g:    g,
+	}
+
 	repl.mu.Lock()
 	ai := repl.mu.state.LeaseAppliedIndex
 	if ai <= 1 {
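---

The interleaving that the new comment in replica_raft.go describes can be hard
to visualize. Below is a minimal, self-contained Go sketch of the same hazard,
using toy stand-ins (proposal, proposer, flush) rather than CockroachDB's own
types: a map that may only hold unfinished proposals, a buffer that is flushed
into it, and a refresh step that re-buffers an in-flight proposal. Flushing
after application, instead of before it, trips the same "finished proposal
inserted into map" assertion that this patch adds to registerProposalLocked.

	package main

	import "fmt"

	// proposal stands in for ProposalData; finished flips to true once the
	// proposal has been signaled and its resources released.
	type proposal struct {
		id       string
		finished bool
	}

	// proposer stands in for the Replica: proposals must only ever contain
	// unfinished proposals, and buffer models the proposal buffer.
	type proposer struct {
		proposals map[string]*proposal
		buffer    []*proposal
	}

	// flush moves buffered proposals into the map, with the same invariant
	// check the patch adds (fatal in test builds) to registerProposalLocked.
	func (p *proposer) flush() {
		for _, prop := range p.buffer {
			if prop.finished {
				panic(fmt.Sprintf("finished proposal inserted into map: %+v", prop))
			}
			p.proposals[prop.id] = prop
		}
		p.buffer = p.buffer[:0]
	}

	func main() {
		p := &proposer{proposals: map[string]*proposal{}}
		prop := &proposal{id: "a"}

		p.proposals[prop.id] = prop       // initial state: proposal is in the map
		p.buffer = append(p.buffer, prop) // refreshProposalsLocked re-buffers it

		// The proposal applies with an error: it is removed from the map and
		// finished while a reference still sits in the buffer.
		delete(p.proposals, prop.id)
		prop.finished = true

		// Flushing only now, i.e. after application instead of before it,
		// re-inserts a finished proposal: the exact invariant violation that
		// flushing before each application cycle prevents.
		p.flush() // panics
	}

Run as-is, the sketch panics on the final flush. With the flush moved before
the apply step, as handleRaftReadyRaftMuLocked does, the buffered proposal
reaches the map while still unfinished and the assertion stays quiet.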