From ed68bfb3ac020fbae594646e5c270233c9134827 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 2 Mar 2022 22:08:11 -0500
Subject: [PATCH] kv: plumb LeaseStatus into batch evaluation

Unused plumbing in preparation for a later commit.
---
 pkg/kv/kvserver/batcheval/declare.go     |  2 ++
 pkg/kv/kvserver/replica_evaluate.go      | 10 ++++++++-
 pkg/kv/kvserver/replica_evaluate_test.go |  1 +
 pkg/kv/kvserver/replica_gossip.go        |  4 ++--
 pkg/kv/kvserver/replica_proposal.go      |  9 ++++----
 pkg/kv/kvserver/replica_raft.go          |  4 ++--
 pkg/kv/kvserver/replica_read.go          |  6 +++--
 pkg/kv/kvserver/replica_test.go          | 28 +++++++++++++-----------
 pkg/kv/kvserver/replica_write.go         | 23 ++++++++++++-------
 9 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go
index 101e1076ddeb..e94c23c272f7 100644
--- a/pkg/kv/kvserver/batcheval/declare.go
+++ b/pkg/kv/kvserver/batcheval/declare.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 )
 
 // DefaultDeclareKeys is the default implementation of Command.DeclareKeys.
@@ -117,6 +118,7 @@ type CommandArgs struct {
 	EvalCtx EvalContext
 	Header  roachpb.Header
 	Args    roachpb.Request
+	Now     hlc.ClockTimestamp
 	// *Stats should be mutated to reflect any writes made by the command.
 	Stats       *enginepb.MVCCStats
 	Uncertainty uncertainty.Interval
diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go
index 4302cb05a5e6..a1c43e3acc57 100644
--- a/pkg/kv/kvserver/replica_evaluate.go
+++ b/pkg/kv/kvserver/replica_evaluate.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
@@ -148,6 +149,7 @@ func evaluateBatch(
 	rec batcheval.EvalContext,
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	readOnly bool,
 ) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) {
@@ -268,7 +270,7 @@ func evaluateBatch(
 		// may carry a response transaction and in the case of WriteTooOldError
 		// (which is sometimes deferred) it is fully populated.
 		curResult, err := evaluateCommand(
-			ctx, readWriter, rec, ms, baHeader, args, reply, ui)
+			ctx, readWriter, rec, ms, baHeader, args, reply, st, ui)
 
 		if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
 			filterArgs := kvserverbase.FilterArgs{
@@ -475,16 +477,22 @@ func evaluateCommand(
 	h roachpb.Header,
 	args roachpb.Request,
 	reply roachpb.Response,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 ) (result.Result, error) {
 	var err error
 	var pd result.Result
 
 	if cmd, ok := batcheval.LookupCommand(args.Method()); ok {
+		var now hlc.ClockTimestamp
+		if st != nil {
+			now = st.Now
+		}
 		cArgs := batcheval.CommandArgs{
 			EvalCtx:     rec,
 			Header:      h,
 			Args:        args,
+			Now:         now,
 			Stats:       ms,
 			Uncertainty: ui,
 		}
diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go
index 8d5c0f2e92ec..fb1ce01672f9 100644
--- a/pkg/kv/kvserver/replica_evaluate_test.go
+++ b/pkg/kv/kvserver/replica_evaluate_test.go
@@ -665,6 +665,7 @@ func TestEvaluateBatch(t *testing.T) {
 				d.MockEvalCtx.EvalContext(),
 				&d.ms,
 				&d.ba,
+				nil, /* st */
 				uncertainty.Interval{},
 				d.readOnly,
 			)
diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go
index 2ce320d8effc..a8a4da9e7757 100644
--- a/pkg/kv/kvserver/replica_gossip.go
+++ b/pkg/kv/kvserver/replica_gossip.go
@@ -182,7 +182,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
 	defer rw.Close()
 
 	br, result, pErr :=
-		evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, uncertainty.Interval{}, true /* readOnly */)
+		evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* st */, uncertainty.Interval{}, true /* readOnly */)
 	if pErr != nil {
 		return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span)
 	}
@@ -228,7 +228,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
 	defer rw.Close()
 
 	br, result, pErr := evaluateBatch(
-		ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, uncertainty.Interval{}, true, /* readOnly */
+		ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* st */, uncertainty.Interval{}, true, /* readOnly */
 	)
 	if pErr != nil {
 		return nil, pErr.GoError()
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 50f8e1a10a66..47088aea8345 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -659,6 +659,7 @@ func (r *Replica) evaluateProposal(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (*result.Result, bool, *roachpb.Error) {
@@ -676,7 +677,7 @@ func (r *Replica) evaluateProposal(
 	//
 	// TODO(tschottdorf): absorb all returned values in `res` below this point
 	// in the call stack as well.
-	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g)
+	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, st, ui, g)
 
 	// Note: reusing the proposer's batch when applying the command on the
 	// proposer was explored as an optimization but resulted in no performance
@@ -772,11 +773,11 @@ func (r *Replica) requestToProposal(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
-	st kvserverpb.LeaseStatus,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (*ProposalData, *roachpb.Error) {
-	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g)
+	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, st, ui, g)
 
 	// Fill out the results even if pErr != nil; we'll return the error below.
 	proposal := &ProposalData{
@@ -785,7 +786,7 @@ func (r *Replica) requestToProposal(
 		doneCh:      make(chan proposalResult, 1),
 		Local:       &res.Local,
 		Request:     ba,
-		leaseStatus: st,
+		leaseStatus: *st,
 	}
 
 	if needConsensus {
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 8b8a1ddbf106..8754d90acdca 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -104,7 +104,7 @@ func (r *Replica) evalAndPropose(
 	ctx context.Context,
 	ba *roachpb.BatchRequest,
 	g *concurrency.Guard,
-	st kvserverpb.LeaseStatus,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	tok TrackedRequestToken,
 ) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) {
@@ -124,7 +124,7 @@ func (r *Replica) evalAndPropose(
 
 	// Attach the endCmds to the proposal and assume responsibility for
 	// releasing the concurrency guard if the proposal makes it to Raft.
-	proposal.ec = endCmds{repl: r, g: g, st: st}
+	proposal.ec = endCmds{repl: r, g: g, st: *st}
 
 	// Pull out proposal channel to return. proposal.doneCh may be set to
 	// nil if it is signaled in this function.
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index ff64126729f4..4c899652ee70 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -87,7 +88,7 @@ func (r *Replica) executeReadOnlyBatch(
 	// the latches are released.
 
 	var result result.Result
-	br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g)
+	br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, &st, ui, g)
 
 	// If the request hit a server-side concurrency retry error, immediately
 	// propagate the error. Don't assume ownership of the concurrency guard.
@@ -243,6 +244,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 	rw storage.ReadWriter,
 	rec batcheval.EvalContext,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
@@ -294,7 +296,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 			boundAccount.Clear(ctx)
 			log.VEventf(ctx, 2, "server-side retry of batch")
 		}
-		br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */)
+		br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, st, ui, true /* readOnly */)
 		// If we can retry, set a higher batch timestamp and continue.
 		// Allow one retry only.
 		if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) {
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 47a2a6d067d8..a30aac136395 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -530,7 +530,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
 	}
 	ba.Add(leaseReq)
 	_, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr == nil {
 		// Next if the command was committed, wait for the range to apply it.
 		// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
@@ -1391,7 +1391,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	ba.Timestamp = tc.repl.store.Clock().Now()
 	ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
 	_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr == nil {
 		// Next if the command was committed, wait for the range to apply it.
 		// TODO(bdarnell): refactor to a more conventional error-handling pattern.
@@ -7952,7 +7952,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
 		})
 		st := repl.CurrentLeaseStatus(ctx)
 		_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-		ch, _, id, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+		ch, _, id, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -8023,7 +8023,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 		})
 		_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
 		st := tc.repl.CurrentLeaseStatus(ctx)
-		ch, _, _, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+		ch, _, _, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -8137,7 +8137,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 		ba.Timestamp = tc.Clock().Now()
 		ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}})
 		st := r.CurrentLeaseStatus(ctx)
-		cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpansGuard())
+		cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, &st, uncertainty.Interval{}, allSpansGuard())
 		if pErr != nil {
 			t.Fatal(pErr)
 		}
@@ -8259,7 +8259,8 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 
 	incCmdID = makeIDKey()
 	atomic.StoreInt32(&filterActive, 1)
-	proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard())
+	st := repl.CurrentLeaseStatus(ctx)
+	proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, &st, uncertainty.Interval{}, allSpansGuard())
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -8964,7 +8965,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
 	assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
 	origTxn := txn.Clone()
 
-	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpansGuard())
+	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, nil, uncertainty.Interval{}, allSpansGuard())
 	defer batch.Close()
 	if pErr != nil {
 		t.Fatal(pErr)
@@ -9687,7 +9688,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
 	exLease, _ := repl.GetLease()
 	st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID}
 	_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9735,7 +9736,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	atomic.StoreInt32(&filterActive, 1)
 	st := tc.repl.CurrentLeaseStatus(ctx)
 	_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9800,7 +9801,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
 	atomic.StoreInt32(&filterActive, 1)
 	st := repl.CurrentLeaseStatus(ctx)
 	_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9819,7 +9820,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
 
 		var pErr *roachpb.Error
 		_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-		ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+		ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 		if pErr != nil {
 			t.Fatal(pErr)
 		}
@@ -13272,7 +13273,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
 	_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
 	sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan())
 	tracedCtx := tracing.ContextWithSpan(ctx, sp)
-	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok)
+	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok)
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -13441,7 +13442,8 @@ func TestContainsEstimatesClampProposal(t *testing.T) {
 	ba.Timestamp = tc.Clock().Now()
 	req := putArgs(roachpb.Key("some-key"), []byte("some-value"))
 	ba.Add(&req)
-	proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard())
+	st := tc.repl.CurrentLeaseStatus(ctx)
+	proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, &st, uncertainty.Interval{}, allSpansGuard())
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index c12fe8375f53..a1bd5f41d0f7 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -164,7 +164,7 @@ func (r *Replica) executeWriteBatch(
 	// If the command is proposed to Raft, ownership of and responsibility for
 	// the concurrency guard will be assumed by Raft, so provide the guard to
 	// evalAndPropose.
-	ch, abandon, _, pErr := r.evalAndPropose(ctx, ba, g, st, ui, tok.Move(ctx))
+	ch, abandon, _, pErr := r.evalAndPropose(ctx, ba, g, &st, ui, tok.Move(ctx))
 	if pErr != nil {
 		if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
 			// Need to unlock here because setCorruptRaftMuLock needs readOnlyCmdMu not held.
@@ -382,6 +382,7 @@ func (r *Replica) evaluateWriteBatch(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
@@ -395,7 +396,7 @@ func (r *Replica) evaluateWriteBatch(
 	// Attempt 1PC execution, if applicable. If not transactional or there are
 	// indications that the batch's txn will require retry, execute as normal.
 	if r.canAttempt1PCEvaluation(ctx, ba, g) {
-		res := r.evaluate1PC(ctx, idKey, ba, g)
+		res := r.evaluate1PC(ctx, idKey, ba, st, g)
 		switch res.success {
 		case onePCSucceeded:
 			return res.batch, res.stats, res.br, res.res, nil
@@ -427,7 +428,7 @@ func (r *Replica) evaluateWriteBatch(
 	rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot())
 	defer rec.Release()
 	batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
-		ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */)
+		ctx, idKey, rec, ms, ba, st, ui, g, nil /* deadline */)
 	return batch, *ms, br, res, pErr
 }
 
@@ -467,7 +468,11 @@ type onePCResult struct {
 // efficient - we're avoiding writing the transaction record and writing and the
 // immediately deleting intents.
 func (r *Replica) evaluate1PC(
-	ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, g *concurrency.Guard,
+	ctx context.Context,
+	idKey kvserverbase.CmdIDKey,
+	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
+	g *concurrency.Guard,
 ) (onePCRes onePCResult) {
 	log.VEventf(ctx, 2, "attempting 1PC execution")
 
@@ -503,10 +508,10 @@ func (r *Replica) evaluate1PC(
 	ms := new(enginepb.MVCCStats)
 	if ba.CanForwardReadTimestamp {
 		batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
-			ctx, idKey, rec, ms, &strippedBa, ui, g, etArg.Deadline)
+			ctx, idKey, rec, ms, &strippedBa, st, ui, g, etArg.Deadline)
 	} else {
 		batch, br, res, pErr = r.evaluateWriteBatchWrapper(
-			ctx, idKey, rec, ms, &strippedBa, ui, g)
+			ctx, idKey, rec, ms, &strippedBa, st, ui, g)
 	}
 
 	if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) {
@@ -596,6 +601,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 	rec batcheval.EvalContext,
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 	deadline *hlc.Timestamp,
@@ -611,7 +617,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 			batch.Close()
 		}
 
-		batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, g)
+		batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, st, ui, g)
 
 		var success bool
 		if pErr == nil {
@@ -639,11 +645,12 @@ func (r *Replica) evaluateWriteBatchWrapper(
 	rec batcheval.EvalContext,
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
 	batch, opLogger := r.newBatchedEngine(ba, g)
-	br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, ui, false /* readOnly */)
+	br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, st, ui, false /* readOnly */)
 	if pErr == nil {
 		if opLogger != nil {
 			res.LogicalOpLog = &kvserverpb.LogicalOpLog{
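
Note (illustrative, not part of the patch): the plumbing above intentionally leaves
CommandArgs.Now unused. The sketch below shows one way a later change might consume
the field from inside a batcheval command; the command name, the expiration check,
and the surrounding request handling are hypothetical and only meant to show the
shape of the new API, not the follow-up commit itself.

    // evalExpiringGet is a hypothetical read command living in
    // pkg/kv/kvserver/batcheval that consults the lease-derived clock
    // reading plumbed into CommandArgs by this patch.
    func evalExpiringGet(
        ctx context.Context, reader storage.Reader, cArgs batcheval.CommandArgs, resp roachpb.Response,
    ) (result.Result, error) {
        // cArgs.Now is copied from LeaseStatus.Now when a lease status is
        // available at evaluation time; it is the zero ClockTimestamp in the
        // paths above that pass nil /* st */ (gossip scans and tests).
        if cArgs.Now != (hlc.ClockTimestamp{}) {
            // Example use: compare a stored expiration against the clock
            // reading captured when the lease status was computed.
            _ = cArgs.Now.ToTimestamp()
        }
        return result.Result{}, nil
    }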