diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 0f4480c6a8aa..10d3fd5486ca 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -519,7 +519,7 @@ func (r *Replica) sendWithRangeID( useRaft = true } - if err := r.checkBatchRequest(ba, isReadOnly); err != nil { + if err := r.checkBatchRequest(&ba, isReadOnly); err != nil { return nil, roachpb.NewError(err) } @@ -533,13 +533,13 @@ func (r *Replica) sendWithRangeID( var pErr *roachpb.Error if useRaft { log.Event(ctx, "read-write path") - br, pErr = r.executeWriteBatch(ctx, ba) + br, pErr = r.executeWriteBatch(ctx, &ba) } else if isReadOnly { log.Event(ctx, "read-only path") - br, pErr = r.executeReadOnlyBatch(ctx, ba) + br, pErr = r.executeReadOnlyBatch(ctx, &ba) } else if ba.IsAdmin() { log.Event(ctx, "admin path") - br, pErr = r.executeAdminBatch(ctx, ba) + br, pErr = r.executeAdminBatch(ctx, &ba) } else if len(ba.Requests) == 0 { // empty batch; shouldn't happen (we could handle it, but it hints // at someone doing weird things, and once we drop the key range @@ -1000,7 +1000,7 @@ func (r *Replica) requestCanProceed(rspan roachpb.RSpan, ts hlc.Timestamp) error // // TODO(tschottdorf): should check that request is contained in range // and that EndTransaction only occurs at the very end. -func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) error { +func (r *Replica) checkBatchRequest(ba *roachpb.BatchRequest, isReadOnly bool) error { if ba.Timestamp == (hlc.Timestamp{}) { // For transactional requests, Store.Send sets the timestamp. For non- // transactional requests, the client sets the timestamp. Either way, we @@ -1025,17 +1025,16 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er type endCmds struct { repl *Replica lg *spanlatch.Guard - ba roachpb.BatchRequest } // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. -func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) { +func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) { // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are // processed. Inconsistent reads are excluded. - if ec.ba.ReadConsistency == roachpb.CONSISTENT { - ec.repl.updateTimestampCache(&ec.ba, br, pErr) + if ba.ReadConsistency == roachpb.CONSISTENT { + ec.repl.updateTimestampCache(ba, br, pErr) } // Release the latches acquired by the request back to the spanlatch @@ -1203,7 +1202,6 @@ func (r *Replica) beginCmds( ec := &endCmds{ repl: r, lg: lg, - ba: *ba, } return ec, nil } @@ -1214,7 +1212,7 @@ func (r *Replica) beginCmds( // Admin commands must run on the lease holder replica. Batch support here is // limited to single-element batches; everything else catches an error. 
func (r *Replica) executeAdminBatch( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { if len(ba.Requests) != 1 { return nil, roachpb.NewErrorf("only single-element admin batches allowed") diff --git a/pkg/storage/replica_backpressure.go b/pkg/storage/replica_backpressure.go index ddb66fcabb78..c9ef293f1430 100644 --- a/pkg/storage/replica_backpressure.go +++ b/pkg/storage/replica_backpressure.go @@ -51,7 +51,7 @@ var backpressurableSpans = []roachpb.Span{ // canBackpressureBatch returns whether the provided BatchRequest is eligible // for backpressure. -func canBackpressureBatch(ba roachpb.BatchRequest) bool { +func canBackpressureBatch(ba *roachpb.BatchRequest) bool { // Don't backpressure splits themselves. if ba.Txn != nil && ba.Txn.Name == splitTxnName { return false @@ -92,7 +92,7 @@ func (r *Replica) shouldBackpressureWrites() bool { // maybeBackpressureWriteBatch blocks to apply backpressure if the replica // deems that backpressure is necessary. -func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba roachpb.BatchRequest) error { +func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba *roachpb.BatchRequest) error { if !canBackpressureBatch(ba) { return nil } diff --git a/pkg/storage/replica_evaluate.go b/pkg/storage/replica_evaluate.go index 722bed39f3a2..99498ecd3346 100644 --- a/pkg/storage/replica_evaluate.go +++ b/pkg/storage/replica_evaluate.go @@ -141,27 +141,30 @@ func evaluateBatch( batch engine.ReadWriter, rec batcheval.EvalContext, ms *enginepb.MVCCStats, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, readOnly bool, ) (*roachpb.BatchResponse, result.Result, *roachpb.Error) { + // NB: Don't mutate BatchRequest directly. + baReqs := ba.Requests + baHeader := ba.Header br := ba.CreateReply() maxKeys := int64(math.MaxInt64) - if ba.Header.MaxSpanRequestKeys != 0 { + if baHeader.MaxSpanRequestKeys != 0 { // We have a batch of requests with a limit. We keep track of how many // remaining keys we can touch. - maxKeys = ba.Header.MaxSpanRequestKeys + maxKeys = baHeader.MaxSpanRequestKeys } // Optimize any contiguous sequences of put and conditional put ops. - if len(ba.Requests) >= optimizePutThreshold && !readOnly { - ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans) + if len(baReqs) >= optimizePutThreshold && !readOnly { + baReqs = optimizePuts(batch, baReqs, baHeader.DistinctSpans) } // Create a clone of the transaction to store the new txn state produced on // the return/error path. - if ba.Txn != nil { - ba.Txn = ba.Txn.Clone() + if baHeader.Txn != nil { + baHeader.Txn = baHeader.Txn.Clone() // Check whether this transaction has been aborted, if applicable. // This applies to writes that leave intents (the use of the @@ -171,7 +174,7 @@ func evaluateBatch( // on reads). Note that 1PC transactions have had their // transaction field cleared by this point so we do not execute // this check in that case. - if ba.IsTransactionWrite() || ba.Txn.IsWriting() { + if ba.IsTransactionWrite() || baHeader.Txn.IsWriting() { // We don't check the abort span for a couple of special requests: // - if the request is asking to abort the transaction, then don't check the // AbortSpan; we don't want the request to be rejected if the transaction @@ -181,9 +184,9 @@ func evaluateBatch( // TODO(nvanbenschoten): Let's remove heartbeats from this whitelist when // we rationalize the TODO in txnHeartbeater.heartbeat. 
singleAbort := ba.IsSingleEndTransactionRequest() && - !ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit + !baReqs[0].GetInner().(*roachpb.EndTransactionRequest).Commit if !singleAbort && !ba.IsSingleHeartbeatTxnRequest() { - if pErr := checkIfTxnAborted(ctx, rec, batch, *ba.Txn); pErr != nil { + if pErr := checkIfTxnAborted(ctx, rec, batch, *baHeader.Txn); pErr != nil { return nil, result.Result{}, pErr } } @@ -218,7 +221,7 @@ func evaluateBatch( // retry when it is available. // // TODO(bdarnell): Plumb the SQL CanAutoRetry field through to - // !ba.Header.DeferWriteTooOldError. + // !baHeader.DeferWriteTooOldError. // // A more subtle heuristic is also possible: If we get a // WriteTooOldError while writing to a key that we have already read @@ -236,10 +239,10 @@ func evaluateBatch( var writeTooOldErr *roachpb.Error mustReturnWriteTooOldErr := false - for index, union := range ba.Requests { + for index, union := range baReqs { // Execute the command. args := union.GetInner() - if ba.Txn != nil { + if baHeader.Txn != nil { // Sequence numbers used to be set on each BatchRequest instead of // on each individual Request. This meant that all Requests in a // BatchRequest shared the same sequence number, so a BatchIndex was @@ -250,13 +253,13 @@ func evaluateBatch( // Set the Request's sequence number on the TxnMeta for this // request. Each request will set their own sequence number on // the TxnMeta, which is stored as part of an intent. - ba.Txn.Sequence = seqNum + baHeader.Txn.Sequence = seqNum } } // Note that responses are populated even when an error is returned. // TODO(tschottdorf): Change that. IIRC there is nontrivial use of it currently. reply := br.Responses[index].GetInner() - curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, ba.Header, maxKeys, args, reply) + curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, baHeader, maxKeys, args, reply) if err := result.MergeAndDestroy(curResult); err != nil { // TODO(tschottdorf): see whether we really need to pass nontrivial @@ -296,9 +299,9 @@ func evaluateBatch( mustReturnWriteTooOldErr = true } - if ba.Txn != nil { - ba.Txn.Timestamp.Forward(tErr.ActualTimestamp) - ba.Txn.WriteTooOld = true + if baHeader.Txn != nil { + baHeader.Txn.Timestamp.Forward(tErr.ActualTimestamp) + baHeader.Txn.WriteTooOld = true } // Clear pErr; we're done processing it by having moved the @@ -325,18 +328,18 @@ func evaluateBatch( // accumulate updates to it. // TODO(spencer,tschottdorf): need copy-on-write behavior for the // updated batch transaction / timestamp. - if ba.Txn != nil { + if baHeader.Txn != nil { if txn := reply.Header().Txn; txn != nil { - ba.Txn.Update(txn) + baHeader.Txn.Update(txn) } } } // If there was an EndTransaction in the batch that finalized the transaction, // the WriteTooOld status has been fully processed and we can discard the error. - if ba.Txn != nil && ba.Txn.Status.IsFinalized() { + if baHeader.Txn != nil && baHeader.Txn.Status.IsFinalized() { writeTooOldErr = nil - } else if ba.Txn == nil { + } else if baHeader.Txn == nil { // Non-transactional requests are unable to defer WriteTooOldErrors // because there is no where to defer them to. mustReturnWriteTooOldErr = true @@ -344,25 +347,25 @@ func evaluateBatch( // If there's a write too old error, return now that we've found // the high water timestamp for retries. 
- if writeTooOldErr != nil && (mustReturnWriteTooOldErr || !ba.Header.DeferWriteTooOldError) { + if writeTooOldErr != nil && (mustReturnWriteTooOldErr || !baHeader.DeferWriteTooOldError) { return nil, result, writeTooOldErr } - if ba.Txn != nil { + if baHeader.Txn != nil { // If transactional, send out the final transaction entry with the reply. - br.Txn = ba.Txn + br.Txn = baHeader.Txn // If the transaction committed, forward the response // timestamp to the commit timestamp in case we were able to // optimize and commit at a higher timestamp without higher-level // retry (i.e. there were no refresh spans and the commit timestamp // wasn't leaked). - if ba.Txn.Status == roachpb.COMMITTED { - br.Timestamp.Forward(ba.Txn.Timestamp) + if baHeader.Txn.Status == roachpb.COMMITTED { + br.Timestamp.Forward(baHeader.Txn.Timestamp) } } // Always update the batch response timestamp field to the timestamp at // which the batch executed. - br.Timestamp.Forward(ba.Timestamp) + br.Timestamp.Forward(baHeader.Timestamp) return br, result, nil } diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go index e84cd806dc24..ccf8b3c13ba5 100644 --- a/pkg/storage/replica_follower_read.go +++ b/pkg/storage/replica_follower_read.go @@ -35,7 +35,7 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting( // acquired, whether the read only batch can be served as a follower // read despite the error. func (r *Replica) canServeFollowerRead( - ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, + ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error, ) *roachpb.Error { canServeFollowerRead := false if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok && diff --git a/pkg/storage/replica_gossip.go b/pkg/storage/replica_gossip.go index f8278c53c234..cca990b45058 100644 --- a/pkg/storage/replica_gossip.go +++ b/pkg/storage/replica_gossip.go @@ -135,7 +135,7 @@ func (r *Replica) MaybeGossipNodeLiveness(ctx context.Context, span roachpb.Span // Call evaluateBatch instead of Send to avoid reacquiring latches. rec := NewReplicaEvalContext(r, todoSpanSet) br, result, pErr := - evaluateBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, ba, true /* readOnly */) + evaluateBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, &ba, true /* readOnly */) if pErr != nil { return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span) } @@ -175,7 +175,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt // Call evaluateBatch instead of Send to avoid reacquiring latches. rec := NewReplicaEvalContext(r, todoSpanSet) br, result, pErr := evaluateBatch( - ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, ba, true, /* readOnly */ + ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, &ba, true, /* readOnly */ ) if pErr != nil { return nil, pErr.GoError() diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index f1f236e41be9..de3b8aeb9e35 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -123,7 +123,7 @@ type ProposalData struct { // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { if proposal.endCmds != nil { - proposal.endCmds.done(pr.Reply, pr.Err) + proposal.endCmds.done(proposal.Request, pr.Reply, pr.Err) proposal.endCmds = nil } if proposal.sp != nil { @@ -636,7 +636,7 @@ type proposalResult struct { // // Replica.mu must not be held. 
func (r *Replica) evaluateProposal( - ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet, ) (*result.Result, bool, *roachpb.Error) { if ba.Timestamp == (hlc.Timestamp{}) { return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp") @@ -742,7 +742,7 @@ func (r *Replica) evaluateProposal( func (r *Replica) requestToProposal( ctx context.Context, idKey storagebase.CmdIDKey, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, endCmds *endCmds, spans *spanset.SpanSet, ) (*ProposalData, *roachpb.Error) { @@ -755,7 +755,7 @@ func (r *Replica) requestToProposal( endCmds: endCmds, doneCh: make(chan proposalResult, 1), Local: &res.Local, - Request: &ba, + Request: ba, } if needConsensus { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 90e2a219e6f0..74ac08f83230 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -65,7 +65,7 @@ func makeIDKey() storagebase.CmdIDKey { func (r *Replica) evalAndPropose( ctx context.Context, lease roachpb.Lease, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, endCmds *endCmds, spans *spanset.SpanSet, ) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) { @@ -200,7 +200,7 @@ func (r *Replica) evalAndPropose( Ctx: ctx, Cmd: *proposal.command, CmdID: idKey, - Req: ba, + Req: *ba, } if pErr := filter(filterArgs); pErr != nil { return nil, nil, 0, pErr diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index 47e82005a115..aecd5f1c1991 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -27,7 +27,7 @@ import ( // overlapping writes currently processing through Raft ahead of us to // clear via the latches. func (r *Replica) executeReadOnlyBatch( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { // If the read is not inconsistent, the read requires the range lease or // permission to serve via follower reads. @@ -40,9 +40,9 @@ func (r *Replica) executeReadOnlyBatch( r.store.metrics.FollowerReadsCount.Inc(1) } } - r.limitTxnMaxTimestamp(ctx, &ba, status) + r.limitTxnMaxTimestamp(ctx, ba, status) - spans, err := r.collectSpans(&ba) + spans, err := r.collectSpans(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -50,7 +50,7 @@ func (r *Replica) executeReadOnlyBatch( // Acquire latches to prevent overlapping commands from executing // until this command completes. log.Event(ctx, "acquire latches") - endCmds, err := r.beginCmds(ctx, &ba, spans) + endCmds, err := r.beginCmds(ctx, ba, spans) if err != nil { return nil, roachpb.NewError(err) } @@ -64,7 +64,7 @@ func (r *Replica) executeReadOnlyBatch( // timestamp cache update is synchronized. This is wrapped to delay // pErr evaluation to its value when returning. defer func() { - endCmds.done(br, pErr) + endCmds.done(ba, br, pErr) }() // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 9249363b3a78..688ae7795d61 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -490,7 +490,7 @@ func TestMaybeStripInFlightWrites(t *testing.T) { var ba roachpb.BatchRequest ba.Add(c.reqs...) 
t.Run(fmt.Sprint(ba), func(t *testing.T) { - resBa, err := maybeStripInFlightWrites(ba) + resBa, err := maybeStripInFlightWrites(&ba) if c.expErr == "" { if err != nil { t.Errorf("expected no error, got %v", err) @@ -559,7 +559,7 @@ func TestIsOnePhaseCommit(t *testing.T) { ba.Txn.Timestamp = ba.Txn.OrigTimestamp.Add(1, 0) } } - if is1PC := isOnePhaseCommit(ba, &StoreTestingKnobs{}); is1PC != c.exp1PC { + if is1PC := isOnePhaseCommit(&ba, &StoreTestingKnobs{}); is1PC != c.exp1PC { t.Errorf("%d: expected 1pc=%t; got %t", i, c.exp1PC, is1PC) } } @@ -603,7 +603,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) exLease, _ := r.GetLease() - ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, &ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1384,7 +1384,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, &ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -6960,7 +6960,7 @@ func TestReplicaCancelRaft(t *testing.T) { if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { t.Fatal(err) } - _, pErr := tc.repl.executeWriteBatch(ctx, ba) + _, pErr := tc.repl.executeWriteBatch(ctx, &ba) if cancelEarly { if !testutils.IsPError(pErr, context.Canceled.Error()) { t.Fatalf("expected canceled error; got %v", pErr) @@ -7013,7 +7013,7 @@ func TestReplicaAbandonProposal(t *testing.T) { ba.Add(&roachpb.PutRequest{ RequestHeader: roachpb.RequestHeader{Key: []byte("acdfg")}, }) - _, pErr := tc.repl.executeWriteBatch(ctx, ba) + _, pErr := tc.repl.executeWriteBatch(ctx, &ba) if pErr == nil { t.Fatal("expected failure, but found success") } @@ -7242,7 +7242,7 @@ func TestReplicaIDChangePending(t *testing.T) { }, Value: roachpb.MakeValueFromBytes([]byte("val")), }) - _, _, _, err := repl.evalAndPropose(context.Background(), lease, ba, nil, &allSpans) + _, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7370,7 +7370,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { { _, pErr := tc.repl.executeWriteBatch( context.WithValue(ctx, magicKey{}, "foo"), - ba, + &ba, ) if pErr != nil { t.Fatalf("write batch returned error: %s", pErr) @@ -7401,7 +7401,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { }) _, pErr := tc.repl.executeWriteBatch( context.WithValue(ctx, magicKey{}, "foo"), - ba, + &ba, ) if pErr != nil { t.Fatal(pErr) @@ -7448,7 +7448,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := repl.evalAndPropose(ctx, lease, ba, nil, &allSpans) + ch, _, idx, err := repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7514,7 +7514,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, _, err := 
tc.repl.evalAndPropose(ctx, lease, ba, nil, &allSpans) + ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7639,7 +7639,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}}) lease, _ := r.GetLease() ctx := context.Background() - cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), ba, nil, &allSpans) + cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), &ba, nil, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -7765,7 +7765,7 @@ func TestReplicaRefreshMultiple(t *testing.T) { incCmdID = makeIDKey() atomic.StoreInt32(&filterActive, 1) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, nil, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8128,7 +8128,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { assignSeqNumsForReqs(txn, &txnPut, &txnPut2) origTxn := txn.Clone() - batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), ba, &allSpans) + batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, &allSpans) defer batch.Close() if pErr != nil { t.Fatal(pErr) @@ -8761,7 +8761,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { exLease, _ := repl.GetLease() ch, _, _, pErr := repl.evalAndPropose( - context.Background(), exLease, ba, nil /* endCmds */, &allSpans, + context.Background(), exLease, &ba, nil /* endCmds */, &allSpans, ) if pErr != nil { t.Fatal(pErr) @@ -8808,7 +8808,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() ch, _, _, pErr := repl.evalAndPropose( - context.Background(), exLease, ba, nil /* endCmds */, &allSpans, + context.Background(), exLease, &ba, nil /* endCmds */, &allSpans, ) if pErr != nil { t.Fatal(pErr) @@ -8872,7 +8872,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - _, _, _, pErr := repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + _, _, _, pErr := repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8890,7 +8890,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index ac6a1a0482a3..aadb6927088f 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -60,7 +60,7 @@ import ( // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). func (r *Replica) executeWriteBatch( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { startTime := timeutil.Now() @@ -74,7 +74,7 @@ func (r *Replica) executeWriteBatch( return nil, roachpb.NewError(err) } - spans, err := r.collectSpans(&ba) + spans, err := r.collectSpans(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -87,7 +87,7 @@ func (r *Replica) executeWriteBatch( // after preceding commands have been run to successful completion. 
log.Event(ctx, "acquire latches") var err error - endCmds, err = r.beginCmds(ctx, &ba, spans) + endCmds, err = r.beginCmds(ctx, ba, spans) if err != nil { return nil, roachpb.NewError(err) } @@ -97,7 +97,7 @@ func (r *Replica) executeWriteBatch( // wrapped to delay pErr evaluation to its value when returning. defer func() { if endCmds != nil { - endCmds.done(br, pErr) + endCmds.done(ba, br, pErr) } }() @@ -114,7 +114,7 @@ func (r *Replica) executeWriteBatch( } lease = status.Lease } - r.limitTxnMaxTimestamp(ctx, &ba, status) + r.limitTxnMaxTimestamp(ctx, ba, status) minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) defer untrack(ctx, 0, 0, 0) // covers all error returns below @@ -123,7 +123,7 @@ func (r *Replica) executeWriteBatch( // commands which require this command to move its timestamp // forward. Or, in the case of a transactional write, the txn // timestamp and possible write-too-old bool. - if bumped, pErr := r.applyTimestampCache(ctx, &ba, minTS); pErr != nil { + if bumped, pErr := r.applyTimestampCache(ctx, ba, minTS); pErr != nil { return nil, pErr } else if bumped { // If we bump the transaction's timestamp, we must absolutely @@ -258,7 +258,10 @@ and the following Raft status: %+v`, // re-executed in full. This allows it to lay down intents and return // an appropriate retryable error. func (r *Replica) evaluateWriteBatch( - ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, + idKey storagebase.CmdIDKey, + ba *roachpb.BatchRequest, + spans *spanset.SpanSet, ) (engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) { ms := enginepb.MVCCStats{} // If not transactional or there are indications that the batch's txn will @@ -270,7 +273,7 @@ func (r *Replica) evaluateWriteBatch( // Try executing with transaction stripped. We use the transaction timestamp // to write any values as it may have been advanced by the timestamp cache. - strippedBa := ba + strippedBa := *ba strippedBa.Timestamp = strippedBa.Txn.Timestamp strippedBa.Txn = nil if hasBegin { @@ -287,7 +290,7 @@ func (r *Replica) evaluateWriteBatch( // If all writes occurred at the intended timestamp, we've succeeded on the fast path. rec := NewReplicaEvalContext(r, spans) batch, br, res, pErr := r.evaluateWriteBatchWithLocalRetries( - ctx, idKey, rec, &ms, strippedBa, spans, retryLocally, + ctx, idKey, rec, &ms, &strippedBa, spans, retryLocally, ) if pErr == nil && (ba.Timestamp == br.Timestamp || (retryLocally && !batcheval.IsEndTransactionExceedingDeadline(br.Timestamp, etArg))) { @@ -362,7 +365,7 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries( idKey storagebase.CmdIDKey, rec batcheval.EvalContext, ms *enginepb.MVCCStats, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, spans *spanset.SpanSet, canRetry bool, ) (batch engine.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { @@ -440,7 +443,7 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries( // serializable and the commit timestamp has been forwarded, or (3) the // transaction exceeded its deadline, or (4) the testing knobs disallow optional // one phase commits and the BatchRequest does not require one phase commit. 
-func isOnePhaseCommit(ba roachpb.BatchRequest, knobs *StoreTestingKnobs) bool { +func isOnePhaseCommit(ba *roachpb.BatchRequest, knobs *StoreTestingKnobs) bool { if ba.Txn == nil { return false } @@ -468,7 +471,7 @@ func isOnePhaseCommit(ba roachpb.BatchRequest, knobs *StoreTestingKnobs) bool { // entirely. This is possible if the function removes all of the in-flight // writes from an EndTransaction request that was committing in parallel with // writes which all happened to be on the same range as the transaction record. -func maybeStripInFlightWrites(ba roachpb.BatchRequest) (roachpb.BatchRequest, error) { +func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest, error) { args, hasET := ba.GetArg(roachpb.EndTransaction) if !hasET { return ba, nil diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 9782ef687fe6..60fca73d95fd 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -727,7 +727,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { } if _, _, _, pErr := repl1.evalAndPropose( - context.Background(), lease, roachpb.BatchRequest{}, nil, &allSpans, + context.Background(), lease, &roachpb.BatchRequest{}, nil, &allSpans, ); !pErr.Equal(expErr) { t.Fatalf("expected error %s, but got %v", expErr, pErr) }
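A note on the `endCmds` change at the top of this diff: before, `beginCmds` copied the whole `BatchRequest` into the `endCmds` struct so that `done` could consult it later; after, `done` receives the batch from the caller, so no copy is made or kept alive across a potentially long Raft proposal. A minimal sketch of the resulting shape, using hypothetical stand-in types (`batchRequest` here is far narrower than the real `roachpb.BatchRequest`, whose width is what makes the retained copy costly):

```go
package main

import "fmt"

// batchRequest is a hypothetical stand-in for roachpb.BatchRequest.
type batchRequest struct {
	ReadConsistency string
	Requests        []string
}

// endCmds mirrors the post-diff struct: no ba field anymore,
// just the latch guard and replica handle (elided here).
type endCmds struct{}

// done now takes the batch as a parameter, the way
// endCmds.done(ba, br, pErr) does in the diff, instead of reading a
// field populated by copying the whole BatchRequest in beginCmds.
func (ec *endCmds) done(ba *batchRequest) {
	if ba.ReadConsistency == "CONSISTENT" {
		fmt.Println("update timestamp cache for", len(ba.Requests), "requests")
	}
	// latch release would happen here
}

func main() {
	ba := &batchRequest{ReadConsistency: "CONSISTENT", Requests: []string{"Put"}}
	ec := &endCmds{}
	ec.done(ba)
}
```

The trade-off is lifetime, not just signatures: once `ProposalData.Request` holds the same pointer the caller owns (see `finishApplication` passing `proposal.Request` into `done`), the batch must stay immutable while the proposal is in flight.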
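That immutability requirement is why `evaluateBatch` gains the `// NB: Don't mutate BatchRequest directly.` comment and the `baReqs`/`baHeader` locals: the batch is now shared with the caller, so the requests slice and header are snapshotted into locals and the transaction is cloned before any mutation. A runnable sketch of the same discipline with simplified stand-in types (only the pointer/value distinctions matter):

```go
package main

import "fmt"

// txnMeta and batchHeader are simplified stand-ins for
// roachpb.Transaction and roachpb.Header.
type txnMeta struct{ Sequence int32 }

func (t *txnMeta) clone() *txnMeta {
	c := *t
	return &c
}

type batchHeader struct{ Txn *txnMeta }

type batchRequest struct {
	batchHeader
	Requests []string
}

// evaluate follows the pattern evaluateBatch adopts in the diff.
func evaluate(ba *batchRequest) {
	baReqs := ba.Requests      // local view of the requests slice
	baHeader := ba.batchHeader // value copy of the header...
	if baHeader.Txn != nil {
		baHeader.Txn = baHeader.Txn.clone() // ...but Txn is a pointer: clone it
	}
	for range baReqs {
		if baHeader.Txn != nil {
			baHeader.Txn.Sequence++ // advances the clone, never ba.Txn
		}
	}
}

func main() {
	ba := &batchRequest{batchHeader: batchHeader{Txn: &txnMeta{}}, Requests: []string{"Put", "Put"}}
	evaluate(ba)
	fmt.Println(ba.Txn.Sequence) // 0: the caller's batch was not mutated
}
```

Note that copying the header by value is not enough on its own: the copy still aliases the same `Txn` pointer, which is why the clone happens before the first write through it.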
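Finally, the one change that still copies the batch is deliberate: on the one-phase-commit fast path, `evaluateWriteBatch` does `strippedBa := *ba` so it can rewrite the timestamp and drop the transaction without touching the caller's request. A minimal sketch of that shallow-copy idiom, again with hypothetical stand-in types:

```go
package main

import "fmt"

type txnMeta struct{ Timestamp int64 }

type batchRequest struct {
	Timestamp int64
	Txn       *txnMeta
	Requests  []string
}

// stripTxn mirrors `strippedBa := *ba`: dereferencing produces a
// by-value shallow copy, so top-level fields (Timestamp, the Txn
// pointer itself) can be rewritten freely. Shared slices such as
// Requests are still aliased and must be treated as read-only.
func stripTxn(ba *batchRequest) batchRequest {
	stripped := *ba
	stripped.Timestamp = stripped.Txn.Timestamp
	stripped.Txn = nil
	return stripped
}

func main() {
	ba := &batchRequest{Txn: &txnMeta{Timestamp: 42}, Requests: []string{"Put"}}
	stripped := stripTxn(ba)
	fmt.Println(stripped.Timestamp, stripped.Txn == nil) // 42 true
	fmt.Println(ba.Timestamp, ba.Txn != nil)             // 0 true
}
```

The same deref-copy appears at the testing-knob boundary in `evalAndPropose` (`Req: *ba`), where the filter API keeps its by-value signature and a one-off copy is the cheap, safe way to satisfy it.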