Skip to content

Commit

Permalink
Merge #94825
Browse files Browse the repository at this point in the history
94825: kvserver: prevent finished proposal from being present in proposals map r=nvanbenschoten a=tbg

The conjecture in #86547 is that a finished proposal somehow makes its
way into the proposal map, most likely by never being removed prior to
being finished.

This commit adds an assertion that we're never outright *inserting*
a finished proposals, and better documents the places in which we're
running a risk of violating the invariant.

It also clarifies the handling of proposals in an apply batch when a
replication change that removes the replica is encountered. I suspected
that this could lead to a case in which proposals would be finished
despite remaining in the proposals map. Upon inspection this turned out
to be incorrect - the map (at least by code inspection) is empty at that
point, so the invariant holds trivially.

Unfortunately, that leaves me without an explanation for #86547, but the
newly added invariants may prove helpful.

Touches #86547.

Epic: None
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Feb 9, 2023
2 parents 0918837 + ac23f46 commit 95fcd95
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 9 deletions.
33 changes: 24 additions & 9 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,30 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {

iter := t.dec.NewCommandIter()
for iter.Valid() {
if err := t.applyOneBatch(ctx, iter); err != nil {
// If the batch threw an error, reject all remaining commands in the
// iterator to avoid leaking resources or leaving a proposer hanging.
//
// NOTE: forEachCmdIter closes iter.
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
err := t.applyOneBatch(ctx, iter)
if err != nil {
if errors.Is(err, ErrRemoved) {
// On ErrRemoved, we know that the replica has been destroyed and in
// particular, the Replica's proposals map has already been cleared out.
// But there may be unfinished proposals that are only known to the
// current Task (because we remove proposals we're about to apply from the
// map). To avoid leaking resources and/or leaving proposers hanging,
// finish them here. Note that it is important that we know that the
// proposals map is (and always will be, due to replicaGC setting the
// destroy status) empty at this point, since there is an invariant
// that all proposals in the map are unfinished, and the Task has only
// removed a subset[^1] of the proposals that might be finished below.
// But since it's empty, we can finish them all without having to
// check which ones are no longer in the map.
//
// NOTE: forEachCmdIter closes iter.
//
// [^1]: (*replicaDecoder).retrieveLocalProposals
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
}
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1187,6 +1188,9 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
if p.createdAtTicks == 0 {
p.createdAtTicks = rp.mu.ticks
}
if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) {
log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p)
}
rp.mu.proposals[p.idKey] = p
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,24 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)

// NB: we need to have flushed the proposals before each application cycle
// because due to reproposals it is possible to have a proposal that
//
// - is going to be applied in this raft cycle, and
// - is not in the proposals map, and
// - is in the proposals buffer.
//
// The current structure of the code makes sure that by the time we apply the
// entry, the in-mem proposal has moved from the proposal buffer to the proposals
// map. Without this property, we could have the following interleaving:
//
// - proposal is in map (initial state)
// - refreshProposalsLocked adds it to the proposal buffer again
// - proposal applies with an error: removes it from map, finishes proposal
// - proposal buffer flushes, inserts proposal into map
// - we now have a finished proposal in the proposal map, an invariant violation.
//
// See Replica.mu.proposals.
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
if err != nil {
return false, err
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7990,6 +7990,24 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
t.Fatal(pErr)
}

g, _, pErr := r.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
LatchSpans: spanset.New(),
LockSpans: spanset.New(),
}, concurrency.PessimisticEval)
require.NoError(t, pErr.GoError())

cmd.ec = endCmds{
repl: r,
g: g,
}

dropProposals.Lock()
dropProposals.m[cmd] = struct{}{} // silently drop proposals
dropProposals.Unlock()
Expand Down Expand Up @@ -8105,6 +8123,19 @@ func TestReplicaRefreshMultiple(t *testing.T) {
ba.Add(inc)
ba.Timestamp = tc.Clock().Now()

g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
LatchSpans: spanset.New(),
LockSpans: spanset.New(),
}, concurrency.PessimisticEval)
require.NoError(t, pErr.GoError())

st := repl.CurrentLeaseStatus(ctx)
proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{})
if pErr != nil {
Expand All @@ -8113,6 +8144,11 @@ func TestReplicaRefreshMultiple(t *testing.T) {
// Save this channel; it may get reset to nil before we read from it.
proposalDoneCh := proposal.doneCh

proposal.ec = endCmds{
repl: repl,
g: g,
}

repl.mu.Lock()
ai := repl.mu.state.LeaseAppliedIndex
if ai <= 1 {
Expand Down

0 comments on commit 95fcd95

Please sign in to comment.