97564: kvserver: narrow down 'finishing a proposal with outstanding reproposal' r=pavelkalinnikov a=tbg

In cockroachdb#94633, I introduced[^1] an assertion that attempted to catch cases
in which we might otherwise accidentally end up applying a proposal
twice.

This assertion had a false positive.

I was able to reproduce the failure within ~minutes via
`./experiment.sh` in cockroachdb#97173 as of 33dcdef.

Better testing of these cases would be desirable. Unfortunately, while
there is an abstraction over command application (`apply.Task`), most
of the logic worth testing lives in `(*replicaAppBatch)` which is
essentially a `*Replica` with more moving parts attached. This does
not lend itself well to unit testing.

I made an attempt[^2][^3][^4] earlier this year to make log application
standalone, but didn't have enough time to follow through.
It would be desirable to pick this up at a later date, perhaps with
the explicit goal of making interactions like the one discussed
in this PR unit-testable.

[^4]: cockroachdb#93309
[^3]: cockroachdb#93266
[^2]: cockroachdb#93239

[^1]: https://github.com/cockroachdb/cockroach/pull/94633/files#diff-50e458584d176deae52b20a7c04461b3e4110795c8c9a307cf7ee6696ba6bc60R238

This assertion was previously trying to assert too much at a distance
and was not only incorrect but also inscrutable.

It was mixing up two assertions, the first one of which is sensible:
if an entry is accepted, it must not be superseded by an in-flight
proposal. If this were violated, the superseding reproposal could also
apply, resulting in a failure of replay protection. This half survives
as a stand-alone assertion.
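
As a minimal toy sketch of that surviving half (Go, illustrative names
only; the real code uses `ProposalData`, `Supersedes`, and a
`log.Fatalf` in `ApplySideEffects`, see the diff below):

    // proposal is a toy stand-in for the bookkeeping kept on a local
    // proposal (the real type is ProposalData).
    type proposal struct {
        applied       bool // some copy of this command has already applied
        maxLeaseIndex int  // LAI the proposal is currently tracked under
    }

    // supersedes reports whether the proposal is tracked under a different
    // LAI than the entry being applied, i.e. whether another copy of the
    // command is (or soon will be) in the log at a reassigned LAI.
    func (p *proposal) supersedes(entryLAI int) bool {
        return p.maxLeaseIndex != entryLAI
    }

    // assertAcceptedEntry is the surviving half of the assertion: an entry
    // that is being accepted must be neither already applied nor
    // superseded; otherwise a second copy of the command could apply too,
    // defeating replay protection.
    func assertAcceptedEntry(p *proposal, entryLAI int) {
        if p.applied {
            panic("command already applied")
        }
        if p.supersedes(entryLAI) {
            panic("applying superseded proposal")
        }
    }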

The other half of the assertion was more confused: if an entry is
rejected, it was claiming that it couldn't also be superseded. The
thinking was that if a superseding log entry exists, maybe it could
apply, and that would be bad since we just told the waiting client
that their proposal got rejected.

This reasoning is incorrect, as the following example shows. Consider
the following initial situation:

    [lease seq is 1]
    log idx 99:  unrelated cmd at LAI 10000, lease seq = 1
    log idx 100: cmd X at LAI 10000, lease seq = 1

And next:

- a new lease enters the log at idx 101 (lease seq = 2)
- an identical copy of idx 100 enters the log at idx 102
- we apply idx 100, leading to a superseding reproposal at idx 103

resulting in the log:

    [lease seq is 1]
    log idx 99: unrelated cmd at LAI 10000, lease seq = 1
    log idx 100: cmd X at LAI 10000, lease seq = 1
    log idx 101: lease seq = 2
    log idx 102: (same as idx 100)
    log idx 103: cmd X at LAI = 20000, lease seq = 1

During application of idx 102, we get a *permanent* rejection and yet
the entry is superseded (by the proposal at idx 103). This would
erroneously trigger the assertion, even though this is a legal sequence
of events with no detrimental outcomes: the superseding proposal will
always have the same lease sequence as its superseded copies, so it
will also fail.
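
A small self-contained sketch of this argument, with `leaseCheck` below
standing in for the lease-sequence part of `checkForcedErr` (toy code,
not the real implementation):

    package main

    import "fmt"

    // leaseCheck is a toy stand-in for the lease-sequence part of
    // checkForcedErr: a command proposed under a stale lease sequence is
    // permanently rejected, regardless of its LAI.
    func leaseCheck(cmdLeaseSeq, appliedLeaseSeq int) error {
        if cmdLeaseSeq != appliedLeaseSeq {
            return fmt.Errorf("proposed under lease seq %d, lease seq is now %d: permanent rejection",
                cmdLeaseSeq, appliedLeaseSeq)
        }
        return nil
    }

    func main() {
        const appliedLeaseSeq = 2 // the lease at idx 101 has applied by now
        // Both the superseded copy at idx 102 and the superseding
        // reproposal at idx 103 were proposed under lease seq 1, so both
        // fail the same check and neither can apply cmd X a second time.
        fmt.Println(leaseCheck(1, appliedLeaseSeq)) // idx 102: rejected
        fmt.Println(leaseCheck(1, appliedLeaseSeq)) // idx 103: rejected too
    }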

I initially tried to only soften the assertion a *little bit*.
Observing that the example above led to a *permanent* rejection, should
we only require that a proposal (which in this assertion is always
local) is not superseded if it got rejected due to its lease index
(which implies that it passed the lease check)? It turns out that this
is primarily an assertion on when superseded proposals are counted as
"local" at this point in the code: if there were multiple copies of
this rejected proposal in the current `appTask` (i.e. the current
`CommittedEntries` slice handed to us for application by raft), then
all copies are initially local; and a copy that successfully spawns a
superseding proposal is made non-local from that point on. On the face
of it, all other copies in the same `appTask` would now hit the
assertion (erroneously): they are local, they are rejected, so why
don't they enter the branch? The magic ingredient is that if an entry
is already superseded when we handle its lease index rejection, we also
unlink the proposal from it. So these copies never enter this path,
since the entry is no longer local at that point.

For example, if these are the log entries to apply (all at valid lease
seq):

    log idx 99: unrelated cmd at LAI 10000
    log idx 100: cmd X at LAI 10000
    log idx 101: (identical copy of idx 100)

and idxs 99-101 are applied in one batch, then idx 100 would spawn
a reproposal at a new lease applied index:

    log idx 99: unrelated cmd at LAI 10000
    log idx 100: cmd X at LAI 10000             <- applied
    log idx 101: (identical copy of idx 100)
    log idx 102 (or later): cmd X at LAI 20000  <- not in current batch

When we apply 101, we observe an illegal lease index, but the proposal
supersedes the entry, so we mark it as non-local and don't enter the
branch that contains the assertion.
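
Reusing the toy `proposal` from the sketch further up, the decision made
while handling a lease index rejection looks roughly like this (compare
the guarded call to `tryReproposeWithNewLeaseIndex` in the diff below;
`onIllegalLeaseIndex` and `nextLAI` are illustrative names):

    // onIllegalLeaseIndex sketches the decision in prepareLocalResult when
    // an entry is rejected only because of a stale LAI: repropose unless a
    // copy of the command already applied or a superseding copy exists.
    // In the real code the entry is then unbound from its proposal ("made
    // non-local") unless reproposing returned an error, which is why later
    // copies of the same command in one batch never re-enter this path.
    func onIllegalLeaseIndex(p *proposal, entryLAI int, nextLAI func() int) (reproposed bool) {
        if !p.applied && !p.supersedes(entryLAI) {
            p.maxLeaseIndex = nextLAI() // the reproposal supersedes all older copies
            reproposed = true
        }
        return reproposed
    }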

The above reasoning is very difficult to follow, and it plays out too
far removed from where the interesting state changes happen.

Also, for testing purposes it is interesting to introduce "errors"
in the lease applied index assignment to artificially exercise these
reproposal mechanisms. In doing so, these assertions can trip because
the lease applied index assigned to a reproposal might accidentally
(or intentionally!) match the existing lease applied index, in which
case copies of the command in the same batch now *don't* consider
themselves superseded.
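
Using the same toy model (and a hypothetical LAI-injection hook that
does not exist in this commit; `fmt` and the definitions above are
assumed), the corner case plays out as follows:

    func injectedStaleLAIExample() {
        // idx 100 and idx 101 both carry cmd X at LAI 10000; idx 100 was
        // just rejected with an illegal lease index. The injected "next"
        // LAI happens to be 10000 again:
        p := &proposal{maxLeaseIndex: 10000}
        injectedNextLAI := func() int { return 10000 } // stale on purpose
        onIllegalLeaseIndex(p, 10000, injectedNextLAI)

        // The reproposal ended up at the same LAI, so the copy at idx 101
        // does not look superseded; it will take the repropose path again,
        // and the assertions discussed above can trip even though nothing
        // is actually wrong.
        fmt.Println(p.supersedes(10000)) // false
    }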

The value of this testing outweighs the very limited benefit of
this branch of the assertion. An argument could even be made that
this branch has negative value on its own, due to its complexity.

We are removing it in this commit and will instead work towards
simplifying the mechanisms that played a role in explaining the
assertion.

Closes cockroachdb#97102.
Closes cockroachdb#97347.
Closes cockroachdb#97447.
Closes cockroachdb#97612.

No release note because unreleased (except perhaps in an alpha).

Epic: none
Release note: None



Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Feb 26, 2023
2 parents 4e6df7b + 554a5d5 commit 13c58f6
Showing 2 changed files with 113 additions and 58 deletions.
111 changes: 77 additions & 34 deletions pkg/kv/kvserver/replica_application_result.go
@@ -104,11 +104,67 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
case kvserverbase.ProposalRejectionPermanent:
cmd.response.Err = pErr
case kvserverbase.ProposalRejectionIllegalLeaseIndex:
// Reset the error as it's now going to be determined by the outcome of
// reproposing (or not).
//
// Note that if pErr remains nil, we will mark the proposal as non-local at
// the end of this block and return, so we're not hitting an NPE near the end
// of this method where we're attempting to reach into `cmd.proposal`.
//
// This control flow is sketchy but it preserves existing behavior
// that would be too risky to change at the time of writing.
//
pErr = nil
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))
//
// If the command associated with this rejected raft entry already applied
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local, the first one will apply (and set p.applied) and the
// remaining copies will hit this branch).
//
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
//
// Taking a closer look at the last paragraph, we see that the situation
// is much more subtle. As tryReproposeWithNewLeaseIndex gets to the stage
// where it calls `(*Replica).propose` (i.e. it passed the closed
// timestamp checks), it *resets* `cmd.proposal.MaxLeaseIndex` to zero.
// This means that the proposal immediately supersedes any other copy
// presently in the log, including for the remainder of application of the
// current log entry (since the current entry's LAI is certainly not equal
// to zero). However, the proposal buffer adds another layer of
// possibility on top of this. Typically, the buffer does not flush
// immediately: this only happens at the beginning of the *next* raft
// handling cycle, i.e. the proposal will not re-enter the proposals map
// while the current batches of commands (recall, which may contain an
// arbitrary number of copies of the current command, both with various
// LAIs all but at most one of which are too small) are applied. *But*,
// the proposal buffer has a capacity, and if it does fill up in
// `(*Replica).propose` it will synchronously flush, meaning that a new
// LAI will be assigned to `cmd.proposal.MaxLeaseIndex` AND the command
// will have re-entered the proposals map. So even if we remove the
// "zeroing" of the MaxLeaseIndex prior to proposing, we still have to
// contend with the fact that once `tryReproposeWithNewLeaseIndex`
// returns, the command may or may not be in the map. (At least it is
// assigned a new LAI if and only if it is back in the map!).
//
// These many possible worlds are a major source of complexity, a
// reduction of which is postponed.
if !cmd.proposal.applied && !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))
}

if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
@@ -135,6 +191,21 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
cmd.response.Err = pErr

// This assertion can mis-fire if we artificially inject invalid LAIs
// during proposal, see the explanatory comment above. If, in the
// current app batch, we had two identical copies of an entry (which
// maps to a local proposal), and the LAI was stale, then both entries
// would be local. The first could succeed in reproposing, which, if the
// propBuf was full, would immediately insert into the proposals map.
// Normally, when we then apply the second entry, it would be superseded
// and not hit the assertion. But, if we injected a stale LAI during
// this last reproposal, we could "accidentally" assign the same LAI
// again. The second entry would then try to repropose again, which is
// fine, but it could bump into the closed timestamp, get an error,
// enter this branch, and then trip the assertion.
//
// For proposed simplifications, see:
// https://github.com/cockroachdb/cockroach/issues/97633
r.mu.RLock()
_, inMap := r.mu.proposals[cmd.ID]
r.mu.RUnlock()
@@ -143,9 +214,9 @@
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
// Unbind the entry's local proposal because we just succeeded in
// reproposing it or decided not to repropose. Either way, we don't want
// to acknowledge the client yet.
cmd.proposal = nil
return
}
@@ -207,40 +278,12 @@ func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHol
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// Returns a nil error if the command has already been successfully applied or
// has been reproposed here or by a different entry for the same proposal that
// hit an illegal lease index error.
//
// If this returns a nil error once, it will return a nil error for future calls
// as well, assuming that trackEvaluatingRequest returns monotonically increasing
// timestamps across subsequent calls.
// The caller must already have checked that the entry is local and not
// superseded, and that it was rejected with an illegal lease index error.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
) *kvpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If the command associated with this rejected raft entry already applied
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can see hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local, the first one will apply (and set p.applied) and the
// remaining copies will hit this branch).
//
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}

// We need to track the request again in order to protect its timestamp until
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
60 changes: 36 additions & 24 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
"go.etcd.io/raft/v3"
)
@@ -150,6 +151,26 @@ func (sm *replicaStateMachine) NewBatch() apply.Batch {
return b
}

func formatReplicatedCmd(cmd *replicatedCmd) redact.RedactableString {
var buf redact.StringBuilder
// We need to zero various data structures that would otherwise
// cause panics in `pretty.Sprint`.
var pd ProposalData
if cmd.proposal != nil {
pd = *cmd.proposal
pd.ctx = nil
pd.sp = nil
pd.command.TraceData = nil
pd.quotaAlloc = nil
pd.tok = TrackedRequestToken{}
pd.ec = endCmds{}
}

// NB: this redacts very poorly, but this is considered acceptable for now.
redact.Fprintf(&buf, "cmd:%s\n\nproposal: %s", pretty.Sprint(cmd.ReplicatedCmd), pretty.Sprint(pd))
return buf.RedactableString()
}

// ApplySideEffects implements the apply.StateMachine interface. The method
// handles the third phase of applying a command to the replica state machine.
//
@@ -223,30 +244,21 @@ func (sm *replicaStateMachine) ApplySideEffects(
sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult)
}

if higherReproposalsExist := cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex); higherReproposalsExist {
// If the command wasn't rejected, we just applied it and no higher
// reproposal must exist (since that one may also apply).
//
// If the command was rejected with ProposalRejectionPermanent, no higher
// reproposal should exist (after all, whoever made that reproposal should
// also have seen a permanent rejection).
//
// If it was rejected with ProposalRejectionIllegalLeaseIndex, then the
// subsequent call to tryReproposeWithNewLeaseIndex[^1] must have returned an
// error (or the proposal would not be IsLocal() now). But that call
// cannot return an error for a proposal that is already superseded
// initially.
//
// [^1]: see (*replicaDecoder).retrieveLocalProposals()
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index: %+v", cmd)
}
if !cmd.Rejected() && cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
if !cmd.Rejected() {
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
if cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If an entry is superseded but it wasn't rejected, something is wrong.
// The superseding reproposal could apply as well, leading to doubly applying
// a command.
log.Fatalf(ctx, "applying superseded proposal: %s", formatReplicatedCmd(cmd))
}
}
cmd.proposal.applied = true
}
