Skip to content

Commit

Permalink
storage: pass endCmds by value instead of reference
Browse files Browse the repository at this point in the history
This was a wasted allocation. The commit avoids it by passing the type
by value and making its methods zero value-aware. It also cleans up
a bit of state management by introducing move semantics.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 15, 2019
1 parent c6e7765 commit ec6ed44
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 57 deletions.
31 changes: 23 additions & 8 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
package storage

import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -116,7 +116,7 @@ type atomicDescString struct {

// store atomically updates d.strPtr with the string representation of desc.
func (d *atomicDescString) store(replicaID roachpb.ReplicaID, desc *roachpb.RangeDescriptor) {
var buf bytes.Buffer
var buf strings.Builder
fmt.Fprintf(&buf, "%d/", desc.RangeID)
if replicaID == 0 {
fmt.Fprintf(&buf, "?:")
Expand Down Expand Up @@ -1027,9 +1027,24 @@ type endCmds struct {
lg *spanlatch.Guard
}

// move moves the endCmds into the return value, clearing and making
// a call to done on the receiver a no-op.
func (ec *endCmds) move() endCmds {
res := *ec
*ec = endCmds{}
return res
}

// done releases the latches acquired by the command and updates
// the timestamp cache using the final timestamp of each command.
//
// No-op if the receiver has been zeroed out by a call to move.
func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) {
if ec.repl == nil {
// The endCmds were cleared.
return
}

// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
// processed. Inconsistent reads are excluded.
Expand Down Expand Up @@ -1102,14 +1117,14 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
// the supplied error.
func (r *Replica) beginCmds(
ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
) (*endCmds, error) {
) (endCmds, error) {
// Only acquire latches for consistent operations.
var lg *spanlatch.Guard
if ba.ReadConsistency == roachpb.CONSISTENT {
// Check for context cancellation before acquiring latches.
if err := ctx.Err(); err != nil {
log.VEventf(ctx, 2, "%s before acquiring latches: %s", err, ba.Summary())
return nil, errors.Wrap(err, "aborted before acquiring latches")
return endCmds{}, errors.Wrap(err, "aborted before acquiring latches")
}

var beforeLatch time.Time
Expand All @@ -1123,7 +1138,7 @@ func (r *Replica) beginCmds(
var err error
lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
if err != nil {
return nil, err
return endCmds{}, err
}

if !beforeLatch.IsZero() {
Expand All @@ -1134,7 +1149,7 @@ func (r *Replica) beginCmds(
if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil {
if pErr := filter(*ba); pErr != nil {
r.latchMgr.Release(lg)
return nil, pErr.GoError()
return endCmds{}, pErr.GoError()
}
}

Expand Down Expand Up @@ -1183,7 +1198,7 @@ func (r *Replica) beginCmds(
// mergeCompleteCh closes. See #27442 for the full context.
log.Event(ctx, "waiting on in-progress merge")
r.latchMgr.Release(lg)
return nil, &roachpb.MergeInProgressError{}
return endCmds{}, &roachpb.MergeInProgressError{}
}
} else {
log.Event(ctx, "operation accepts inconsistent results")
Expand All @@ -1199,7 +1214,7 @@ func (r *Replica) beginCmds(
}
}

ec := &endCmds{
ec := endCmds{
repl: r,
lg: lg,
}
Expand Down
18 changes: 5 additions & 13 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ type ProposalData struct {
// tmpFooter is used to avoid an allocation.
tmpFooter storagepb.RaftCommandFooter

// endCmds.finish is called after command execution to update the
// timestamp cache & release latches.
endCmds *endCmds
// ec.done is called after command application to update the timestamp
// cache and release latches.
ec endCmds

// doneCh is used to signal the waiting RPC handler (the contents of
// proposalResult come from LocalEvalResult).
Expand Down Expand Up @@ -122,10 +122,7 @@ type ProposalData struct {
// is canceled, it won't be listening to this done channel, and so it can't be
// counted on to invoke endCmds itself.)
func (proposal *ProposalData) finishApplication(pr proposalResult) {
if proposal.endCmds != nil {
proposal.endCmds.done(proposal.Request, pr.Reply, pr.Err)
proposal.endCmds = nil
}
proposal.ec.done(proposal.Request, pr.Reply, pr.Err)
if proposal.sp != nil {
tracing.FinishSpan(proposal.sp)
}
Expand Down Expand Up @@ -740,19 +737,14 @@ func (r *Replica) evaluateProposal(
// on a non-nil *roachpb.Error and should be proposed through Raft
// if ProposalData.command is non-nil.
func (r *Replica) requestToProposal(
ctx context.Context,
idKey storagebase.CmdIDKey,
ba *roachpb.BatchRequest,
endCmds *endCmds,
spans *spanset.SpanSet,
ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
) (*ProposalData, *roachpb.Error) {
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, spans)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
ctx: ctx,
idKey: idKey,
endCmds: endCmds,
doneCh: make(chan proposalResult, 1),
Local: &res.Local,
Request: ba,
Expand Down
30 changes: 24 additions & 6 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func makeIDKey() storagebase.CmdIDKey {
return storagebase.CmdIDKey(idKeyBuf)
}

// evalAndPropose prepares the necessary pending command struct and initializes a
// client command ID if one hasn't been. A verified lease is supplied
// as a parameter if the command requires a lease; nil otherwise. It
// then evaluates the command and proposes it to Raft on success.
// evalAndPropose prepares the necessary pending command struct and initializes
// a client command ID if one hasn't been. A verified lease is supplied as a
// parameter if the command requires a lease; nil otherwise. It then evaluates
// the command and proposes it to Raft on success.
//
// Return values:
// - a channel which receives a response or error upon application
Expand All @@ -66,9 +66,17 @@ func (r *Replica) evalAndPropose(
ctx context.Context,
lease roachpb.Lease,
ba *roachpb.BatchRequest,
endCmds *endCmds,
spans *spanset.SpanSet,
ec endCmds,
) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) {
// Guarantee we release the latches that we acquired if we never make
// it to passing responsibility to a proposal. This is wrapped to
// delay pErr evaluation to its value when returning.
defer func() {
// No-op if we move ec into proposal.ec.
ec.done(ba, nil /* br */, pErr)
}()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
r.mu.RLock()
if !r.mu.destroyStatus.IsAlive() {
Expand Down Expand Up @@ -105,9 +113,19 @@ func (r *Replica) evalAndPropose(
}

idKey := makeIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, endCmds, spans)
proposal, pErr := r.requestToProposal(ctx, idKey, ba, spans)
log.Event(proposal.ctx, "evaluated request")

// Attach the endCmds to the proposal. This moves responsibility of
// releasing latches to "below Raft" machinery. However, we make sure
// we clean up this resource if the proposal doesn't make it to Raft.
proposal.ec = ec.move()
defer func() {
if pErr != nil {
proposal.ec.done(ba, nil /* br */, pErr)
}
}()

// Pull out proposal channel to return. proposal.doneCh may be set to
// nil if it is signaled in this function.
proposalCh := proposal.doneCh
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *Replica) executeReadOnlyBatch(
// Acquire latches to prevent overlapping commands from executing
// until this command completes.
log.Event(ctx, "acquire latches")
endCmds, err := r.beginCmds(ctx, ba, spans)
ec, err := r.beginCmds(ctx, ba, spans)
if err != nil {
return nil, roachpb.NewError(err)
}
Expand All @@ -64,7 +64,7 @@ func (r *Replica) executeReadOnlyBatch(
// timestamp cache update is synchronized. This is wrapped to delay
// pErr evaluation to its value when returning.
defer func() {
endCmds.done(ba, br, pErr)
ec.done(ba, br, pErr)
}()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *l})
exLease, _ := r.GetLease()
ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, &ba, nil, &allSpans)
ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, &ba, &allSpans, endCmds{})
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
Expand Down Expand Up @@ -1384,7 +1384,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
ba := roachpb.BatchRequest{}
ba.Timestamp = tc.repl.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, &ba, nil, &allSpans)
ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, &ba, &allSpans, endCmds{})
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor to a more conventional error-handling pattern.
Expand Down Expand Up @@ -7242,7 +7242,7 @@ func TestReplicaIDChangePending(t *testing.T) {
},
Value: roachpb.MakeValueFromBytes([]byte("val")),
})
_, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, nil, &allSpans)
_, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, &allSpans, endCmds{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -7448,7 +7448,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, idx, err := repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans)
ch, _, idx, err := repl.evalAndPropose(ctx, lease, &ba, &allSpans, endCmds{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -7514,7 +7514,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans)
ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, &allSpans, endCmds{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -7639,7 +7639,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}})
lease, _ := r.GetLease()
ctx := context.Background()
cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), &ba, nil, &allSpans)
cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), &ba, &allSpans)
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -7765,7 +7765,7 @@ func TestReplicaRefreshMultiple(t *testing.T) {

incCmdID = makeIDKey()
atomic.StoreInt32(&filterActive, 1)
proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, nil, &allSpans)
proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, &allSpans)
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -8761,7 +8761,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {

exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.evalAndPropose(
context.Background(), exLease, &ba, nil /* endCmds */, &allSpans,
context.Background(), exLease, &ba, &allSpans, endCmds{},
)
if pErr != nil {
t.Fatal(pErr)
Expand Down Expand Up @@ -8808,7 +8808,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
atomic.StoreInt32(&filterActive, 1)
exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.evalAndPropose(
context.Background(), exLease, &ba, nil /* endCmds */, &allSpans,
context.Background(), exLease, &ba, &allSpans, endCmds{},
)
if pErr != nil {
t.Fatal(pErr)
Expand Down Expand Up @@ -8872,7 +8872,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {

atomic.StoreInt32(&filterActive, 1)
exLease, _ := repl.GetLease()
_, _, _, pErr := repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans)
_, _, _, pErr := repl.evalAndPropose(ctx, exLease, &ba, &allSpans, endCmds{})
if pErr != nil {
t.Fatal(pErr)
}
Expand All @@ -8890,7 +8890,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
ba2.Timestamp = tc.Clock().Now()

var pErr *roachpb.Error
ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans)
ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, &ba, &allSpans, endCmds{})
if pErr != nil {
t.Fatal(pErr)
}
Expand Down
27 changes: 11 additions & 16 deletions pkg/storage/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,26 @@ func (r *Replica) executeWriteBatch(
return nil, roachpb.NewError(err)
}

var endCmds *endCmds
var ec endCmds
if !ba.IsLeaseRequest() {
// Acquire latches to prevent overlapping commands from executing until
// this command completes. Note that this must be done before getting
// the max timestamp for the key(s), as timestamp cache is only updated
// after preceding commands have been run to successful completion.
log.Event(ctx, "acquire latches")
var err error
endCmds, err = r.beginCmds(ctx, ba, spans)
ec, err = r.beginCmds(ctx, ba, spans)
if err != nil {
return nil, roachpb.NewError(err)
}
}

// Guarantee we release the latches that we just acquired. This is
// wrapped to delay pErr evaluation to its value when returning.
// Guarantee we release the latches that we just acquired if we never make
// it to passing responsibility to evalAndPropose. This is wrapped to delay
// pErr evaluation to its value when returning.
defer func() {
if endCmds != nil {
endCmds.done(ba, br, pErr)
}
// No-op if we move ec into evalAndPropose.
ec.done(ba, br, pErr)
}()

var lease roachpb.Lease
Expand Down Expand Up @@ -146,7 +146,9 @@ func (r *Replica) executeWriteBatch(

log.Event(ctx, "applied timestamp cache")

ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, lease, ba, endCmds, spans)
// After the command is proposed to Raft, invoking endCmds.done is the
// responsibility of Raft, so move the endCmds into evalAndPropose.
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, lease, ba, spans, ec.move())
if pErr != nil {
if maxLeaseIndex != 0 {
log.Fatalf(
Expand All @@ -164,10 +166,6 @@ func (r *Replica) executeWriteBatch(
untrack(ctx, ctpb.Epoch(lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex))
}

// After the command is proposed to Raft, invoking endCmds.done is now the
// responsibility of processRaftCommand.
endCmds = nil

// If the command was accepted by raft, wait for the range to apply it.
ctxDone := ctx.Done()
shouldQuiesce := r.store.stopper.ShouldQuiesce()
Expand Down Expand Up @@ -258,10 +256,7 @@ and the following Raft status: %+v`,
// re-executed in full. This allows it to lay down intents and return
// an appropriate retryable error.
func (r *Replica) evaluateWriteBatch(
ctx context.Context,
idKey storagebase.CmdIDKey,
ba *roachpb.BatchRequest,
spans *spanset.SpanSet,
ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
) (engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
ms := enginepb.MVCCStats{}
// If not transactional or there are indications that the batch's txn will
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
}

if _, _, _, pErr := repl1.evalAndPropose(
context.Background(), lease, &roachpb.BatchRequest{}, nil, &allSpans,
context.Background(), lease, &roachpb.BatchRequest{}, &allSpans, endCmds{},
); !pErr.Equal(expErr) {
t.Fatalf("expected error %s, but got %v", expErr, pErr)
}
Expand Down

0 comments on commit ec6ed44

Please sign in to comment.