From 15b1c6ae6058c8060eb8afe2110e582106380e96 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Fri, 6 Jan 2023 09:57:12 +0100
Subject: [PATCH 1/2] kvserver: prevent finished proposal from being present
 in proposals map

The conjecture in #86547 is that a finished proposal somehow makes its
way into the proposal map, most likely by never being removed prior to
being finished.

This commit adds an assertion that we're never outright *inserting* a
finished proposal, and better documents the places in which we run the
risk of violating the invariant.

It also clarifies the handling of proposals in an apply batch when a
replication change that removes the replica is encountered. I suspected
that this could lead to a case in which proposals would be finished
despite remaining in the proposals map. Upon inspection this turned out
to be incorrect: the map (at least by code inspection) is empty at that
point, so the invariant holds trivially.

Unfortunately, that leaves me without an explanation for #86547, but
the newly added invariants may prove helpful.

Touches #86547.
---
 pkg/kv/kvserver/apply/task.go           | 35 +++++++++++++++++-------
 pkg/kv/kvserver/replica_proposal_buf.go |  4 +++
 pkg/kv/kvserver/replica_raft.go         | 18 +++++++++++++
 pkg/kv/kvserver/replica_test.go         | 36 +++++++++++++++++++++++++
 4 files changed, 83 insertions(+), 10 deletions(-)

diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go
index 989258346681..c15dc31af9d6 100644
--- a/pkg/kv/kvserver/apply/task.go
+++ b/pkg/kv/kvserver/apply/task.go
@@ -248,17 +248,32 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
 	iter := t.dec.NewCommandIter()
 	for iter.Valid() {
-		if err := t.applyOneBatch(ctx, iter); err != nil {
-			// If the batch threw an error, reject all remaining commands in the
-			// iterator to avoid leaking resources or leaving a proposer hanging.
-			//
-			// NOTE: forEachCmdIter closes iter.
-			if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
-				return cmd.AckErrAndFinish(ctx, err)
-			}); rejectErr != nil {
-				return rejectErr
+		err := t.applyOneBatch(ctx, iter)
+		if err != nil {
+			if errors.Is(err, ErrRemoved) {
+				// On ErrRemoved, we know that the replica has been destroyed and in
+				// particular, the Replica's proposals map has already been cleared out.
+				// But there may be unfinished proposals that are only known to the
+				// current Task (because we remove proposals we're about to apply from the
+				// map). To avoid leaking resources and/or leaving proposers hanging,
+				// finish them here. Note that it is important that the proposals map is
+				// (and always will be, due to replicaGC setting the destroy status)
+				// empty at this point, since there is an invariant that all proposals
+				// in the map are unfinished, and the Task has only removed a subset[^1]
+				// of the proposals that might be finished below. But since it's empty,
+				// we can finish them all without having to check which ones are no
+				// longer in the map.
+				//
+				// NOTE: forEachCmdIter closes iter.
+				//
+				// [^1]: (*replicaDecoder).retrieveLocalProposals
+				if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
+					return cmd.AckErrAndFinish(ctx, err)
+				}); rejectErr != nil {
+					return rejectErr
+				}
+				return err
 			}
-			return err
 		}
 	}
 	iter.Close()

diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index de57abf3085a..13272a7e3cf8 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1187,6 +1188,9 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
 	if p.createdAtTicks == 0 {
 		p.createdAtTicks = rp.mu.ticks
 	}
+	if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) {
+		log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p)
+	}
 	rp.mu.proposals[p.idKey] = p
 }

diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index cce57af3de26..f3d1a765d8fa 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -729,6 +729,24 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
 		r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)

+		// NB: we need to have flushed the proposals before each application
+		// cycle because, due to reproposals, it is possible to have a proposal
+		// that
+		//
+		// - is going to be applied in this raft cycle, and
+		// - is not in the proposals map, and
+		// - is in the proposals buffer.
+		//
+		// The current structure of the code makes sure that by the time we
+		// apply the entry, the in-mem proposal has moved from the proposal
+		// buffer to the proposals map. Without this property, we could have
+		// the following interleaving:
+		//
+		// - proposal is in map (initial state)
+		// - refreshProposalsLocked adds it to the proposal buffer again
+		// - proposal applies with an error: removes it from map, finishes proposal
+		// - proposal buffer flushes, inserts proposal into map
+		// - we now have a finished proposal in the proposal map, an invariant violation.
+		//
+		// See Replica.mu.proposals.
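+		//
+		// (The assertion added to registerProposalLocked in this commit fires
+		// on exactly that last step; the flush ordering here is what rules the
+		// interleaving out.)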
 		numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
 		if err != nil {
 			return false, err
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 02daab501997..0a8f12735726 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -7990,6 +7990,24 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 			t.Fatal(pErr)
 		}

+		g, _, pErr := r.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
+			Txn:             ba.Txn,
+			Timestamp:       ba.Timestamp,
+			NonTxnPriority:  ba.UserPriority,
+			ReadConsistency: ba.ReadConsistency,
+			WaitPolicy:      ba.WaitPolicy,
+			LockTimeout:     ba.LockTimeout,
+			Requests:        ba.Requests,
+			LatchSpans:      spanset.New(),
+			LockSpans:       spanset.New(),
+		}, concurrency.PessimisticEval)
+		require.NoError(t, pErr.GoError())
+
+		cmd.ec = endCmds{
+			repl: r,
+			g:    g,
+		}
+
 		dropProposals.Lock()
 		dropProposals.m[cmd] = struct{}{} // silently drop proposals
 		dropProposals.Unlock()
@@ -8113,6 +8131,24 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 	// Save this channel; it may get reset to nil before we read from it.
 	proposalDoneCh := proposal.doneCh

+	g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
+		Txn:             ba.Txn,
+		Timestamp:       ba.Timestamp,
+		NonTxnPriority:  ba.UserPriority,
+		ReadConsistency: ba.ReadConsistency,
+		WaitPolicy:      ba.WaitPolicy,
+		LockTimeout:     ba.LockTimeout,
+		Requests:        ba.Requests,
+		LatchSpans:      spanset.New(),
+		LockSpans:       spanset.New(),
+	}, concurrency.PessimisticEval)
+	require.NoError(t, pErr.GoError())
+
+	proposal.ec = endCmds{
+		repl: repl,
+		g:    g,
+	}
+
 	repl.mu.Lock()
 	ai := repl.mu.state.LeaseAppliedIndex
 	if ai <= 1 {

From ac23f4667f5bb6ef03f95cdff5691d8f1069d140 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Mon, 6 Feb 2023 08:39:28 +0100
Subject: [PATCH 2/2] fixup! kvserver: prevent finished proposal from being
 present in proposals map

---
 pkg/kv/kvserver/apply/task.go   |  2 +-
 pkg/kv/kvserver/replica_test.go | 16 ++++++++--------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go
index c15dc31af9d6..76560969c0a9 100644
--- a/pkg/kv/kvserver/apply/task.go
+++ b/pkg/kv/kvserver/apply/task.go
@@ -272,8 +272,8 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
 				}); rejectErr != nil {
 					return rejectErr
 				}
-				return err
 			}
+			return err
 		}
 	}
 	iter.Close()
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 0a8f12735726..8b7aac60d1c7 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -8123,14 +8123,6 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 	ba.Add(inc)
 	ba.Timestamp = tc.Clock().Now()

-	st := repl.CurrentLeaseStatus(ctx)
-	proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{})
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
-	// Save this channel; it may get reset to nil before we read from it.
-	proposalDoneCh := proposal.doneCh
-
 	g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
 		Txn:             ba.Txn,
 		Timestamp:       ba.Timestamp,
 		NonTxnPriority:  ba.UserPriority,
 		ReadConsistency: ba.ReadConsistency,
 		WaitPolicy:      ba.WaitPolicy,
 		LockTimeout:     ba.LockTimeout,
 		Requests:        ba.Requests,
 		LatchSpans:      spanset.New(),
 		LockSpans:       spanset.New(),
 	}, concurrency.PessimisticEval)
 	require.NoError(t, pErr.GoError())

+	st := repl.CurrentLeaseStatus(ctx)
+	proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{})
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	// Save this channel; it may get reset to nil before we read from it.
+	proposalDoneCh := proposal.doneCh
+
 	proposal.ec = endCmds{
 		repl: repl,
 		g:    g,
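
Aside: the invariant that the new assertion guards can be stated in a few
lines of ordinary Go. The sketch below is illustrative only; every name in
it is a hypothetical stand-in rather than a real kvserver type, but the
shape of the check mirrors the one added to registerProposalLocked above.

	package main

	import (
		"errors"
		"fmt"
	)

	// proposal stands in for kvserver's ProposalData: its completion
	// handle (ec here) is consumed exactly once, when the proposal is
	// finished and the waiting client is signaled.
	type proposal struct {
		id string
		ec *struct{} // nil means the proposal has already been finished
	}

	func (p *proposal) finished() bool { return p.ec == nil }

	// proposer stands in for replicaProposer, which owns the proposals map.
	type proposer struct {
		proposals map[string]*proposal
	}

	// register mirrors the shape of the assertion added to
	// registerProposalLocked: only unfinished proposals may enter the map,
	// because anything in the map may later be finished (exactly once).
	func (pr *proposer) register(p *proposal) error {
		if p.finished() {
			return errors.New("finished proposal inserted into map: " + p.id)
		}
		pr.proposals[p.id] = p
		return nil
	}

	func main() {
		pr := &proposer{proposals: map[string]*proposal{}}

		live := &proposal{id: "a", ec: &struct{}{}}
		fmt.Println(pr.register(live)) // <nil>; unfinished, insert is legal

		done := &proposal{id: "b", ec: nil} // already finished
		fmt.Println(pr.register(done))      // finished proposal inserted into map: b
	}

In the actual patch the check is a test-build-only log.Fatalf rather than a
returned error, since a finished proposal entering the map indicates corrupted
Replica state rather than a condition the caller could handle.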