From a9f4364eea13b01229cd314b1dd3308dc304756d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 19 Dec 2023 11:32:09 +0000 Subject: [PATCH] kvserverbase: extend `ApplyFilterArgs` Epic: none Release note: None --- pkg/kv/kvserver/kvserverbase/BUILD.bazel | 1 + pkg/kv/kvserver/kvserverbase/base.go | 5 +++++ pkg/kv/kvserver/replica_app_batch.go | 4 ++-- pkg/kv/kvserver/replica_application_result.go | 3 +++ .../kvserver/replica_application_state_machine.go | 13 ++++++++++++- pkg/kv/kvserver/testing_knobs.go | 3 ++- 6 files changed, 25 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index 42acd7cb249d..afabd75b41d5 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", + "@io_etcd_go_raft_v3//raftpb", "@org_golang_x_time//rate", ], ) diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 2a709353ffc7..6fe687b8d042 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/redact" + "go.etcd.io/raft/v3/raftpb" ) // MergeQueueEnabled is a setting that controls whether the merge queue is @@ -150,8 +151,12 @@ type ProposalFilterArgs struct { type ApplyFilterArgs struct { kvserverpb.ReplicatedEvalResult CmdID CmdIDKey + Cmd kvserverpb.RaftCommand + Entry raftpb.Entry RangeID roachpb.RangeID StoreID roachpb.StoreID + ReplicaID roachpb.ReplicaID + Ephemeral bool Req *kvpb.BatchRequest // only set on the leaseholder ForcedError *kvpb.Error } diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 6d07e13333ed..385633cfceb9 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -110,7 +110,7 @@ func (b *replicaAppBatch) Stage( // Then, maybe override the result with testing knobs. if b.r.store.TestingKnobs() != nil { - fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr) + fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr, false /* ephemeral */) } // Now update cmd. We'll either put the lease index in it or zero out @@ -797,7 +797,7 @@ func (mb *ephemeralReplicaAppBatch) Stage( fr := kvserverbase.CheckForcedErr( ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state, ) - fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr) + fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr, true /* ephemeral */) cmd.ForcedErrResult = fr if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex { mb.state.LeaseAppliedIndex = cmd.LeaseIndex diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 6940c08a31f1..0ed13dcd9477 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -82,9 +82,12 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { var newPropRetry int newPropRetry, pErr = filter(kvserverbase.ApplyFilterArgs{ CmdID: cmd.ID, + Cmd: cmd.Cmd, + Entry: cmd.Entry.Entry, ReplicatedEvalResult: *cmd.ReplicatedResult(), StoreID: r.store.StoreID(), RangeID: r.RangeID, + ReplicaID: r.replicaID, Req: cmd.proposal.Request, ForcedError: cmd.ForcedError, }) diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 2ce135393087..3f0d4720be69 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -87,7 +87,11 @@ func (r *Replica) getStateMachine() *replicaStateMachine { // TODO(tbg): move this to replica_app_batch.go. func replicaApplyTestingFilters( - ctx context.Context, r *Replica, cmd *replicatedCmd, fr kvserverbase.ForcedErrResult, + ctx context.Context, + r *Replica, + cmd *replicatedCmd, + fr kvserverbase.ForcedErrResult, + ephemeral bool, ) kvserverbase.ForcedErrResult { // By default, output is input. newFR := fr @@ -96,9 +100,13 @@ func replicaApplyTestingFilters( if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; fr.ForcedError != nil || filter != nil { args := kvserverbase.ApplyFilterArgs{ CmdID: cmd.ID, + Cmd: cmd.Cmd, + Entry: cmd.Entry.Entry, ReplicatedEvalResult: *cmd.ReplicatedResult(), StoreID: r.store.StoreID(), RangeID: r.RangeID, + ReplicaID: r.replicaID, + Ephemeral: ephemeral, ForcedError: fr.ForcedError, } if fr.ForcedError == nil { @@ -254,8 +262,11 @@ func (sm *replicaStateMachine) ApplySideEffects( // NB: ReplicatedEvalResult is emptied by now, so don't include it. if _, pErr := f(kvserverbase.ApplyFilterArgs{ CmdID: cmd.ID, + Cmd: cmd.Cmd, + Entry: cmd.Entry.Entry, StoreID: sm.r.store.StoreID(), RangeID: sm.r.RangeID, + ReplicaID: sm.r.replicaID, ForcedError: cmd.ForcedError, }); pErr != nil { return nil, pErr.GoError() diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 7dcbcce036c9..8a2911c9fe15 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -98,7 +98,8 @@ type StoreTestingKnobs struct { // Users have to expect the filter to be invoked twice for each command, once // from ephemerealReplicaAppBatch.Stage, and once from replicaAppBatch.Stage; // this has to do with wanting to early-ack successful proposals. The second - // call is conditional on the first call succeeding. + // call is conditional on the first call succeeding. The field + // ApplyFilterArgs.Ephemeral will be true for the initial call. // // Consider using a TestPostApplyFilter instead, and use a // TestingApplyCalledTwiceFilter only to inject forced errors.