From c6e77653507eed9315270a90cb55b76bbab38c83 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 15 Jul 2019 11:36:48 -0400 Subject: [PATCH] storage: avoid passing BatchRequest by value through storage stack This commit avoids the repeated passing of BatchRequest values all the way down the storage stack. This is fairly expensive, as most of the layers allowed the request to escape to the heap for one reason or another. Instead of permitting these repeat allocations, we perform a single alloc in Replica.sendWithRangeID. This also removes the BatchRequest from endCmds. This was the original intention of this commit. Release note: None --- pkg/storage/replica.go | 20 +++++----- pkg/storage/replica_backpressure.go | 4 +- pkg/storage/replica_evaluate.go | 59 +++++++++++++++------------- pkg/storage/replica_follower_read.go | 2 +- pkg/storage/replica_gossip.go | 4 +- pkg/storage/replica_proposal.go | 8 ++-- pkg/storage/replica_raft.go | 4 +- pkg/storage/replica_read.go | 10 ++--- pkg/storage/replica_test.go | 36 ++++++++--------- pkg/storage/replica_write.go | 27 +++++++------ pkg/storage/store_test.go | 2 +- 11 files changed, 90 insertions(+), 86 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 0f4480c6a8aa..10d3fd5486ca 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -519,7 +519,7 @@ func (r *Replica) sendWithRangeID( useRaft = true } - if err := r.checkBatchRequest(ba, isReadOnly); err != nil { + if err := r.checkBatchRequest(&ba, isReadOnly); err != nil { return nil, roachpb.NewError(err) } @@ -533,13 +533,13 @@ func (r *Replica) sendWithRangeID( var pErr *roachpb.Error if useRaft { log.Event(ctx, "read-write path") - br, pErr = r.executeWriteBatch(ctx, ba) + br, pErr = r.executeWriteBatch(ctx, &ba) } else if isReadOnly { log.Event(ctx, "read-only path") - br, pErr = r.executeReadOnlyBatch(ctx, ba) + br, pErr = r.executeReadOnlyBatch(ctx, &ba) } else if ba.IsAdmin() { log.Event(ctx, "admin path") - br, pErr = r.executeAdminBatch(ctx, ba) + br, pErr = r.executeAdminBatch(ctx, &ba) } else if len(ba.Requests) == 0 { // empty batch; shouldn't happen (we could handle it, but it hints // at someone doing weird things, and once we drop the key range @@ -1000,7 +1000,7 @@ func (r *Replica) requestCanProceed(rspan roachpb.RSpan, ts hlc.Timestamp) error // // TODO(tschottdorf): should check that request is contained in range // and that EndTransaction only occurs at the very end. -func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) error { +func (r *Replica) checkBatchRequest(ba *roachpb.BatchRequest, isReadOnly bool) error { if ba.Timestamp == (hlc.Timestamp{}) { // For transactional requests, Store.Send sets the timestamp. For non- // transactional requests, the client sets the timestamp. Either way, we @@ -1025,17 +1025,16 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er type endCmds struct { repl *Replica lg *spanlatch.Guard - ba roachpb.BatchRequest } // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. -func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) { +func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) { // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are // processed. Inconsistent reads are excluded. - if ec.ba.ReadConsistency == roachpb.CONSISTENT { - ec.repl.updateTimestampCache(&ec.ba, br, pErr) + if ba.ReadConsistency == roachpb.CONSISTENT { + ec.repl.updateTimestampCache(ba, br, pErr) } // Release the latches acquired by the request back to the spanlatch @@ -1203,7 +1202,6 @@ func (r *Replica) beginCmds( ec := &endCmds{ repl: r, lg: lg, - ba: *ba, } return ec, nil } @@ -1214,7 +1212,7 @@ func (r *Replica) beginCmds( // Admin commands must run on the lease holder replica. Batch support here is // limited to single-element batches; everything else catches an error. func (r *Replica) executeAdminBatch( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { if len(ba.Requests) != 1 { return nil, roachpb.NewErrorf("only single-element admin batches allowed") diff --git a/pkg/storage/replica_backpressure.go b/pkg/storage/replica_backpressure.go index ddb66fcabb78..c9ef293f1430 100644 --- a/pkg/storage/replica_backpressure.go +++ b/pkg/storage/replica_backpressure.go @@ -51,7 +51,7 @@ var backpressurableSpans = []roachpb.Span{ // canBackpressureBatch returns whether the provided BatchRequest is eligible // for backpressure. -func canBackpressureBatch(ba roachpb.BatchRequest) bool { +func canBackpressureBatch(ba *roachpb.BatchRequest) bool { // Don't backpressure splits themselves. if ba.Txn != nil && ba.Txn.Name == splitTxnName { return false @@ -92,7 +92,7 @@ func (r *Replica) shouldBackpressureWrites() bool { // maybeBackpressureWriteBatch blocks to apply backpressure if the replica // deems that backpressure is necessary. -func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba roachpb.BatchRequest) error { +func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba *roachpb.BatchRequest) error { if !canBackpressureBatch(ba) { return nil } diff --git a/pkg/storage/replica_evaluate.go b/pkg/storage/replica_evaluate.go index 722bed39f3a2..99498ecd3346 100644 --- a/pkg/storage/replica_evaluate.go +++ b/pkg/storage/replica_evaluate.go @@ -141,27 +141,30 @@ func evaluateBatch( batch engine.ReadWriter, rec batcheval.EvalContext, ms *enginepb.MVCCStats, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, readOnly bool, ) (*roachpb.BatchResponse, result.Result, *roachpb.Error) { + // NB: Don't mutate BatchRequest directly. + baReqs := ba.Requests + baHeader := ba.Header br := ba.CreateReply() maxKeys := int64(math.MaxInt64) - if ba.Header.MaxSpanRequestKeys != 0 { + if baHeader.MaxSpanRequestKeys != 0 { // We have a batch of requests with a limit. We keep track of how many // remaining keys we can touch. - maxKeys = ba.Header.MaxSpanRequestKeys + maxKeys = baHeader.MaxSpanRequestKeys } // Optimize any contiguous sequences of put and conditional put ops. - if len(ba.Requests) >= optimizePutThreshold && !readOnly { - ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans) + if len(baReqs) >= optimizePutThreshold && !readOnly { + baReqs = optimizePuts(batch, baReqs, baHeader.DistinctSpans) } // Create a clone of the transaction to store the new txn state produced on // the return/error path. - if ba.Txn != nil { - ba.Txn = ba.Txn.Clone() + if baHeader.Txn != nil { + baHeader.Txn = baHeader.Txn.Clone() // Check whether this transaction has been aborted, if applicable. // This applies to writes that leave intents (the use of the @@ -171,7 +174,7 @@ func evaluateBatch( // on reads). Note that 1PC transactions have had their // transaction field cleared by this point so we do not execute // this check in that case. - if ba.IsTransactionWrite() || ba.Txn.IsWriting() { + if ba.IsTransactionWrite() || baHeader.Txn.IsWriting() { // We don't check the abort span for a couple of special requests: // - if the request is asking to abort the transaction, then don't check the // AbortSpan; we don't want the request to be rejected if the transaction @@ -181,9 +184,9 @@ func evaluateBatch( // TODO(nvanbenschoten): Let's remove heartbeats from this whitelist when // we rationalize the TODO in txnHeartbeater.heartbeat. singleAbort := ba.IsSingleEndTransactionRequest() && - !ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit + !baReqs[0].GetInner().(*roachpb.EndTransactionRequest).Commit if !singleAbort && !ba.IsSingleHeartbeatTxnRequest() { - if pErr := checkIfTxnAborted(ctx, rec, batch, *ba.Txn); pErr != nil { + if pErr := checkIfTxnAborted(ctx, rec, batch, *baHeader.Txn); pErr != nil { return nil, result.Result{}, pErr } } @@ -218,7 +221,7 @@ func evaluateBatch( // retry when it is available. // // TODO(bdarnell): Plumb the SQL CanAutoRetry field through to - // !ba.Header.DeferWriteTooOldError. + // !baHeader.DeferWriteTooOldError. // // A more subtle heuristic is also possible: If we get a // WriteTooOldError while writing to a key that we have already read @@ -236,10 +239,10 @@ func evaluateBatch( var writeTooOldErr *roachpb.Error mustReturnWriteTooOldErr := false - for index, union := range ba.Requests { + for index, union := range baReqs { // Execute the command. args := union.GetInner() - if ba.Txn != nil { + if baHeader.Txn != nil { // Sequence numbers used to be set on each BatchRequest instead of // on each individual Request. This meant that all Requests in a // BatchRequest shared the same sequence number, so a BatchIndex was @@ -250,13 +253,13 @@ func evaluateBatch( // Set the Request's sequence number on the TxnMeta for this // request. Each request will set their own sequence number on // the TxnMeta, which is stored as part of an intent. - ba.Txn.Sequence = seqNum + baHeader.Txn.Sequence = seqNum } } // Note that responses are populated even when an error is returned. // TODO(tschottdorf): Change that. IIRC there is nontrivial use of it currently. reply := br.Responses[index].GetInner() - curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, ba.Header, maxKeys, args, reply) + curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, baHeader, maxKeys, args, reply) if err := result.MergeAndDestroy(curResult); err != nil { // TODO(tschottdorf): see whether we really need to pass nontrivial @@ -296,9 +299,9 @@ func evaluateBatch( mustReturnWriteTooOldErr = true } - if ba.Txn != nil { - ba.Txn.Timestamp.Forward(tErr.ActualTimestamp) - ba.Txn.WriteTooOld = true + if baHeader.Txn != nil { + baHeader.Txn.Timestamp.Forward(tErr.ActualTimestamp) + baHeader.Txn.WriteTooOld = true } // Clear pErr; we're done processing it by having moved the @@ -325,18 +328,18 @@ func evaluateBatch( // accumulate updates to it. // TODO(spencer,tschottdorf): need copy-on-write behavior for the // updated batch transaction / timestamp. - if ba.Txn != nil { + if baHeader.Txn != nil { if txn := reply.Header().Txn; txn != nil { - ba.Txn.Update(txn) + baHeader.Txn.Update(txn) } } } // If there was an EndTransaction in the batch that finalized the transaction, // the WriteTooOld status has been fully processed and we can discard the error. - if ba.Txn != nil && ba.Txn.Status.IsFinalized() { + if baHeader.Txn != nil && baHeader.Txn.Status.IsFinalized() { writeTooOldErr = nil - } else if ba.Txn == nil { + } else if baHeader.Txn == nil { // Non-transactional requests are unable to defer WriteTooOldErrors // because there is no where to defer them to. mustReturnWriteTooOldErr = true @@ -344,25 +347,25 @@ func evaluateBatch( // If there's a write too old error, return now that we've found // the high water timestamp for retries. - if writeTooOldErr != nil && (mustReturnWriteTooOldErr || !ba.Header.DeferWriteTooOldError) { + if writeTooOldErr != nil && (mustReturnWriteTooOldErr || !baHeader.DeferWriteTooOldError) { return nil, result, writeTooOldErr } - if ba.Txn != nil { + if baHeader.Txn != nil { // If transactional, send out the final transaction entry with the reply. - br.Txn = ba.Txn + br.Txn = baHeader.Txn // If the transaction committed, forward the response // timestamp to the commit timestamp in case we were able to // optimize and commit at a higher timestamp without higher-level // retry (i.e. there were no refresh spans and the commit timestamp // wasn't leaked). - if ba.Txn.Status == roachpb.COMMITTED { - br.Timestamp.Forward(ba.Txn.Timestamp) + if baHeader.Txn.Status == roachpb.COMMITTED { + br.Timestamp.Forward(baHeader.Txn.Timestamp) } } // Always update the batch response timestamp field to the timestamp at // which the batch executed. - br.Timestamp.Forward(ba.Timestamp) + br.Timestamp.Forward(baHeader.Timestamp) return br, result, nil } diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go index e84cd806dc24..ccf8b3c13ba5 100644 --- a/pkg/storage/replica_follower_read.go +++ b/pkg/storage/replica_follower_read.go @@ -35,7 +35,7 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting( // acquired, whether the read only batch can be served as a follower // read despite the error. func (r *Replica) canServeFollowerRead( - ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, + ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error, ) *roachpb.Error { canServeFollowerRead := false if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok && diff --git a/pkg/storage/replica_gossip.go b/pkg/storage/replica_gossip.go index f8278c53c234..cca990b45058 100644 --- a/pkg/storage/replica_gossip.go +++ b/pkg/storage/replica_gossip.go @@ -135,7 +135,7 @@ func (r *Replica) MaybeGossipNodeLiveness(ctx context.Context, span roachpb.Span // Call evaluateBatch instead of Send to avoid reacquiring latches. rec := NewReplicaEvalContext(r, todoSpanSet) br, result, pErr := - evaluateBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, ba, true /* readOnly */) + evaluateBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, &ba, true /* readOnly */) if pErr != nil { return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span) } @@ -175,7 +175,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt // Call evaluateBatch instead of Send to avoid reacquiring latches. rec := NewReplicaEvalContext(r, todoSpanSet) br, result, pErr := evaluateBatch( - ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, ba, true, /* readOnly */ + ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, &ba, true, /* readOnly */ ) if pErr != nil { return nil, pErr.GoError() diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index f1f236e41be9..de3b8aeb9e35 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -123,7 +123,7 @@ type ProposalData struct { // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { if proposal.endCmds != nil { - proposal.endCmds.done(pr.Reply, pr.Err) + proposal.endCmds.done(proposal.Request, pr.Reply, pr.Err) proposal.endCmds = nil } if proposal.sp != nil { @@ -636,7 +636,7 @@ type proposalResult struct { // // Replica.mu must not be held. func (r *Replica) evaluateProposal( - ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet, ) (*result.Result, bool, *roachpb.Error) { if ba.Timestamp == (hlc.Timestamp{}) { return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp") @@ -742,7 +742,7 @@ func (r *Replica) evaluateProposal( func (r *Replica) requestToProposal( ctx context.Context, idKey storagebase.CmdIDKey, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, endCmds *endCmds, spans *spanset.SpanSet, ) (*ProposalData, *roachpb.Error) { @@ -755,7 +755,7 @@ func (r *Replica) requestToProposal( endCmds: endCmds, doneCh: make(chan proposalResult, 1), Local: &res.Local, - Request: &ba, + Request: ba, } if needConsensus { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 90e2a219e6f0..74ac08f83230 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -65,7 +65,7 @@ func makeIDKey() storagebase.CmdIDKey { func (r *Replica) evalAndPropose( ctx context.Context, lease roachpb.Lease, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, endCmds *endCmds, spans *spanset.SpanSet, ) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) { @@ -200,7 +200,7 @@ func (r *Replica) evalAndPropose( Ctx: ctx, Cmd: *proposal.command, CmdID: idKey, - Req: ba, + Req: *ba, } if pErr := filter(filterArgs); pErr != nil { return nil, nil, 0, pErr diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index 47e82005a115..aecd5f1c1991 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -27,7 +27,7 @@ import ( // overlapping writes currently processing through Raft ahead of us to // clear via the latches. func (r *Replica) executeReadOnlyBatch( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { // If the read is not inconsistent, the read requires the range lease or // permission to serve via follower reads. @@ -40,9 +40,9 @@ func (r *Replica) executeReadOnlyBatch( r.store.metrics.FollowerReadsCount.Inc(1) } } - r.limitTxnMaxTimestamp(ctx, &ba, status) + r.limitTxnMaxTimestamp(ctx, ba, status) - spans, err := r.collectSpans(&ba) + spans, err := r.collectSpans(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -50,7 +50,7 @@ func (r *Replica) executeReadOnlyBatch( // Acquire latches to prevent overlapping commands from executing // until this command completes. log.Event(ctx, "acquire latches") - endCmds, err := r.beginCmds(ctx, &ba, spans) + endCmds, err := r.beginCmds(ctx, ba, spans) if err != nil { return nil, roachpb.NewError(err) } @@ -64,7 +64,7 @@ func (r *Replica) executeReadOnlyBatch( // timestamp cache update is synchronized. This is wrapped to delay // pErr evaluation to its value when returning. defer func() { - endCmds.done(br, pErr) + endCmds.done(ba, br, pErr) }() // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 9249363b3a78..688ae7795d61 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -490,7 +490,7 @@ func TestMaybeStripInFlightWrites(t *testing.T) { var ba roachpb.BatchRequest ba.Add(c.reqs...) t.Run(fmt.Sprint(ba), func(t *testing.T) { - resBa, err := maybeStripInFlightWrites(ba) + resBa, err := maybeStripInFlightWrites(&ba) if c.expErr == "" { if err != nil { t.Errorf("expected no error, got %v", err) @@ -559,7 +559,7 @@ func TestIsOnePhaseCommit(t *testing.T) { ba.Txn.Timestamp = ba.Txn.OrigTimestamp.Add(1, 0) } } - if is1PC := isOnePhaseCommit(ba, &StoreTestingKnobs{}); is1PC != c.exp1PC { + if is1PC := isOnePhaseCommit(&ba, &StoreTestingKnobs{}); is1PC != c.exp1PC { t.Errorf("%d: expected 1pc=%t; got %t", i, c.exp1PC, is1PC) } } @@ -603,7 +603,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) exLease, _ := r.GetLease() - ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, &ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1384,7 +1384,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, &ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -6960,7 +6960,7 @@ func TestReplicaCancelRaft(t *testing.T) { if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { t.Fatal(err) } - _, pErr := tc.repl.executeWriteBatch(ctx, ba) + _, pErr := tc.repl.executeWriteBatch(ctx, &ba) if cancelEarly { if !testutils.IsPError(pErr, context.Canceled.Error()) { t.Fatalf("expected canceled error; got %v", pErr) @@ -7013,7 +7013,7 @@ func TestReplicaAbandonProposal(t *testing.T) { ba.Add(&roachpb.PutRequest{ RequestHeader: roachpb.RequestHeader{Key: []byte("acdfg")}, }) - _, pErr := tc.repl.executeWriteBatch(ctx, ba) + _, pErr := tc.repl.executeWriteBatch(ctx, &ba) if pErr == nil { t.Fatal("expected failure, but found success") } @@ -7242,7 +7242,7 @@ func TestReplicaIDChangePending(t *testing.T) { }, Value: roachpb.MakeValueFromBytes([]byte("val")), }) - _, _, _, err := repl.evalAndPropose(context.Background(), lease, ba, nil, &allSpans) + _, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7370,7 +7370,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { { _, pErr := tc.repl.executeWriteBatch( context.WithValue(ctx, magicKey{}, "foo"), - ba, + &ba, ) if pErr != nil { t.Fatalf("write batch returned error: %s", pErr) @@ -7401,7 +7401,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { }) _, pErr := tc.repl.executeWriteBatch( context.WithValue(ctx, magicKey{}, "foo"), - ba, + &ba, ) if pErr != nil { t.Fatal(pErr) @@ -7448,7 +7448,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := repl.evalAndPropose(ctx, lease, ba, nil, &allSpans) + ch, _, idx, err := repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7514,7 +7514,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, ba, nil, &allSpans) + ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7639,7 +7639,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}}) lease, _ := r.GetLease() ctx := context.Background() - cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), ba, nil, &allSpans) + cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), &ba, nil, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -7765,7 +7765,7 @@ func TestReplicaRefreshMultiple(t *testing.T) { incCmdID = makeIDKey() atomic.StoreInt32(&filterActive, 1) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, nil, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8128,7 +8128,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { assignSeqNumsForReqs(txn, &txnPut, &txnPut2) origTxn := txn.Clone() - batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), ba, &allSpans) + batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, &allSpans) defer batch.Close() if pErr != nil { t.Fatal(pErr) @@ -8761,7 +8761,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { exLease, _ := repl.GetLease() ch, _, _, pErr := repl.evalAndPropose( - context.Background(), exLease, ba, nil /* endCmds */, &allSpans, + context.Background(), exLease, &ba, nil /* endCmds */, &allSpans, ) if pErr != nil { t.Fatal(pErr) @@ -8808,7 +8808,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() ch, _, _, pErr := repl.evalAndPropose( - context.Background(), exLease, ba, nil /* endCmds */, &allSpans, + context.Background(), exLease, &ba, nil /* endCmds */, &allSpans, ) if pErr != nil { t.Fatal(pErr) @@ -8872,7 +8872,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - _, _, _, pErr := repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + _, _, _, pErr := repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8890,7 +8890,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index ac6a1a0482a3..aadb6927088f 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -60,7 +60,7 @@ import ( // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). func (r *Replica) executeWriteBatch( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { startTime := timeutil.Now() @@ -74,7 +74,7 @@ func (r *Replica) executeWriteBatch( return nil, roachpb.NewError(err) } - spans, err := r.collectSpans(&ba) + spans, err := r.collectSpans(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -87,7 +87,7 @@ func (r *Replica) executeWriteBatch( // after preceding commands have been run to successful completion. log.Event(ctx, "acquire latches") var err error - endCmds, err = r.beginCmds(ctx, &ba, spans) + endCmds, err = r.beginCmds(ctx, ba, spans) if err != nil { return nil, roachpb.NewError(err) } @@ -97,7 +97,7 @@ func (r *Replica) executeWriteBatch( // wrapped to delay pErr evaluation to its value when returning. defer func() { if endCmds != nil { - endCmds.done(br, pErr) + endCmds.done(ba, br, pErr) } }() @@ -114,7 +114,7 @@ func (r *Replica) executeWriteBatch( } lease = status.Lease } - r.limitTxnMaxTimestamp(ctx, &ba, status) + r.limitTxnMaxTimestamp(ctx, ba, status) minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) defer untrack(ctx, 0, 0, 0) // covers all error returns below @@ -123,7 +123,7 @@ func (r *Replica) executeWriteBatch( // commands which require this command to move its timestamp // forward. Or, in the case of a transactional write, the txn // timestamp and possible write-too-old bool. - if bumped, pErr := r.applyTimestampCache(ctx, &ba, minTS); pErr != nil { + if bumped, pErr := r.applyTimestampCache(ctx, ba, minTS); pErr != nil { return nil, pErr } else if bumped { // If we bump the transaction's timestamp, we must absolutely @@ -258,7 +258,10 @@ and the following Raft status: %+v`, // re-executed in full. This allows it to lay down intents and return // an appropriate retryable error. func (r *Replica) evaluateWriteBatch( - ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, + idKey storagebase.CmdIDKey, + ba *roachpb.BatchRequest, + spans *spanset.SpanSet, ) (engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) { ms := enginepb.MVCCStats{} // If not transactional or there are indications that the batch's txn will @@ -270,7 +273,7 @@ func (r *Replica) evaluateWriteBatch( // Try executing with transaction stripped. We use the transaction timestamp // to write any values as it may have been advanced by the timestamp cache. - strippedBa := ba + strippedBa := *ba strippedBa.Timestamp = strippedBa.Txn.Timestamp strippedBa.Txn = nil if hasBegin { @@ -287,7 +290,7 @@ func (r *Replica) evaluateWriteBatch( // If all writes occurred at the intended timestamp, we've succeeded on the fast path. rec := NewReplicaEvalContext(r, spans) batch, br, res, pErr := r.evaluateWriteBatchWithLocalRetries( - ctx, idKey, rec, &ms, strippedBa, spans, retryLocally, + ctx, idKey, rec, &ms, &strippedBa, spans, retryLocally, ) if pErr == nil && (ba.Timestamp == br.Timestamp || (retryLocally && !batcheval.IsEndTransactionExceedingDeadline(br.Timestamp, etArg))) { @@ -362,7 +365,7 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries( idKey storagebase.CmdIDKey, rec batcheval.EvalContext, ms *enginepb.MVCCStats, - ba roachpb.BatchRequest, + ba *roachpb.BatchRequest, spans *spanset.SpanSet, canRetry bool, ) (batch engine.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { @@ -440,7 +443,7 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries( // serializable and the commit timestamp has been forwarded, or (3) the // transaction exceeded its deadline, or (4) the testing knobs disallow optional // one phase commits and the BatchRequest does not require one phase commit. -func isOnePhaseCommit(ba roachpb.BatchRequest, knobs *StoreTestingKnobs) bool { +func isOnePhaseCommit(ba *roachpb.BatchRequest, knobs *StoreTestingKnobs) bool { if ba.Txn == nil { return false } @@ -468,7 +471,7 @@ func isOnePhaseCommit(ba roachpb.BatchRequest, knobs *StoreTestingKnobs) bool { // entirely. This is possible if the function removes all of the in-flight // writes from an EndTransaction request that was committing in parallel with // writes which all happened to be on the same range as the transaction record. -func maybeStripInFlightWrites(ba roachpb.BatchRequest) (roachpb.BatchRequest, error) { +func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest, error) { args, hasET := ba.GetArg(roachpb.EndTransaction) if !hasET { return ba, nil diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 9782ef687fe6..60fca73d95fd 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -727,7 +727,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { } if _, _, _, pErr := repl1.evalAndPropose( - context.Background(), lease, roachpb.BatchRequest{}, nil, &allSpans, + context.Background(), lease, &roachpb.BatchRequest{}, nil, &allSpans, ); !pErr.Equal(expErr) { t.Fatalf("expected error %s, but got %v", expErr, pErr) }