Skip to content

Commit

Permalink
kvserverbase: introduce struct for return val of CheckForcedErr
Browse files Browse the repository at this point in the history
Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Dec 19, 2022
1 parent e61de15 commit 54e3708
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 70 deletions.
20 changes: 7 additions & 13 deletions pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -54,39 +53,34 @@ type appBatch struct {

func (b *appBatch) assertAndCheckCommand(
ctx context.Context, cmd *raftlog.ReplicatedCmd, state *kvserverpb.ReplicaState, isLocal bool,
) (leaseIndex uint64, _ kvserverbase.ProposalRejectionType, forcedErr *roachpb.Error, _ error) {
) (kvserverbase.ForcedErrResult, error) {
if log.V(4) {
log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp)
}

if cmd.Index() == 0 {
return 0, 0, nil, errors.AssertionFailedf("processRaftCommand requires a non-zero index")
return kvserverbase.ForcedErrResult{}, errors.AssertionFailedf("processRaftCommand requires a non-zero index")
}
if idx, applied := cmd.Index(), state.RaftAppliedIndex; idx != applied+1 {
// If we have an out-of-order index, there's corruption. No sense in
// trying to update anything or running the command. Simply return.
return 0, 0, nil, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx)
return kvserverbase.ForcedErrResult{}, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx)
}

// TODO(sep-raft-log): move the closedts checks from replicaAppBatch here as
// well. This just needs a bit more untangling as they reference *Replica, but
// for no super-convincing reason.

leaseIndex, rej, forcedErr := kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state)
return leaseIndex, rej, forcedErr, nil
return kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state), nil
}

func (b *appBatch) toCheckedCmd(
ctx context.Context,
cmd *raftlog.ReplicatedCmd,
leaseIndex uint64,
rej kvserverbase.ProposalRejectionType,
forcedErr *roachpb.Error,
ctx context.Context, cmd *raftlog.ReplicatedCmd, fr kvserverbase.ForcedErrResult,
) {
cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rej, forcedErr
cmd.ForcedErrResult = fr
if cmd.Rejected() {
log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedErr)
log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedError)

// Apply an empty command.
cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
Expand Down
81 changes: 59 additions & 22 deletions pkg/kv/kvserver/kvserverbase/forced_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ var noopOnEmptyRaftCommandErr = roachpb.NewErrorf("no-op on empty Raft entry")
// corresponding to a ProbeRequest is handled.
var NoopOnProbeCommandErr = roachpb.NewErrorf("no-op on ProbeRequest")

// ForcedErrResult is the output from CheckForcedErr.
type ForcedErrResult struct {
LeaseIndex uint64
Rejection ProposalRejectionType
ForcedError *roachpb.Error
}

// CheckForcedErr determines whether or not a command should be applied to the
// replicated state machine after it has been committed to the Raft log. This
// decision is deterministic on all replicas, such that a command that is
Expand Down Expand Up @@ -69,13 +76,16 @@ func CheckForcedErr(
raftCmd *kvserverpb.RaftCommand,
isLocal bool,
replicaState *kvserverpb.ReplicaState,
) (uint64, ProposalRejectionType, *roachpb.Error) {
) ForcedErrResult {
if raftCmd.ReplicatedEvalResult.IsProbe {
// A Probe is handled by forcing an error during application (which
// avoids a separate "success" code path for this type of request)
// that we can special case as indicating success of the probe above
// raft.
return 0, ProposalRejectionPermanent, NoopOnProbeCommandErr
return ForcedErrResult{
Rejection: ProposalRejectionPermanent,
ForcedError: NoopOnProbeCommandErr,
}
}
leaseIndex := replicaState.LeaseAppliedIndex
isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest
Expand All @@ -89,7 +99,11 @@ func CheckForcedErr(
// Nothing to do here except making sure that the corresponding batch
// (which is bogus) doesn't get executed (for it is empty and so
// properties like key range are undefined).
return leaseIndex, ProposalRejectionPermanent, noopOnEmptyRaftCommandErr
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: noopOnEmptyRaftCommandErr,
}
}

