Skip to content

Commit

Permalink
kvserverbase: extend ApplyFilterArgs
Browse files Browse the repository at this point in the history
Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 2, 2024
1 parent c316d6a commit a9f4364
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//raftpb",
"@org_golang_x_time//rate",
],
)
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3/raftpb"
)

// MergeQueueEnabled is a setting that controls whether the merge queue is
Expand Down Expand Up @@ -150,8 +151,12 @@ type ProposalFilterArgs struct {
type ApplyFilterArgs struct {
kvserverpb.ReplicatedEvalResult
CmdID CmdIDKey
Cmd kvserverpb.RaftCommand
Entry raftpb.Entry
RangeID roachpb.RangeID
StoreID roachpb.StoreID
ReplicaID roachpb.ReplicaID
Ephemeral bool
Req *kvpb.BatchRequest // only set on the leaseholder
ForcedError *kvpb.Error
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (b *replicaAppBatch) Stage(

// Then, maybe override the result with testing knobs.
if b.r.store.TestingKnobs() != nil {
fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr)
fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr, false /* ephemeral */)
}

// Now update cmd. We'll either put the lease index in it or zero out
Expand Down Expand Up @@ -797,7 +797,7 @@ func (mb *ephemeralReplicaAppBatch) Stage(
fr := kvserverbase.CheckForcedErr(
ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state,
)
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr)
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr, true /* ephemeral */)
cmd.ForcedErrResult = fr
if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex {
mb.state.LeaseAppliedIndex = cmd.LeaseIndex
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
var newPropRetry int
newPropRetry, pErr = filter(kvserverbase.ApplyFilterArgs{
CmdID: cmd.ID,
Cmd: cmd.Cmd,
Entry: cmd.Entry.Entry,
ReplicatedEvalResult: *cmd.ReplicatedResult(),
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
ReplicaID: r.replicaID,
Req: cmd.proposal.Request,
ForcedError: cmd.ForcedError,
})
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (r *Replica) getStateMachine() *replicaStateMachine {

// TODO(tbg): move this to replica_app_batch.go.
func replicaApplyTestingFilters(
ctx context.Context, r *Replica, cmd *replicatedCmd, fr kvserverbase.ForcedErrResult,
ctx context.Context,
r *Replica,
cmd *replicatedCmd,
fr kvserverbase.ForcedErrResult,
ephemeral bool,
) kvserverbase.ForcedErrResult {
// By default, output is input.
newFR := fr
Expand All @@ -96,9 +100,13 @@ func replicaApplyTestingFilters(
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; fr.ForcedError != nil || filter != nil {
args := kvserverbase.ApplyFilterArgs{
CmdID: cmd.ID,
Cmd: cmd.Cmd,
Entry: cmd.Entry.Entry,
ReplicatedEvalResult: *cmd.ReplicatedResult(),
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
ReplicaID: r.replicaID,
Ephemeral: ephemeral,
ForcedError: fr.ForcedError,
}
if fr.ForcedError == nil {
Expand Down Expand Up @@ -254,8 +262,11 @@ func (sm *replicaStateMachine) ApplySideEffects(
// NB: ReplicatedEvalResult is emptied by now, so don't include it.
if _, pErr := f(kvserverbase.ApplyFilterArgs{
CmdID: cmd.ID,
Cmd: cmd.Cmd,
Entry: cmd.Entry.Entry,
StoreID: sm.r.store.StoreID(),
RangeID: sm.r.RangeID,
ReplicaID: sm.r.replicaID,
ForcedError: cmd.ForcedError,
}); pErr != nil {
return nil, pErr.GoError()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ type StoreTestingKnobs struct {
// Users have to expect the filter to be invoked twice for each command, once
// from ephemerealReplicaAppBatch.Stage, and once from replicaAppBatch.Stage;
// this has to do with wanting to early-ack successful proposals. The second
// call is conditional on the first call succeeding.
// call is conditional on the first call succeeding. The field
// ApplyFilterArgs.Ephemeral will be true for the initial call.
//
// Consider using a TestPostApplyFilter instead, and use a
// TestingApplyCalledTwiceFilter only to inject forced errors.
Expand Down

0 comments on commit a9f4364

Please sign in to comment.