diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go
index 101e1076ddeb..e94c23c272f7 100644
--- a/pkg/kv/kvserver/batcheval/declare.go
+++ b/pkg/kv/kvserver/batcheval/declare.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 )
 
 // DefaultDeclareKeys is the default implementation of Command.DeclareKeys.
@@ -117,6 +118,7 @@ type CommandArgs struct {
 	EvalCtx EvalContext
 	Header  roachpb.Header
 	Args    roachpb.Request
+	Now     hlc.ClockTimestamp
 	// *Stats should be mutated to reflect any writes made by the command.
 	Stats       *enginepb.MVCCStats
 	Uncertainty uncertainty.Interval
diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go
index 9ce9b7ff58a9..dcfdb050413f 100644
--- a/pkg/kv/kvserver/replica_evaluate.go
+++ b/pkg/kv/kvserver/replica_evaluate.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
@@ -148,6 +149,7 @@ func evaluateBatch(
 	rec batcheval.EvalContext,
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	readOnly bool,
 ) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) {
@@ -268,7 +270,7 @@ func evaluateBatch(
 		// may carry a response transaction and in the case of WriteTooOldError
 		// (which is sometimes deferred) it is fully populated.
 		curResult, err := evaluateCommand(
-			ctx, readWriter, rec, ms, baHeader, args, reply, ui)
+			ctx, readWriter, rec, ms, baHeader, args, reply, st, ui)
 
 		if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
 			filterArgs := kvserverbase.FilterArgs{
@@ -475,16 +477,22 @@ func evaluateCommand(
 	h roachpb.Header,
 	args roachpb.Request,
 	reply roachpb.Response,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 ) (result.Result, error) {
 	var err error
 	var pd result.Result
 
 	if cmd, ok := batcheval.LookupCommand(args.Method()); ok {
+		var now hlc.ClockTimestamp
+		if st != nil {
+			now = st.Now
+		}
 		cArgs := batcheval.CommandArgs{
 			EvalCtx:     rec,
 			Header:      h,
 			Args:        args,
+			Now:         now,
 			Stats:       ms,
 			Uncertainty: ui,
 		}
diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go
index 8d5c0f2e92ec..fb1ce01672f9 100644
--- a/pkg/kv/kvserver/replica_evaluate_test.go
+++ b/pkg/kv/kvserver/replica_evaluate_test.go
@@ -665,6 +665,7 @@ func TestEvaluateBatch(t *testing.T) {
 				d.MockEvalCtx.EvalContext(),
 				&d.ms,
 				&d.ba,
+				nil, /* st */
 				uncertainty.Interval{},
 				d.readOnly,
 			)
diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go
index f22a76ced01f..4930375b63bd 100644
--- a/pkg/kv/kvserver/replica_gossip.go
+++ b/pkg/kv/kvserver/replica_gossip.go
@@ -179,7 +179,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
 	defer rw.Close()
 
 	br, result, pErr :=
-		evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, uncertainty.Interval{}, true /* readOnly */)
+		evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* st */, uncertainty.Interval{}, true /* readOnly */)
 	if pErr != nil {
 		return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span)
 	}
@@ -222,7 +222,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
 	defer rw.Close()
 
 	br, result, pErr := evaluateBatch(
-		ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, uncertainty.Interval{}, true, /* readOnly */
+		ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* st */, uncertainty.Interval{}, true, /* readOnly */
 	)
 	if pErr != nil {
 		return nil, pErr.GoError()
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 0a57827832a7..fc7b2abc9eba 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -653,6 +653,7 @@ func (r *Replica) evaluateProposal(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (*result.Result, bool, *roachpb.Error) {
@@ -670,7 +671,7 @@ func (r *Replica) evaluateProposal(
 	//
 	// TODO(tschottdorf): absorb all returned values in `res` below this point
 	// in the call stack as well.
-	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g)
+	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, st, ui, g)
 
 	// Note: reusing the proposer's batch when applying the command on the
 	// proposer was explored as an optimization but resulted in no performance
@@ -766,11 +767,11 @@ func (r *Replica) requestToProposal(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
-	st kvserverpb.LeaseStatus,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (*ProposalData, *roachpb.Error) {
-	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g)
+	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, st, ui, g)
 
 	// Fill out the results even if pErr != nil; we'll return the error below.
 	proposal := &ProposalData{
@@ -779,7 +780,7 @@ func (r *Replica) requestToProposal(
 		doneCh:      make(chan proposalResult, 1),
 		Local:       &res.Local,
 		Request:     ba,
-		leaseStatus: st,
+		leaseStatus: *st,
 	}
 
 	if needConsensus {
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 148734cf2874..44c53816919b 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -104,7 +104,7 @@ func (r *Replica) evalAndPropose(
 	ctx context.Context,
 	ba *roachpb.BatchRequest,
 	g *concurrency.Guard,
-	st kvserverpb.LeaseStatus,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	tok TrackedRequestToken,
 ) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) {
@@ -124,7 +124,7 @@ func (r *Replica) evalAndPropose(
 
 	// Attach the endCmds to the proposal and assume responsibility for
 	// releasing the concurrency guard if the proposal makes it to Raft.
-	proposal.ec = endCmds{repl: r, g: g, st: st}
+	proposal.ec = endCmds{repl: r, g: g, st: *st}
 
 	// Pull out proposal channel to return. proposal.doneCh may be set to
 	// nil if it is signaled in this function.
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 9e59fab0b8c9..2ef5d64b8aaf 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -80,7 +81,7 @@ func (r *Replica) executeReadOnlyBatch(
 	// the latches are released.
 
 	var result result.Result
-	br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g)
+	br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, &st, ui, g)
 
 	// If the request hit a server-side concurrency retry error, immediately
 	// propagate the error. Don't assume ownership of the concurrency guard.
@@ -233,6 +234,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 	rw storage.ReadWriter,
 	rec batcheval.EvalContext,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
@@ -284,7 +286,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 			boundAccount.Clear(ctx)
 			log.VEventf(ctx, 2, "server-side retry of batch")
 		}
-		br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */)
+		br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, st, ui, true /* readOnly */)
 		// If we can retry, set a higher batch timestamp and continue.
 		// Allow one retry only.
 		if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) {
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index d2daa30d2744..e0d535b122aa 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -530,7 +530,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
 	}
 	ba.Add(leaseReq)
 	_, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr == nil {
 		// Next if the command was committed, wait for the range to apply it.
 		// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
@@ -1389,7 +1389,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	ba.Timestamp = tc.repl.store.Clock().Now()
 	ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
 	_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr == nil {
 		// Next if the command was committed, wait for the range to apply it.
 		// TODO(bdarnell): refactor to a more conventional error-handling pattern.
@@ -7942,7 +7942,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
 		})
 		st := repl.CurrentLeaseStatus(ctx)
 		_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-		ch, _, id, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+		ch, _, id, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -8013,7 +8013,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 		})
 		_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
 		st := tc.repl.CurrentLeaseStatus(ctx)
-		ch, _, _, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+		ch, _, _, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -8127,7 +8127,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 		ba.Timestamp = tc.Clock().Now()
 		ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}})
 		st := r.CurrentLeaseStatus(ctx)
-		cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpansGuard())
+		cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, &st, uncertainty.Interval{}, allSpansGuard())
 		if pErr != nil {
 			t.Fatal(pErr)
 		}
@@ -8249,7 +8249,8 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 
 	incCmdID = makeIDKey()
 	atomic.StoreInt32(&filterActive, 1)
-	proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard())
+	st := repl.CurrentLeaseStatus(ctx)
+	proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, &st, uncertainty.Interval{}, allSpansGuard())
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -8953,7 +8954,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
 	assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
 	origTxn := txn.Clone()
-	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpansGuard())
+	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, nil, uncertainty.Interval{}, allSpansGuard())
 	defer batch.Close()
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9655,7 +9656,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
 	exLease, _ := repl.GetLease()
 	st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID}
 	_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9703,7 +9704,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	atomic.StoreInt32(&filterActive, 1)
 	st := tc.repl.CurrentLeaseStatus(ctx)
 	_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9768,7 +9769,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
 	atomic.StoreInt32(&filterActive, 1)
 	st := repl.CurrentLeaseStatus(ctx)
 	_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-	_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+	_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9787,7 +9788,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
 
 		var pErr *roachpb.Error
 		_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
-		ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
+		ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 		if pErr != nil {
 			t.Fatal(pErr)
 		}
@@ -13230,7 +13231,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
 	_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
 	sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan())
 	tracedCtx := tracing.ContextWithSpan(ctx, sp)
-	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok)
+	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok)
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -13399,7 +13400,8 @@ func TestContainsEstimatesClampProposal(t *testing.T) {
 	ba.Timestamp = tc.Clock().Now()
 	req := putArgs(roachpb.Key("some-key"), []byte("some-value"))
 	ba.Add(&req)
-	proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard())
+	st := tc.repl.CurrentLeaseStatus(ctx)
+	proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, &st, uncertainty.Interval{}, allSpansGuard())
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 8b9ffc849c38..879d3fd4cb37 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -164,7 +164,7 @@ func (r *Replica) executeWriteBatch(
 	// If the command is proposed to Raft, ownership of and responsibility for
 	// the concurrency guard will be assumed by Raft, so provide the guard to
 	// evalAndPropose.
-	ch, abandon, _, pErr := r.evalAndPropose(ctx, ba, g, st, ui, tok.Move(ctx))
+	ch, abandon, _, pErr := r.evalAndPropose(ctx, ba, g, &st, ui, tok.Move(ctx))
 	if pErr != nil {
 		if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
 			// Need to unlock here because setCorruptRaftMuLock needs readOnlyCmdMu not held.
@@ -382,6 +382,7 @@ func (r *Replica) evaluateWriteBatch(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
@@ -395,7 +396,7 @@ func (r *Replica) evaluateWriteBatch(
 	// Attempt 1PC execution, if applicable. If not transactional or there are
 	// indications that the batch's txn will require retry, execute as normal.
 	if r.canAttempt1PCEvaluation(ctx, ba, g) {
-		res := r.evaluate1PC(ctx, idKey, ba, g)
+		res := r.evaluate1PC(ctx, idKey, ba, st, g)
 		switch res.success {
 		case onePCSucceeded:
 			return res.batch, res.stats, res.br, res.res, nil
@@ -426,7 +427,7 @@ func (r *Replica) evaluateWriteBatch(
 	ms := new(enginepb.MVCCStats)
 	rec := NewReplicaEvalContext(r, g.LatchSpans())
 	batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
-		ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */)
+		ctx, idKey, rec, ms, ba, st, ui, g, nil /* deadline */)
 	return batch, *ms, br, res, pErr
 }
 
@@ -466,7 +467,11 @@ type onePCResult struct {
 // efficient - we're avoiding writing the transaction record and writing and the
 // immediately deleting intents.
 func (r *Replica) evaluate1PC(
-	ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, g *concurrency.Guard,
+	ctx context.Context,
+	idKey kvserverbase.CmdIDKey,
+	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
+	g *concurrency.Guard,
 ) (onePCRes onePCResult) {
 	log.VEventf(ctx, 2, "attempting 1PC execution")
 
@@ -501,10 +506,10 @@ func (r *Replica) evaluate1PC(
 	ms := new(enginepb.MVCCStats)
 	if ba.CanForwardReadTimestamp {
 		batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
-			ctx, idKey, rec, ms, &strippedBa, ui, g, etArg.Deadline)
+			ctx, idKey, rec, ms, &strippedBa, st, ui, g, etArg.Deadline)
 	} else {
 		batch, br, res, pErr = r.evaluateWriteBatchWrapper(
-			ctx, idKey, rec, ms, &strippedBa, ui, g)
+			ctx, idKey, rec, ms, &strippedBa, st, ui, g)
 	}
 
 	if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) {
@@ -594,6 +599,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 	rec batcheval.EvalContext,
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 	deadline *hlc.Timestamp,
@@ -609,7 +615,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 			batch.Close()
 		}
 
-		batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, g)
+		batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, st, ui, g)
 
 		var success bool
 		if pErr == nil {
@@ -637,11 +643,12 @@ func (r *Replica) evaluateWriteBatchWrapper(
 	rec batcheval.EvalContext,
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
+	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	g *concurrency.Guard,
 ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
 	batch, opLogger := r.newBatchedEngine(ba, g)
-	br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, ui, false /* readOnly */)
+	br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, st, ui, false /* readOnly */)
 	if pErr == nil {
 		if opLogger != nil {
 			res.LogicalOpLog = &kvserverpb.LogicalOpLog{
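
For orientation, here is a minimal sketch (not part of the patch above, and not CockroachDB source) of how a command evaluation function could consume the new CommandArgs.Now field that this change threads through from the lease status. The function and package names are hypothetical assumptions for illustration; the signature mirrors the batcheval evaluation functions, and the zero-value fallback mirrors the nil /* st */ call sites above.

// Illustrative sketch only; not part of the patch.
package batchevalexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// evalExample shows the new field in use during evaluation.
func evalExample(
	_ context.Context, _ storage.ReadWriter, cArgs batcheval.CommandArgs, _ roachpb.Response,
) (result.Result, error) {
	// cArgs.Now carries st.Now from the lease status supplied by the caller;
	// it is the zero ClockTimestamp when the caller passed a nil
	// *kvserverpb.LeaseStatus (as the gossip and test call sites above do).
	if cArgs.Now == (hlc.ClockTimestamp{}) {
		// No lease status was threaded in; fall back to treating the
		// evaluation-time clock reading as unknown.
	}
	return result.Result{}, nil
}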