// Verify the lease matches the proposer's expectation. We rely on
Expand Down Expand Up @@ -162,11 +176,15 @@ func CheckForcedErr(
// For lease requests we return a special error that
// redirectOnOrAcquireLease() understands. Note that these
// requests don't go through the DistSender.
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "proposed under invalid lease",
})
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "proposed under invalid lease",
}),
}
}
// We return a NotLeaseHolderError so that the DistSender retries.
// NB: we set proposerStoreID to 0 because we don't know who proposed the
Expand All @@ -176,7 +194,11 @@ func CheckForcedErr(
fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease))
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(nlhe)
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(nlhe),
}
}

if isLeaseRequest {
Expand All @@ -188,11 +210,15 @@ func CheckForcedErr(
// However, leases get special vetting to make sure we don't give one to a replica that was
// since removed (see #15385 and a comment in redirectOnOrAcquireLease).
if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok {
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "replica not part of range",
})
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *replicaState.Lease,
Requested: requestedLease,
Message: "replica not part of range",
}),
}
}
} else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex {
// The happy case: the command is applying at or ahead of the minimal
Expand All @@ -219,9 +245,12 @@ func CheckForcedErr(
)
retry = ProposalRejectionIllegalLeaseIndex
}
return leaseIndex, retry, roachpb.NewErrorf(
"command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex,
)
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: retry,
ForcedError: roachpb.NewErrorf(
"command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex,
)}
}

// Verify that command is not trying to write below the GC threshold. This is
Expand All @@ -235,10 +264,18 @@ func CheckForcedErr(
// the GC threshold has advanced since then?
wts := raftCmd.ReplicatedEvalResult.WriteTimestamp
if !wts.IsEmpty() && wts.LessEq(*replicaState.GCThreshold) {
return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
Timestamp: wts,
Threshold: *replicaState.GCThreshold,
})
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
Timestamp: wts,
Threshold: *replicaState.GCThreshold,
}),
}
}
return ForcedErrResult{
LeaseIndex: leaseIndex,
Rejection: ProposalRejectionPermanent,
ForcedError: nil,
}
return leaseIndex, ProposalRejectionPermanent, nil
}
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/raftlog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/raftpb"
)
Expand All @@ -32,13 +31,11 @@ import (
// changes replica boundaries.
type ReplicatedCmd struct {
*Entry
// The following fields are set in shouldApplyCommand when we validate that
// a command applies given the current lease and GC threshold. The process
// of setting these fields is what transforms an apply.Command into an
// The following struct is populated in shouldApplyCommand when we validate
// that a command applies given the current lease and GC threshold. The
// process of populating it is what transforms an apply.Command into an
// apply.CheckedCommand.
LeaseIndex uint64
ForcedErr *roachpb.Error
Rejection kvserverbase.ProposalRejectionType
kvserverbase.ForcedErrResult
}

var _ apply.Command = (*ReplicatedCmd)(nil)
Expand Down Expand Up @@ -106,7 +103,7 @@ func (c *ReplicatedCmd) AckOutcomeAndFinish(context.Context) error { return nil
// A command is rejected if it has a ForcedErr, i.e. if the state machines
// (deterministically) apply the associated entry as a no-op. See
// kvserverbase.CheckForcedErr.
func (c *ReplicatedCmd) Rejected() bool { return c.ForcedErr != nil }
func (c *ReplicatedCmd) Rejected() bool { return c.ForcedError != nil }

// CanAckBeforeApplication implements apply.CheckedCommand.
//
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ func (b *replicaAppBatch) Stage(
// into appBatch at appropriate times.
var ab appBatch

leaseIndex, rej, forcedErr, err := ab.assertAndCheckCommand(ctx, &cmd.ReplicatedCmd, &b.state, cmd.IsLocal())
fr, err := ab.assertAndCheckCommand(ctx, &cmd.ReplicatedCmd, &b.state, cmd.IsLocal())
if err != nil {
return nil, err
}

// Then, maybe override the result with testing knobs.
if b.r.store.TestingKnobs() != nil {
rej, forcedErr = replicaApplyTestingFilters(ctx, b.r, cmd, rej, forcedErr)
fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr)
}

// Now update cmd. We'll either put the lease index in it or zero out
// the cmd in case there's a forced error.
ab.toCheckedCmd(ctx, &cmd.ReplicatedCmd, leaseIndex, rej, forcedErr)
ab.toCheckedCmd(ctx, &cmd.ReplicatedCmd, fr)

// TODO(tbg): these assertions should be pushed into
// (*appBatch).assertAndCheckCommand.
Expand Down Expand Up @@ -785,11 +785,11 @@ func (mb *ephemeralReplicaAppBatch) Stage(
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)

leaseIndex, rejection, forcedErr := kvserverbase.CheckForcedErr(
fr := kvserverbase.CheckForcedErr(
ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state,
)
rejection, forcedErr = replicaApplyTestingFilters(ctx, mb.r, cmd, rejection, forcedErr)
cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rejection, forcedErr
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr)
cmd.ForcedErrResult = fr

return cmd, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
Req: cmd.proposal.Request,
ForcedError: cmd.ForcedErr,
ForcedError: cmd.ForcedError,
})
if cmd.Rejection == 0 {
cmd.Rejection = kvserverbase.ProposalRejectionType(newPropRetry)
}
}
if pErr == nil {
pErr = cmd.ForcedErr
pErr = cmd.ForcedError
}

