kvserver: improve reproposal assertions and documentation
Reproposals are a deep rabbit hole and an area in which past changes
were all related to subtle bugs. Write it all up and in particular make
some simplifications that ought to be possible if my understanding is
correct:

- have proposals always enter `(*Replica).propose` without a
  MaxLeaseIndex or prior encoded command set, i.e. `propose`
  behaves the same for reproposals as for first proposals.
- assert that after a failed call to tryReproposeWithNewLeaseIndex,
  the command is not in the proposals map, i.e. check absence of
  a leak.
- replace code that should be impossible to reach (and had me confused
  for a good amount of time) with an assertion.
- add long comment on `r.mu.proposals`.

This commit also moves `tryReproposeWithNewLeaseIndex` off `(*Replica)`,
which is possible due to recent changes[^1]. In doing so, I realized
there was a (small) data race (now fixed): when returning a
`NotLeaseHolderError` from that method, we weren't acquiring `r.mu`. It
may have looked as though we were holding it already since we're
accessing `r.mu.proposalBuf`; however, that field has special semantics - it
wraps `r.mu` and acquires it when needed.

[^1]: The "below raft" test mentioned in the previous comment was
changed in #93785 and
no longer causes a false positive.

Epic: CRDB-220
Release note: None
tbg committed Feb 8, 2023
1 parent ec38fb4 commit 866d58a
Showing 6 changed files with 307 additions and 72 deletions.
156 changes: 147 additions & 9 deletions pkg/kv/kvserver/replica.go
@@ -498,9 +498,13 @@ type Replica struct {
// Instead, the buffer internally holds a reference to mu and will use
// it appropriately.
proposalBuf propBuf
// proposals stores the Raft in-flight commands which originated at
// this Replica, i.e. all commands for which propose has been called,
// but which have not yet applied.

// proposals stores the Raft in-flight commands which originated at this
// Replica, i.e. all commands for which propose has been called, but which
// have not yet applied. A proposal is "pending" until it is "finalized",
// meaning that `finishApplication` has been invoked on the proposal (which
// informs the client that the proposal has now been applied, optionally
// with an error, which may be an AmbiguousResultError).
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while the Replica.mu is held, except
@@ -509,12 +513,146 @@ type Replica struct {
// underneath raft. See comments on ProposalData fields for synchronization
// requirements.
//
// Due to Raft reproposals, multiple in-flight Raft entries can have
// the same CmdIDKey, all corresponding to the same KV request. However,
// not all Raft entries with a given command ID will correspond directly
// to the *RaftCommand contained in its associated *ProposalData. This
// is because the *RaftCommand can be mutated during reproposals by
// Replica.tryReproposeWithNewLeaseIndex.
// Due to Raft reproposals, multiple in-flight Raft entries can have the
// same CmdIDKey. There are two kinds of reproposals:
//
// (1) the exact same entry is handed to raft (possibly despite already being
// present in the log), usually after a timeout[^1].
//
// (2) an existing proposal is updated with a new MaxLeaseIndex and handed to
// raft, i.e. we're intentionally creating a duplicate. This exists because
// for pipelined proposals, the client's goroutine returns without waiting
// for the proposal to apply.[^2][^3] When (2) is carried out, the existing
// copies of the proposal in the log will be "Superseded" (see below). Note
// that (2) is only invoked for proposals that are no longer in the proposals
// map because they're in the middle of being applied; as part of (2), they
// are re-added to the map.
//
// To understand reproposals, we need a broad overview of entry application,
// which is batched (i.e. a single invocation may apply multiple log entries).
// In entry application, the following steps are taken:
//
// 1. retrieve all local proposals: iterate through the entries in order,
// and look them up in the proposals map. For each "local" entry (i.e.
// tracked in the map), remove it from the map (unless the proposal
// is superseded, see below) and attach the value to the entry.
// 2. for each entry:
// - stage written and in-memory effects of the entry (some may apply as no-ops
// if they fail below-raft checks such as the MaxLeaseIndex check)
// - if the MaxLeaseIndex check fails and additional constraints are
// satisfied, carry out (2) from above. On success, we now know that there
// will be a reproposal in the log that can successfully apply. We unbind
// the local proposal (so we don't signal it) and apply the current entry
// as a no-op.
// 3. carry out additional side effects of the entire batch (stats updates etc).
//
// A prerequisite for (2) is that there currently aren't any copies of the proposal
// in the log that may ultimately apply, or we risk doubly applying commands - a
// correctness bug. After (2), any copies of the entry present in the log will have
// a MaxLeaseIndex strictly less than that of the in-memory command, and will be
// Superseded() by it.
//
// We can always safely create an identical copy (i.e. (1)) because of the
// replay protection conferred by the MaxLeaseIndex - of the copies that
// reach consensus, all but the first will be rejected (i.e. apply as a no-op).
//
// Naively, one might hope that by invoking (2) upon applying an entry for
// a command that is rejected due to a MaxLeaseIndex one could achieve the
// invariant that there is only ever one unapplied copy of the entry in the
// log, and then the in-memory proposal could reflect the MaxLeaseIndex
// assigned to this unapplied copy at all times.
//
// Unfortunately, for various reasons, this invariant does not hold:
// - entry application isn't durable, so upon a restart, we might roll
// back to a log position that has yet to catch up over multiple previous
// incarnations of (2), i.e. we will see the same entry multiple times at
// various MaxLeaseIndex values.
// (This is technically not a problem, since we lose the in-memory proposal
// during the restart anyway, but it should be kept in mind.)
// - Raft proposal forwarding due to (1)-type reproposals could "in
// principle" lead to an old copy of the entry appearing again in the
// unapplied log, at least if we make the reasonable assumption that
// forwarded proposals may arrive at the leader with arbitrary delays.
//
// As a result, we can't "just" invoke (2) when seeing a rejected command;
// we additionally have to verify that there isn't a more recent reproposal
// underway that could apply successfully and that supersedes the one we're
// currently looking at.
// So we carry out (2) only if the MaxLeaseIndex of the in-mem proposal matches
// that of the current entry, and we update the in-mem MaxLeaseIndex with the
// result of (2) if it does.
//
// An example follows. Consider the following situation (where N is some base
// index not relevant to the example) in which we have one inflight proposal which
// has been triplicated in the log (due to [^1]):
//
// proposals[id] = p{Cmd{MaxLeaseIndex: 100, ...}}
//
// ... (unrelated entries)
// raftlog[N] = Cmd{MaxLeaseIndex: 100, ...}
// ... (unrelated entries)
// raftlog[N+12] = (same as N)
// ... (unrelated entries)
// raftlog[N+15] = (same as N)
//
// where we assume that the `MaxLeaseIndex` 100 is invalid, i.e. when we see
// the first copy of the command being applied, we've already applied some
// command with equal or higher `MaxLeaseIndex`. In a world without
// mechanism (2), `N` would be rejected, and would finalize the proposal
// (i.e. signal the client with an error and remove the entry from
// `proposals`). Later, `N+12` and `N+15` would similarly be rejected (but
// they wouldn't even be regarded as local proposals any more due to not
// being present in `proposals`).
//
// However, (2) exists and it will engage during application of `N`: realizing
// that the current copies of the entry are all going to be rejected, it will
// modify the proposal by assigning a new `MaxLeaseIndex` to it, and handing
// it to `(*Replica).propose` again (which hands it to the proposal buffer,
// which will at some point flush it, leading to re-insertion into the raft
// log and the `proposals` map). The result will be this picture:
//
// proposals[id] = p{Cmd{MaxLeaseIndex: 192, ...}} <-- modified
//
// ... (unrelated entries)
// raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} <-- applied (as no-op)
// ... (unrelated entries)
// raftlog[N+12] = (same as N) (superseded)
// ... (unrelated entries)
// raftlog[N+15] = (same as N) (superseded)
// ... (unrelated entries)
// raftlog[N+18] = Cmd{MaxLeaseIndex: 192, ...} <-- modified
//
// `N+18` might (in fact, is likely to) apply successfully. As a result, when
// we consider `N+12` or `N+15` for application, we must *not* carry out (2)
// again, or we break replay protection. In other words, the `MaxLeaseIndex`
// of the command being applied must be compared with the `MaxLeaseIndex` of
// the command in the proposals map; only if they match do we know that this
// is the most recent (in MaxLeaseIndex order) copy of the command, and only
// then can (2) engage. In addition, an entry that doesn't pass this equality
// check must neither signal the proposer nor unlink the proposal from the
// proposals map (as a newer reproposal which might succeed is likely in the log)[^4].
//
// Another way of framing the above is that `proposals[id].Cmd.MaxLeaseIndex`
// actually tracks the maximum `MaxLeaseIndex` of all copies that may be present in
// the log.
//
// If (2) results in an error (for example, because the proposal now fails to
// respect the closed timestamp), that error finalizes the proposal and is
// returned to the client.
//
// [^1]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_raft.go#L1224
// [^2]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_application_result.go#L148
// [^3]: it's debatable how useful this below-raft reproposal mechanism is.
// It was introduced in https://github.com/cockroachdb/cockroach/pull/35261,
// and perhaps could be phased out again if we also did
// https://github.com/cockroachdb/cockroach/issues/21849. Historical
// evidence points to https://github.com/cockroachdb/cockroach/issues/28876
// as the motivation for introducing this mechanism, i.e. it was about
// reducing failure rates early in the life of a cluster when raft
// leaderships were being determined. Perhaps we could "simply" disable
// async writes unless leadership was stable instead, by blocking on the
// proposal anyway.
// [^4]: https://github.com/cockroachdb/cockroach/blob/ab6a8650621ae798377f12bbfc1eee2fbec95480/pkg/kv/kvserver/replica_application_decoder.go#L100-L114
proposals map[kvserverbase.CmdIDKey]*ProposalData
// Indicates that the replica is in the process of applying log entries.
// Updated to true in handleRaftReady before entries are removed from
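To make the bookkeeping described in the comment above concrete, here is a small, self-contained sketch of the superseded/repropose decision. It is illustrative only: the types and field names below (`proposal`, `maxLeaseIndex`, and so on) are simplified stand-ins, not the actual `ProposalData`/`RaftCommand` API.

```go
package main

import "fmt"

// proposal is a simplified stand-in for ProposalData. Its maxLeaseIndex
// tracks the maximum MaxLeaseIndex across all copies of the command that may
// be present in the log (see the comment on r.mu.proposals above).
type proposal struct {
	maxLeaseIndex uint64
	applied       bool
}

// supersedes reports whether the in-memory proposal has been reproposed at a
// higher MaxLeaseIndex than the log entry being applied. A superseded entry
// must apply as a no-op and must neither signal the client nor trigger
// another reproposal.
func (p *proposal) supersedes(entryMaxLeaseIndex uint64) bool {
	return p.maxLeaseIndex > entryMaxLeaseIndex
}

// maybeRepropose models mechanism (2): only the copy whose MaxLeaseIndex
// matches the in-memory proposal may trigger a reproposal; doing so bumps the
// in-memory MaxLeaseIndex so that all existing log copies become superseded.
func maybeRepropose(p *proposal, entryMaxLeaseIndex, newMaxLeaseIndex uint64) bool {
	if p.applied || p.supersedes(entryMaxLeaseIndex) {
		return false // an old copy; a newer copy may still apply successfully
	}
	p.maxLeaseIndex = newMaxLeaseIndex
	return true
}

func main() {
	p := &proposal{maxLeaseIndex: 100}
	// The first rejected copy (at MaxLeaseIndex 100) triggers a reproposal at 192.
	fmt.Println(maybeRepropose(p, 100, 192)) // true
	// Later copies of the original entry are superseded and must not repropose.
	fmt.Println(maybeRepropose(p, 100, 200)) // false
}
```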
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_application_decoder.go
@@ -111,7 +111,8 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
// criterion. While such proposals can be reproposed, only the first
// instance that gets applied matters and so removing the command is
// always what we want to happen.
cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
!cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex)

if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
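For illustration, the removal decision in the hunk above can be read as "unlink the proposal unless a higher-MaxLeaseIndex reproposal may still show up in the log". The sketch below is a simplified, hypothetical rendering of that step; it does not use the real `replicaDecoder` types.

```go
package main

import "fmt"

// Simplified stand-ins for ProposalData and a decoded log entry.
type proposalData struct {
	maxLeaseIndex uint64
	applied       bool
}

type decodedEntry struct {
	id            string
	maxLeaseIndex uint64
	local         *proposalData // set if the entry originated on this replica
}

// retrieveLocal sketches step 1 of entry application: attach local proposals
// to their entries and remove them from the map, unless the proposal
// supersedes the entry (i.e. a reproposal at a higher MaxLeaseIndex exists),
// in which case the map entry must survive so that the newer copy is still
// recognized as local when it is applied.
func retrieveLocal(batch []*decodedEntry, proposals map[string]*proposalData) {
	for _, e := range batch {
		p, ok := proposals[e.id]
		if !ok {
			continue // not a local proposal
		}
		e.local = p
		shouldRemove := p.applied || !(p.maxLeaseIndex > e.maxLeaseIndex)
		if shouldRemove {
			delete(proposals, e.id)
		}
	}
}

func main() {
	proposals := map[string]*proposalData{"a": {maxLeaseIndex: 192}}
	batch := []*decodedEntry{{id: "a", maxLeaseIndex: 100}} // a superseded copy
	retrieveLocal(batch, proposals)
	_, stillTracked := proposals["a"]
	fmt.Println(stillTracked) // true: superseded copies do not unlink the proposal
}
```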
125 changes: 97 additions & 28 deletions pkg/kv/kvserver/replica_application_result.go
@@ -107,10 +107,34 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))

if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
// entry is not superseded (i.e. we don't have a reproposal at a higher
// MaxLeaseIndex in the log).
//
// This implies that any additional copies of the command (which may be present
// in the log ahead of the current entry) will also fail.
//
// It is thus safe to signal the error back to the client, which is also
// the only sensible choice at this point.
//
// We also know that the proposal is not in the proposals map, since the
// command is local and wasn't superseded, which is the condition in
// retrieveLocalProposals for removing from the map. So we're not leaking
// a map entry here, which we assert against below (and which has coverage,
// at time of writing, through TestReplicaReproposalWithNewLeaseIndexError).
log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr)
cmd.response.Err = pErr

r.mu.RLock()
_, inMap := r.mu.proposals[cmd.ID]
r.mu.RUnlock()

if inMap {
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
@@ -135,36 +159,77 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

// reproposer is used by tryReproposeWithNewLeaseIndex.
type reproposer interface {
trackEvaluatingRequest(context.Context, hlc.Timestamp) (hlc.Timestamp, TrackedRequestToken)
propose(context.Context, *ProposalData, TrackedRequestToken) *roachpb.Error
newNotLeaseHolderError(string) *roachpb.NotLeaseHolderError
}

type replicaReproposer Replica

var _ reproposer = (*replicaReproposer)(nil)

func (r *replicaReproposer) trackEvaluatingRequest(
ctx context.Context, wts hlc.Timestamp,
) (hlc.Timestamp, TrackedRequestToken) {
// NB: must not hold r.mu here, the propBuf acquires it itself.
return r.mu.proposalBuf.TrackEvaluatingRequest(ctx, wts)
}

func (r *replicaReproposer) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) *roachpb.Error {
return (*Replica)(r).propose(ctx, p, tok)
}

func (r *replicaReproposer) newNotLeaseHolderError(msg string) *roachpb.NotLeaseHolderError {
r.mu.RLock()
defer r.mu.RUnlock()
return roachpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
msg,
)
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd,
// Returns a nil error if the command has already been successfully applied or
// has been reproposed here or by a different entry for the same proposal that
// hit an illegal lease index error.
//
// If this returns a nil error once, it will return a nil error for future calls
// as well, assuming that trackEvaluatingRequest returns monotonically increasing
// timestamps across subsequent calls.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
) *roachpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If the command associated with this rejected raft entry already applied,
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local; the first one will apply (and set p.applied) and the
// remaining copies will hit this branch.)
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}
@@ -173,27 +238,31 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
// requests (e.g. EndTxn) don't care about closed timestamps.
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
minTS, tok := r.trackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go to back to DistSender, so send something it can digest.
err := roachpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
return roachpb.NewError(err)
// will go back to DistSender, so send something it can digest.
return roachpb.NewError(r.newNotLeaseHolderError("reproposal failed due to closed timestamp"))
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

// Reset the command for reproposal.
prevMaxLeaseIndex := p.command.MaxLeaseIndex
prevEncodedCommand := p.encodedCommand
p.command.MaxLeaseIndex = 0
p.encodedCommand = nil
pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
// On error, reset the fields we zeroed out to their old value.
// This ensures that the proposal doesn't count as Superseded
// now.
p.command.MaxLeaseIndex = prevMaxLeaseIndex
p.encodedCommand = prevEncodedCommand
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x", cmd.ID)
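A side benefit of routing `tryReproposeWithNewLeaseIndex` through the small `reproposer` interface is that the function can be exercised without a full `*Replica`. The following sketch shows the general shape of that pattern with invented toy types; it is not code from the repository, and the real interface has the richer signatures shown in the diff above.

```go
package main

import (
	"errors"
	"fmt"
)

// reproposer is a toy version of the narrow interface the free function
// depends on; the production implementation would be a thin adapter around
// *Replica (as replicaReproposer is in the diff above).
type reproposer interface {
	propose(cmd string) error
	notLeaseHolderErr(msg string) error
}

// tryRepropose stands in for tryReproposeWithNewLeaseIndex: it only needs the
// reproposer capability, so a test can substitute a stub.
func tryRepropose(r reproposer, cmd string, closedTSViolated bool) error {
	if closedTSViolated {
		// Mirrors the closed-timestamp early-out: give up and return an error
		// the client can digest.
		return r.notLeaseHolderErr("reproposal failed due to closed timestamp")
	}
	return r.propose(cmd)
}

// stubReproposer is a hypothetical test double.
type stubReproposer struct{ proposeErr error }

func (s *stubReproposer) propose(string) error             { return s.proposeErr }
func (s *stubReproposer) notLeaseHolderErr(m string) error { return errors.New(m) }

func main() {
	fmt.Println(tryRepropose(&stubReproposer{}, "cmd-1", false)) // <nil>
	fmt.Println(tryRepropose(&stubReproposer{}, "cmd-1", true))  // closed timestamp error
	fmt.Println(tryRepropose(&stubReproposer{proposeErr: errors.New("no lease")}, "cmd-1", false)) // propose error
}
```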