storage: avoid passing BatchRequest by value through storage stack
This commit avoids repeatedly passing BatchRequest values all the
way down the storage stack. Doing so was fairly expensive, as most of the
layers allowed their copy of the request to escape to the heap for one
reason or another. Instead of permitting these repeated allocations, we now
perform a single allocation in Replica.sendWithRangeID and pass a pointer
to the layers below it.

This also removes the BatchRequest from endCmds, which was the original
intention of this commit.

Release note: None
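For context, a minimal, hypothetical Go sketch (not the CockroachDB code itself) of the pattern the commit moves to: the request is allocated once near the top of the stack and a pointer is handed to the layers below, instead of letting each layer's by-value copy escape to the heap.

package main

import "fmt"

// bigRequest stands in for roachpb.BatchRequest: a struct large enough that
// copying it at every layer, or letting each copy escape to the heap, adds up.
type bigRequest struct {
	RangeID int64
	Payload [32]int64
}

// Old shape: each layer takes the request by value, so every call copies the
// struct, and any copy that escapes costs its own heap allocation.
func lowerLayerByValue(req bigRequest) string {
	return fmt.Sprintf("range %d", req.RangeID)
}

// New shape: lower layers accept a pointer and share the caller's allocation.
func lowerLayerByPointer(req *bigRequest) string {
	return fmt.Sprintf("range %d", req.RangeID)
}

// topOfStack mirrors the idea behind Replica.sendWithRangeID: the request
// arrives by value one last time, and taking &req here is the single point
// where it may escape to the heap before being passed down as a pointer.
func topOfStack(req bigRequest) string {
	_ = lowerLayerByValue(req)       // old style: another copy per layer
	return lowerLayerByPointer(&req) // new style: one allocation, shared below
}

func main() {
	fmt.Println(topOfStack(bigRequest{RangeID: 7}))
}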
nvanbenschoten committed Jul 15, 2019
1 parent a18b131 commit c6e7765
Showing 11 changed files with 90 additions and 86 deletions.
20 changes: 9 additions & 11 deletions pkg/storage/replica.go
@@ -519,7 +519,7 @@ func (r *Replica) sendWithRangeID(
useRaft = true
}

- if err := r.checkBatchRequest(ba, isReadOnly); err != nil {
+ if err := r.checkBatchRequest(&ba, isReadOnly); err != nil {
return nil, roachpb.NewError(err)
}

@@ -533,13 +533,13 @@ func (r *Replica) sendWithRangeID(
var pErr *roachpb.Error
if useRaft {
log.Event(ctx, "read-write path")
- br, pErr = r.executeWriteBatch(ctx, ba)
+ br, pErr = r.executeWriteBatch(ctx, &ba)
} else if isReadOnly {
log.Event(ctx, "read-only path")
- br, pErr = r.executeReadOnlyBatch(ctx, ba)
+ br, pErr = r.executeReadOnlyBatch(ctx, &ba)
} else if ba.IsAdmin() {
log.Event(ctx, "admin path")
- br, pErr = r.executeAdminBatch(ctx, ba)
+ br, pErr = r.executeAdminBatch(ctx, &ba)
} else if len(ba.Requests) == 0 {
// empty batch; shouldn't happen (we could handle it, but it hints
// at someone doing weird things, and once we drop the key range
@@ -1000,7 +1000,7 @@ func (r *Replica) requestCanProceed(rspan roachpb.RSpan, ts hlc.Timestamp) error
//
// TODO(tschottdorf): should check that request is contained in range
// and that EndTransaction only occurs at the very end.
- func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) error {
+ func (r *Replica) checkBatchRequest(ba *roachpb.BatchRequest, isReadOnly bool) error {
if ba.Timestamp == (hlc.Timestamp{}) {
// For transactional requests, Store.Send sets the timestamp. For non-
// transactional requests, the client sets the timestamp. Either way, we
@@ -1025,17 +1025,16 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er
type endCmds struct {
repl *Replica
lg *spanlatch.Guard
- ba roachpb.BatchRequest
}

// done releases the latches acquired by the command and updates
// the timestamp cache using the final timestamp of each command.
- func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
+ func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) {
// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
// processed. Inconsistent reads are excluded.
- if ec.ba.ReadConsistency == roachpb.CONSISTENT {
- ec.repl.updateTimestampCache(&ec.ba, br, pErr)
+ if ba.ReadConsistency == roachpb.CONSISTENT {
+ ec.repl.updateTimestampCache(ba, br, pErr)
}

// Release the latches acquired by the request back to the spanlatch
@@ -1203,7 +1202,6 @@ func (r *Replica) beginCmds(
ec := &endCmds{
repl: r,
lg: lg,
- ba: *ba,
}
return ec, nil
}
@@ -1214,7 +1212,7 @@ func (r *Replica) beginCmds(
// Admin commands must run on the lease holder replica. Batch support here is
// limited to single-element batches; everything else catches an error.
func (r *Replica) executeAdminBatch(
- ctx context.Context, ba roachpb.BatchRequest,
+ ctx context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if len(ba.Requests) != 1 {
return nil, roachpb.NewErrorf("only single-element admin batches allowed")
4 changes: 2 additions & 2 deletions pkg/storage/replica_backpressure.go
@@ -51,7 +51,7 @@ var backpressurableSpans = []roachpb.Span{

// canBackpressureBatch returns whether the provided BatchRequest is eligible
// for backpressure.
- func canBackpressureBatch(ba roachpb.BatchRequest) bool {
+ func canBackpressureBatch(ba *roachpb.BatchRequest) bool {
// Don't backpressure splits themselves.
if ba.Txn != nil && ba.Txn.Name == splitTxnName {
return false
@@ -92,7 +92,7 @@ func (r *Replica) shouldBackpressureWrites() bool {

// maybeBackpressureWriteBatch blocks to apply backpressure if the replica
// deems that backpressure is necessary.
- func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba roachpb.BatchRequest) error {
+ func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba *roachpb.BatchRequest) error {
if !canBackpressureBatch(ba) {
return nil
}
59 changes: 31 additions & 28 deletions pkg/storage/replica_evaluate.go
@@ -141,27 +141,30 @@ func evaluateBatch(
batch engine.ReadWriter,
rec batcheval.EvalContext,
ms *enginepb.MVCCStats,
- ba roachpb.BatchRequest,
+ ba *roachpb.BatchRequest,
readOnly bool,
) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
+ // NB: Don't mutate BatchRequest directly.
+ baReqs := ba.Requests
+ baHeader := ba.Header
br := ba.CreateReply()

maxKeys := int64(math.MaxInt64)
- if ba.Header.MaxSpanRequestKeys != 0 {
+ if baHeader.MaxSpanRequestKeys != 0 {
// We have a batch of requests with a limit. We keep track of how many
// remaining keys we can touch.
- maxKeys = ba.Header.MaxSpanRequestKeys
+ maxKeys = baHeader.MaxSpanRequestKeys
}

// Optimize any contiguous sequences of put and conditional put ops.
- if len(ba.Requests) >= optimizePutThreshold && !readOnly {
- ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
+ if len(baReqs) >= optimizePutThreshold && !readOnly {
+ baReqs = optimizePuts(batch, baReqs, baHeader.DistinctSpans)
}

// Create a clone of the transaction to store the new txn state produced on
// the return/error path.
- if ba.Txn != nil {
- ba.Txn = ba.Txn.Clone()
+ if baHeader.Txn != nil {
+ baHeader.Txn = baHeader.Txn.Clone()

// Check whether this transaction has been aborted, if applicable.
// This applies to writes that leave intents (the use of the
@@ -171,7 +174,7 @@ func evaluateBatch(
// on reads). Note that 1PC transactions have had their
// transaction field cleared by this point so we do not execute
// this check in that case.
- if ba.IsTransactionWrite() || ba.Txn.IsWriting() {
+ if ba.IsTransactionWrite() || baHeader.Txn.IsWriting() {
// We don't check the abort span for a couple of special requests:
// - if the request is asking to abort the transaction, then don't check the
// AbortSpan; we don't want the request to be rejected if the transaction
@@ -181,9 +184,9 @@ func evaluateBatch(
// TODO(nvanbenschoten): Let's remove heartbeats from this whitelist when
// we rationalize the TODO in txnHeartbeater.heartbeat.
singleAbort := ba.IsSingleEndTransactionRequest() &&
- !ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit
+ !baReqs[0].GetInner().(*roachpb.EndTransactionRequest).Commit
if !singleAbort && !ba.IsSingleHeartbeatTxnRequest() {
- if pErr := checkIfTxnAborted(ctx, rec, batch, *ba.Txn); pErr != nil {
+ if pErr := checkIfTxnAborted(ctx, rec, batch, *baHeader.Txn); pErr != nil {
return nil, result.Result{}, pErr
}
}
@@ -218,7 +221,7 @@ func evaluateBatch(
// retry when it is available.
//
// TODO(bdarnell): Plumb the SQL CanAutoRetry field through to
- // !ba.Header.DeferWriteTooOldError.
+ // !baHeader.DeferWriteTooOldError.
//
// A more subtle heuristic is also possible: If we get a
// WriteTooOldError while writing to a key that we have already read
@@ -236,10 +239,10 @@ func evaluateBatch(
var writeTooOldErr *roachpb.Error
mustReturnWriteTooOldErr := false

- for index, union := range ba.Requests {
+ for index, union := range baReqs {
// Execute the command.
args := union.GetInner()
- if ba.Txn != nil {
+ if baHeader.Txn != nil {
// Sequence numbers used to be set on each BatchRequest instead of
// on each individual Request. This meant that all Requests in a
// BatchRequest shared the same sequence number, so a BatchIndex was
@@ -250,13 +253,13 @@ func evaluateBatch(
// Set the Request's sequence number on the TxnMeta for this
// request. Each request will set their own sequence number on
// the TxnMeta, which is stored as part of an intent.
- ba.Txn.Sequence = seqNum
+ baHeader.Txn.Sequence = seqNum
}
}
// Note that responses are populated even when an error is returned.
// TODO(tschottdorf): Change that. IIRC there is nontrivial use of it currently.
reply := br.Responses[index].GetInner()
- curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, ba.Header, maxKeys, args, reply)
+ curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, baHeader, maxKeys, args, reply)

if err := result.MergeAndDestroy(curResult); err != nil {
// TODO(tschottdorf): see whether we really need to pass nontrivial
@@ -296,9 +299,9 @@ func evaluateBatch(
mustReturnWriteTooOldErr = true
}

- if ba.Txn != nil {
- ba.Txn.Timestamp.Forward(tErr.ActualTimestamp)
- ba.Txn.WriteTooOld = true
+ if baHeader.Txn != nil {
+ baHeader.Txn.Timestamp.Forward(tErr.ActualTimestamp)
+ baHeader.Txn.WriteTooOld = true
}

// Clear pErr; we're done processing it by having moved the
@@ -325,44 +328,44 @@ func evaluateBatch(
// accumulate updates to it.
// TODO(spencer,tschottdorf): need copy-on-write behavior for the
// updated batch transaction / timestamp.
- if ba.Txn != nil {
+ if baHeader.Txn != nil {
if txn := reply.Header().Txn; txn != nil {
- ba.Txn.Update(txn)
+ baHeader.Txn.Update(txn)
}
}
}

// If there was an EndTransaction in the batch that finalized the transaction,
// the WriteTooOld status has been fully processed and we can discard the error.
- if ba.Txn != nil && ba.Txn.Status.IsFinalized() {
+ if baHeader.Txn != nil && baHeader.Txn.Status.IsFinalized() {
writeTooOldErr = nil
- } else if ba.Txn == nil {
+ } else if baHeader.Txn == nil {
// Non-transactional requests are unable to defer WriteTooOldErrors
// because there is no where to defer them to.
mustReturnWriteTooOldErr = true
}

// If there's a write too old error, return now that we've found
// the high water timestamp for retries.
- if writeTooOldErr != nil && (mustReturnWriteTooOldErr || !ba.Header.DeferWriteTooOldError) {
+ if writeTooOldErr != nil && (mustReturnWriteTooOldErr || !baHeader.DeferWriteTooOldError) {
return nil, result, writeTooOldErr
}

- if ba.Txn != nil {
+ if baHeader.Txn != nil {
// If transactional, send out the final transaction entry with the reply.
- br.Txn = ba.Txn
+ br.Txn = baHeader.Txn
// If the transaction committed, forward the response
// timestamp to the commit timestamp in case we were able to
// optimize and commit at a higher timestamp without higher-level
// retry (i.e. there were no refresh spans and the commit timestamp
// wasn't leaked).
- if ba.Txn.Status == roachpb.COMMITTED {
- br.Timestamp.Forward(ba.Txn.Timestamp)
+ if baHeader.Txn.Status == roachpb.COMMITTED {
+ br.Timestamp.Forward(baHeader.Txn.Timestamp)
}
}
// Always update the batch response timestamp field to the timestamp at
// which the batch executed.
- br.Timestamp.Forward(ba.Timestamp)
+ br.Timestamp.Forward(baHeader.Timestamp)

return br, result, nil
}
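A hedged aside on the evaluateBatch change above: now that ba is a shared pointer rather than this function's private copy, the diff copies ba.Header and ba.Requests into locals (baHeader, baReqs) and mutates only those. A simplified, hypothetical sketch of that pattern, with invented types for illustration:

package main

import "fmt"

type header struct{ MaxSpanRequestKeys int64 }

type batchRequest struct {
	Header   header
	Requests []string
}

// evaluate receives a pointer that the caller may reuse, so it copies the
// header struct and the request slice header into locals and only mutates
// those, leaving the caller's batchRequest untouched.
func evaluate(ba *batchRequest) int64 {
	baHeader := ba.Header // struct copy: safe to adjust locally
	baReqs := ba.Requests // copies the slice header, not the backing array
	if baHeader.MaxSpanRequestKeys == 0 {
		baHeader.MaxSpanRequestKeys = 1 << 20 // local default only
	}
	return baHeader.MaxSpanRequestKeys + int64(len(baReqs))
}

func main() {
	ba := &batchRequest{Requests: []string{"Get", "Put"}}
	fmt.Println(evaluate(ba))                 // 1048578
	fmt.Println(ba.Header.MaxSpanRequestKeys) // still 0: caller's copy unmodified
}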
2 changes: 1 addition & 1 deletion pkg/storage/replica_follower_read.go
@@ -35,7 +35,7 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting(
// acquired, whether the read only batch can be served as a follower
// read despite the error.
func (r *Replica) canServeFollowerRead(
- ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error,
+ ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error,
) *roachpb.Error {
canServeFollowerRead := false
if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
4 changes: 2 additions & 2 deletions pkg/storage/replica_gossip.go
@@ -135,7 +135,7 @@ func (r *Replica) MaybeGossipNodeLiveness(ctx context.Context, span roachpb.Span
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
br, result, pErr :=
- evaluateBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, ba, true /* readOnly */)
+ evaluateBatch(ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, &ba, true /* readOnly */)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span)
}
@@ -175,7 +175,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
br, result, pErr := evaluateBatch(
- ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, ba, true, /* readOnly */
+ ctx, storagebase.CmdIDKey(""), r.store.Engine(), rec, nil, &ba, true, /* readOnly */
)
if pErr != nil {
return nil, pErr.GoError()
8 changes: 4 additions & 4 deletions pkg/storage/replica_proposal.go
@@ -123,7 +123,7 @@ type ProposalData struct {
// counted on to invoke endCmds itself.)
func (proposal *ProposalData) finishApplication(pr proposalResult) {
if proposal.endCmds != nil {
- proposal.endCmds.done(pr.Reply, pr.Err)
+ proposal.endCmds.done(proposal.Request, pr.Reply, pr.Err)
proposal.endCmds = nil
}
if proposal.sp != nil {
@@ -636,7 +636,7 @@ type proposalResult struct {
//
// Replica.mu must not be held.
func (r *Replica) evaluateProposal(
- ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet,
+ ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
) (*result.Result, bool, *roachpb.Error) {
if ba.Timestamp == (hlc.Timestamp{}) {
return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp")
@@ -742,7 +742,7 @@ func (r *Replica) evaluateProposal(
func (r *Replica) requestToProposal(
ctx context.Context,
idKey storagebase.CmdIDKey,
- ba roachpb.BatchRequest,
+ ba *roachpb.BatchRequest,
endCmds *endCmds,
spans *spanset.SpanSet,
) (*ProposalData, *roachpb.Error) {
@@ -755,7 +755,7 @@
endCmds: endCmds,
doneCh: make(chan proposalResult, 1),
Local: &res.Local,
- Request: &ba,
+ Request: ba,
}

if needConsensus {
4 changes: 2 additions & 2 deletions pkg/storage/replica_raft.go
@@ -65,7 +65,7 @@ func makeIDKey() storagebase.CmdIDKey {
func (r *Replica) evalAndPropose(
ctx context.Context,
lease roachpb.Lease,
- ba roachpb.BatchRequest,
+ ba *roachpb.BatchRequest,
endCmds *endCmds,
spans *spanset.SpanSet,
) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) {
@@ -200,7 +200,7 @@ func (r *Replica) evalAndPropose(
Ctx: ctx,
Cmd: *proposal.command,
CmdID: idKey,
- Req: ba,
+ Req: *ba,
}
if pErr := filter(filterArgs); pErr != nil {
return nil, nil, 0, pErr
10 changes: 5 additions & 5 deletions pkg/storage/replica_read.go
@@ -27,7 +27,7 @@ import (
// overlapping writes currently processing through Raft ahead of us to
// clear via the latches.
func (r *Replica) executeReadOnlyBatch(
- ctx context.Context, ba roachpb.BatchRequest,
+ ctx context.Context, ba *roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
// If the read is not inconsistent, the read requires the range lease or
// permission to serve via follower reads.
@@ -40,17 +40,17 @@ func (r *Replica) executeReadOnlyBatch(
r.store.metrics.FollowerReadsCount.Inc(1)
}
}
- r.limitTxnMaxTimestamp(ctx, &ba, status)
+ r.limitTxnMaxTimestamp(ctx, ba, status)

- spans, err := r.collectSpans(&ba)
+ spans, err := r.collectSpans(ba)
if err != nil {
return nil, roachpb.NewError(err)
}

// Acquire latches to prevent overlapping commands from executing
// until this command completes.
log.Event(ctx, "acquire latches")
- endCmds, err := r.beginCmds(ctx, &ba, spans)
+ endCmds, err := r.beginCmds(ctx, ba, spans)
if err != nil {
return nil, roachpb.NewError(err)
}
@@ -64,7 +64,7 @@
// timestamp cache update is synchronized. This is wrapped to delay
// pErr evaluation to its value when returning.
defer func() {
- endCmds.done(br, pErr)
+ endCmds.done(ba, br, pErr)
}()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
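To see the kind of cost the commit message alludes to, a throwaway benchmark along these lines (hypothetical, not part of the repository) contrasts letting each layer's by-value copy escape with sharing one pointer; run it with go test -bench . -benchmem and compare the allocs/op columns.

package batchalloc_test

import "testing"

// fatRequest stands in for a large request struct like roachpb.BatchRequest.
type fatRequest struct {
	payload [64]int64
}

// sink forces the argument to escape, similar to how several storage layers
// let their copy of the BatchRequest escape to the heap.
var sink interface{}

//go:noinline
func layerByValue(r fatRequest) {
	sink = &r // each call heap-allocates its own copy of r
}

//go:noinline
func layerByPointer(r *fatRequest) {
	sink = r // every layer shares the caller's single allocation
}

func BenchmarkByValue(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		r := fatRequest{}
		layerByValue(r)
		layerByValue(r)
		layerByValue(r)
	}
}

func BenchmarkByPointer(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		r := &fatRequest{}
		layerByPointer(r)
		layerByPointer(r)
		layerByPointer(r)
	}
}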