if cmd.Rejection != kvserverbase.ProposalRejectionPermanent && pErr == nil {
Expand Down
31 changes: 13 additions & 18 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,43 +81,38 @@ func (r *Replica) getStateMachine() *replicaStateMachine {

// TODO(tbg): move this to replica_app_batch.go.
func replicaApplyTestingFilters(
ctx context.Context,
r *Replica,
cmd *replicatedCmd,
rejection kvserverbase.ProposalRejectionType,
forcedErr *roachpb.Error,
) (newRejection kvserverbase.ProposalRejectionType, newForcedErr *roachpb.Error) {
ctx context.Context, r *Replica, cmd *replicatedCmd, fr kvserverbase.ForcedErrResult,
) kvserverbase.ForcedErrResult {
// By default, output is input.
newRejection = rejection
newForcedErr = forcedErr
newFR := fr

// Filters may change that.
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; forcedErr != nil || filter != nil {
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; fr.ForcedError != nil || filter != nil {
args := kvserverbase.ApplyFilterArgs{
CmdID: cmd.ID,
ReplicatedEvalResult: *cmd.ReplicatedResult(),
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
ForcedError: forcedErr,
ForcedError: fr.ForcedError,
}
if forcedErr == nil {
if fr.ForcedError == nil {
if cmd.IsLocal() {
args.Req = cmd.proposal.Request
}
var newRej int
newRej, newForcedErr = filter(args)
if rejection == 0 {
newRejection = kvserverbase.ProposalRejectionType(newRej)
newRej, newFR.ForcedError = filter(args)
if fr.Rejection == 0 {
newFR.Rejection = kvserverbase.ProposalRejectionType(newRej)
}
} else if feFilter := r.store.cfg.TestingKnobs.TestingApplyForcedErrFilter; feFilter != nil {
var newRej int
newRej, newForcedErr = filter(args)
if rejection == 0 {
newRejection = kvserverbase.ProposalRejectionType(newRej)
newRej, newFR.ForcedError = filter(args)
if fr.Rejection == 0 {
newFR.Rejection = kvserverbase.ProposalRejectionType(newRej)
}
}
}
return newRejection, newForcedErr
return newFR
}

// NewEphemeralBatch implements the apply.StateMachine interface.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,11 @@ func TestLeaseReplicaNotInDesc(t *testing.T) {
},
}
tc.repl.mu.Lock()
_, _, pErr := kvserverbase.CheckForcedErr(
fr := kvserverbase.CheckForcedErr(
ctx, makeIDKey(), &raftCmd, false, /* isLocal */
&tc.repl.mu.state,
)
pErr := fr.ForcedError
tc.repl.mu.Unlock()
if _, isErr := pErr.GetDetail().(*roachpb.LeaseRejectedError); !isErr {
t.Fatal(pErr)
Expand Down

0 comments on commit 54e3708

Please sign in to comment.