Skip to content

Commit

Permalink
Revert "kvserver: prevent finished proposal from being present in pro…
Browse files Browse the repository at this point in the history
…posals map"

Reverts cockroachdb#94825.

This reverts commit 15b1c6a.
  • Loading branch information
tbg committed Mar 13, 2023
1 parent 500bc82 commit e23a2ea
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 83 deletions.
35 changes: 10 additions & 25 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,32 +248,17 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {

iter := t.dec.NewCommandIter()
for iter.Valid() {
err := t.applyOneBatch(ctx, iter)
if err != nil {
if errors.Is(err, ErrRemoved) {
// On ErrRemoved, we know that the replica has been destroyed and in
// particular, the Replica's proposals map has already been cleared out.
// But there may be unfinished proposals that are only known to the
// current Task (because we remove proposals we're about to apply from the
// map). To avoid leaking resources and/or leaving proposers hanging,
// finish them here. Note that it is important that we know that the
// proposals map is (and always will be, due to replicaGC setting the
// destroy status) empty at this point, since there is an invariant
// that all proposals in the map are unfinished, and the Task has only
// removed a subset[^1] of the proposals that might be finished below.
// But since it's empty, we can finish them all without having to
// check which ones are no longer in the map.
//
// NOTE: forEachCmdIter closes iter.
//
// [^1]: (*replicaDecoder).retrieveLocalProposals
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
return err
if err := t.applyOneBatch(ctx, iter); err != nil {
// If the batch threw an error, reject all remaining commands in the
// iterator to avoid leaking resources or leaving a proposer hanging.
//
// NOTE: forEachCmdIter closes iter.
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
return err
}
}
iter.Close()
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1189,9 +1188,6 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
if p.createdAtTicks == 0 {
p.createdAtTicks = rp.mu.ticks
}
if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) {
log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p)
}
if prev := rp.mu.proposals[p.idKey]; prev != nil && prev != p {
log.Fatalf(rp.store.AnnotateCtx(context.Background()), "two proposals under same ID:\n%+v,\n%+v", prev, p)
}
Expand Down
18 changes: 0 additions & 18 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,24 +743,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)

// NB: we need to have flushed the proposals before each application cycle
// because due to reproposals it is possible to have a proposal that
//
// - is going to be applied in this raft cycle, and
// - is not in the proposals map, and
// - is in the proposals buffer.
//
// The current structure of the code makes sure that by the time we apply the
// entry, the in-mem proposal has moved from the proposal buffer to the proposals
// map. Without this property, we could have the following interleaving:
//
// - proposal is in map (initial state)
// - refreshProposalsLocked adds it to the proposal buffer again
// - proposal applies with an error: removes it from map, finishes proposal
// - proposal buffer flushes, inserts proposal into map
// - we now have a finished proposal in the proposal map, an invariant violation.
//
// See Replica.mu.proposals.
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
if err != nil {
return false, err
Expand Down
36 changes: 0 additions & 36 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7921,24 +7921,6 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
t.Fatal(pErr)
}

g, _, pErr := r.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
LatchSpans: spanset.New(),
LockSpans: spanset.New(),
}, concurrency.PessimisticEval)
require.NoError(t, pErr.GoError())

cmd.ec = endCmds{
repl: r,
g: g,
}

dropProposals.Lock()
dropProposals.m[cmd] = struct{}{} // silently drop proposals
dropProposals.Unlock()
Expand Down Expand Up @@ -8062,24 +8044,6 @@ func TestReplicaRefreshMultiple(t *testing.T) {
// Save this channel; it may get reset to nil before we read from it.
proposalDoneCh := proposal.doneCh

g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
LatchSpans: spanset.New(),
LockSpans: spanset.New(),
}, concurrency.PessimisticEval)
require.NoError(t, pErr.GoError())

proposal.ec = endCmds{
repl: repl,
g: g,
}

repl.mu.Lock()
ai := repl.mu.state.LeaseAppliedIndex
if ai <= 1 {
Expand Down

0 comments on commit e23a2ea

Please sign in to comment.