Skip to content

Commit

Permalink
kv: plumb LeaseStatus into batch evaluation
Browse files Browse the repository at this point in the history
Unused plumbing in preparation for a later commit.
  • Loading branch information
nvanbenschoten committed Apr 4, 2022
1 parent 4cd2196 commit 3b1e0a0
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 32 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// DefaultDeclareKeys is the default implementation of Command.DeclareKeys.
Expand Down Expand Up @@ -117,6 +118,7 @@ type CommandArgs struct {
EvalCtx EvalContext
Header roachpb.Header
Args roachpb.Request
Now hlc.ClockTimestamp
// *Stats should be mutated to reflect any writes made by the command.
Stats *enginepb.MVCCStats
Uncertainty uncertainty.Interval
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -148,6 +149,7 @@ func evaluateBatch(
rec batcheval.EvalContext,
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
readOnly bool,
) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) {
Expand Down Expand Up @@ -268,7 +270,7 @@ func evaluateBatch(
// may carry a response transaction and in the case of WriteTooOldError
// (which is sometimes deferred) it is fully populated.
curResult, err := evaluateCommand(
ctx, readWriter, rec, ms, baHeader, args, reply, ui)
ctx, readWriter, rec, ms, baHeader, args, reply, st, ui)

if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
filterArgs := kvserverbase.FilterArgs{
Expand Down Expand Up @@ -475,16 +477,22 @@ func evaluateCommand(
h roachpb.Header,
args roachpb.Request,
reply roachpb.Response,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
) (result.Result, error) {
var err error
var pd result.Result

if cmd, ok := batcheval.LookupCommand(args.Method()); ok {
var now hlc.ClockTimestamp
if st != nil {
now = st.Now
}
cArgs := batcheval.CommandArgs{
EvalCtx: rec,
Header: h,
Args: args,
Now: now,
Stats: ms,
Uncertainty: ui,
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_evaluate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ func TestEvaluateBatch(t *testing.T) {
d.MockEvalCtx.EvalContext(),
&d.ms,
&d.ba,
nil, /* st */
uncertainty.Interval{},
d.readOnly,
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
defer rw.Close()

br, result, pErr :=
evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, uncertainty.Interval{}, true /* readOnly */)
evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* st */, uncertainty.Interval{}, true /* readOnly */)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span)
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
defer rw.Close()

br, result, pErr := evaluateBatch(
ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, uncertainty.Interval{}, true, /* readOnly */
ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* st */, uncertainty.Interval{}, true, /* readOnly */
)
if pErr != nil {
return nil, pErr.GoError()
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ func (r *Replica) evaluateProposal(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
g *concurrency.Guard,
) (*result.Result, bool, *roachpb.Error) {
Expand All @@ -670,7 +671,7 @@ func (r *Replica) evaluateProposal(
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g)
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, st, ui, g)

// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
Expand Down Expand Up @@ -766,11 +767,11 @@ func (r *Replica) requestToProposal(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
st kvserverpb.LeaseStatus,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
g *concurrency.Guard,
) (*ProposalData, *roachpb.Error) {
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g)
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, st, ui, g)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
Expand All @@ -779,7 +780,7 @@ func (r *Replica) requestToProposal(
doneCh: make(chan proposalResult, 1),
Local: &res.Local,
Request: ba,
leaseStatus: st,
leaseStatus: *st,
}

if needConsensus {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (r *Replica) evalAndPropose(
ctx context.Context,
ba *roachpb.BatchRequest,
g *concurrency.Guard,
st kvserverpb.LeaseStatus,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
tok TrackedRequestToken,
) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) {
Expand All @@ -124,7 +124,7 @@ func (r *Replica) evalAndPropose(

// Attach the endCmds to the proposal and assume responsibility for
// releasing the concurrency guard if the proposal makes it to Raft.
proposal.ec = endCmds{repl: r, g: g, st: st}
proposal.ec = endCmds{repl: r, g: g, st: *st}

// Pull out proposal channel to return. proposal.doneCh may be set to
// nil if it is signaled in this function.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (r *Replica) executeReadOnlyBatch(
// the latches are released.

var result result.Result
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g)
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, &st, ui, g)

// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
Expand Down Expand Up @@ -233,6 +234,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
rw storage.ReadWriter,
rec batcheval.EvalContext,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
g *concurrency.Guard,
) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
Expand Down Expand Up @@ -284,7 +286,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
boundAccount.Clear(ctx)
log.VEventf(ctx, 2, "server-side retry of batch")
}
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */)
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, st, ui, true /* readOnly */)
// If we can retry, set a higher batch timestamp and continue.
// Allow one retry only.
if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) {
Expand Down
28 changes: 15 additions & 13 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
}
ba.Add(leaseReq)
_, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
Expand Down Expand Up @@ -1389,7 +1389,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
ba.Timestamp = tc.repl.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor to a more conventional error-handling pattern.
Expand Down Expand Up @@ -7942,7 +7942,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
})
st := repl.CurrentLeaseStatus(ctx)
_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, id, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, id, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -8013,7 +8013,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
})
_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
st := tc.repl.CurrentLeaseStatus(ctx)
ch, _, _, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, _, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -8127,7 +8127,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
ba.Timestamp = tc.Clock().Now()
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}})
st := r.CurrentLeaseStatus(ctx)
cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, uncertainty.Interval{}, allSpansGuard())
cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, &st, uncertainty.Interval{}, allSpansGuard())
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -8249,7 +8249,8 @@ func TestReplicaRefreshMultiple(t *testing.T) {

incCmdID = makeIDKey()
atomic.StoreInt32(&filterActive, 1)
proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard())
st := repl.CurrentLeaseStatus(ctx)
proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, &st, uncertainty.Interval{}, allSpansGuard())
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -8953,7 +8954,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
origTxn := txn.Clone()

batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, uncertainty.Interval{}, allSpansGuard())
batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, nil, uncertainty.Interval{}, allSpansGuard())
defer batch.Close()
if pErr != nil {
t.Fatal(pErr)
Expand Down Expand Up @@ -9655,7 +9656,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
exLease, _ := repl.GetLease()
st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID}
_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -9703,7 +9704,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
atomic.StoreInt32(&filterActive, 1)
st := tc.repl.CurrentLeaseStatus(ctx)
_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -9768,7 +9769,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
atomic.StoreInt32(&filterActive, 1)
st := repl.CurrentLeaseStatus(ctx)
_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr != nil {
t.Fatal(pErr)
}
Expand All @@ -9787,7 +9788,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {

var pErr *roachpb.Error
_, tok := repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, uncertainty.Interval{}, tok.Move(ctx))
ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -13230,7 +13231,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan())
tracedCtx := tracing.ContextWithSpan(ctx, sp)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, uncertainty.Interval{}, tok)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &st, uncertainty.Interval{}, tok)
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -13399,7 +13400,8 @@ func TestContainsEstimatesClampProposal(t *testing.T) {
ba.Timestamp = tc.Clock().Now()
req := putArgs(roachpb.Key("some-key"), []byte("some-value"))
ba.Add(&req)
proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), uncertainty.Interval{}, allSpansGuard())
st := tc.repl.CurrentLeaseStatus(ctx)
proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, &st, uncertainty.Interval{}, allSpansGuard())
if err != nil {
t.Error(err)
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (r *Replica) executeWriteBatch(
// If the command is proposed to Raft, ownership of and responsibility for
// the concurrency guard will be assumed by Raft, so provide the guard to
// evalAndPropose.
ch, abandon, _, pErr := r.evalAndPropose(ctx, ba, g, st, ui, tok.Move(ctx))
ch, abandon, _, pErr := r.evalAndPropose(ctx, ba, g, &st, ui, tok.Move(ctx))
if pErr != nil {
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
// Need to unlock here because setCorruptRaftMuLock needs readOnlyCmdMu not held.
Expand Down Expand Up @@ -382,6 +382,7 @@ func (r *Replica) evaluateWriteBatch(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
g *concurrency.Guard,
) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
Expand All @@ -395,7 +396,7 @@ func (r *Replica) evaluateWriteBatch(
// Attempt 1PC execution, if applicable. If not transactional or there are
// indications that the batch's txn will require retry, execute as normal.
if r.canAttempt1PCEvaluation(ctx, ba, g) {
res := r.evaluate1PC(ctx, idKey, ba, g)
res := r.evaluate1PC(ctx, idKey, ba, st, g)
switch res.success {
case onePCSucceeded:
return res.batch, res.stats, res.br, res.res, nil
Expand Down Expand Up @@ -426,7 +427,7 @@ func (r *Replica) evaluateWriteBatch(
ms := new(enginepb.MVCCStats)
rec := NewReplicaEvalContext(r, g.LatchSpans())
batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */)
ctx, idKey, rec, ms, ba, st, ui, g, nil /* deadline */)
return batch, *ms, br, res, pErr
}

Expand Down Expand Up @@ -466,7 +467,11 @@ type onePCResult struct {
// efficient - we're avoiding writing the transaction record and writing and the
// immediately deleting intents.
func (r *Replica) evaluate1PC(
ctx context.Context, idKey kvserverbase.CmdIDKey, ba *roachpb.BatchRequest, g *concurrency.Guard,
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
g *concurrency.Guard,
) (onePCRes onePCResult) {
log.VEventf(ctx, 2, "attempting 1PC execution")

Expand Down Expand Up @@ -501,10 +506,10 @@ func (r *Replica) evaluate1PC(
ms := new(enginepb.MVCCStats)
if ba.CanForwardReadTimestamp {
batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
ctx, idKey, rec, ms, &strippedBa, ui, g, etArg.Deadline)
ctx, idKey, rec, ms, &strippedBa, st, ui, g, etArg.Deadline)
} else {
batch, br, res, pErr = r.evaluateWriteBatchWrapper(
ctx, idKey, rec, ms, &strippedBa, ui, g)
ctx, idKey, rec, ms, &strippedBa, st, ui, g)
}

if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) {
Expand Down Expand Up @@ -594,6 +599,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
rec batcheval.EvalContext,
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
g *concurrency.Guard,
deadline *hlc.Timestamp,
Expand All @@ -609,7 +615,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
batch.Close()
}

batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, ui, g)
batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, st, ui, g)

var success bool
if pErr == nil {
Expand Down Expand Up @@ -637,11 +643,12 @@ func (r *Replica) evaluateWriteBatchWrapper(
rec batcheval.EvalContext,
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
g *concurrency.Guard,
) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
batch, opLogger := r.newBatchedEngine(ba, g)
br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, ui, false /* readOnly */)
br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, st, ui, false /* readOnly */)
if pErr == nil {
if opLogger != nil {
res.LogicalOpLog = &kvserverpb.LogicalOpLog{
Expand Down

0 comments on commit 3b1e0a0

Please sign in to comment.