diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 10d3fd5486ca..0b95f8fe99ee 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -11,10 +11,10 @@ package storage import ( - "bytes" "context" "fmt" "reflect" + "strings" "sync/atomic" "time" "unsafe" @@ -116,7 +116,7 @@ type atomicDescString struct { // store atomically updates d.strPtr with the string representation of desc. func (d *atomicDescString) store(replicaID roachpb.ReplicaID, desc *roachpb.RangeDescriptor) { - var buf bytes.Buffer + var buf strings.Builder fmt.Fprintf(&buf, "%d/", desc.RangeID) if replicaID == 0 { fmt.Fprintf(&buf, "?:") @@ -1027,9 +1027,24 @@ type endCmds struct { lg *spanlatch.Guard } +// move moves the endCmds into the return value, clearing and making +// a call to done on the receiver a no-op. +func (ec *endCmds) move() endCmds { + res := *ec + *ec = endCmds{} + return res +} + // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. +// +// No-op if the receiver has been zeroed out by a call to move. func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) { + if ec.repl == nil { + // The endCmds were cleared. + return + } + // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are // processed. Inconsistent reads are excluded. @@ -1102,14 +1117,14 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // the supplied error. func (r *Replica) beginCmds( ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, -) (*endCmds, error) { +) (endCmds, error) { // Only acquire latches for consistent operations. var lg *spanlatch.Guard if ba.ReadConsistency == roachpb.CONSISTENT { // Check for context cancellation before acquiring latches. if err := ctx.Err(); err != nil { log.VEventf(ctx, 2, "%s before acquiring latches: %s", err, ba.Summary()) - return nil, errors.Wrap(err, "aborted before acquiring latches") + return endCmds{}, errors.Wrap(err, "aborted before acquiring latches") } var beforeLatch time.Time @@ -1123,7 +1138,7 @@ func (r *Replica) beginCmds( var err error lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp) if err != nil { - return nil, err + return endCmds{}, err } if !beforeLatch.IsZero() { @@ -1134,7 +1149,7 @@ func (r *Replica) beginCmds( if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { if pErr := filter(*ba); pErr != nil { r.latchMgr.Release(lg) - return nil, pErr.GoError() + return endCmds{}, pErr.GoError() } } @@ -1183,7 +1198,7 @@ func (r *Replica) beginCmds( // mergeCompleteCh closes. See #27442 for the full context. log.Event(ctx, "waiting on in-progress merge") r.latchMgr.Release(lg) - return nil, &roachpb.MergeInProgressError{} + return endCmds{}, &roachpb.MergeInProgressError{} } } else { log.Event(ctx, "operation accepts inconsistent results") @@ -1199,7 +1214,7 @@ func (r *Replica) beginCmds( } } - ec := &endCmds{ + ec := endCmds{ repl: r, lg: lg, } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index de3b8aeb9e35..3ff7e608700f 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -83,9 +83,9 @@ type ProposalData struct { // tmpFooter is used to avoid an allocation. tmpFooter storagepb.RaftCommandFooter - // endCmds.finish is called after command execution to update the - // timestamp cache & release latches. - endCmds *endCmds + // ec.done is called after command application to update the timestamp + // cache and release latches. + ec endCmds // doneCh is used to signal the waiting RPC handler (the contents of // proposalResult come from LocalEvalResult). @@ -122,10 +122,7 @@ type ProposalData struct { // is canceled, it won't be listening to this done channel, and so it can't be // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { - if proposal.endCmds != nil { - proposal.endCmds.done(proposal.Request, pr.Reply, pr.Err) - proposal.endCmds = nil - } + proposal.ec.done(proposal.Request, pr.Reply, pr.Err) if proposal.sp != nil { tracing.FinishSpan(proposal.sp) } @@ -740,11 +737,7 @@ func (r *Replica) evaluateProposal( // on a non-nil *roachpb.Error and should be proposed through Raft // if ProposalData.command is non-nil. func (r *Replica) requestToProposal( - ctx context.Context, - idKey storagebase.CmdIDKey, - ba *roachpb.BatchRequest, - endCmds *endCmds, - spans *spanset.SpanSet, + ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet, ) (*ProposalData, *roachpb.Error) { res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, spans) @@ -752,7 +745,6 @@ func (r *Replica) requestToProposal( proposal := &ProposalData{ ctx: ctx, idKey: idKey, - endCmds: endCmds, doneCh: make(chan proposalResult, 1), Local: &res.Local, Request: ba, diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 74ac08f83230..c3bc51c7c424 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -45,10 +45,10 @@ func makeIDKey() storagebase.CmdIDKey { return storagebase.CmdIDKey(idKeyBuf) } -// evalAndPropose prepares the necessary pending command struct and initializes a -// client command ID if one hasn't been. A verified lease is supplied -// as a parameter if the command requires a lease; nil otherwise. It -// then evaluates the command and proposes it to Raft on success. +// evalAndPropose prepares the necessary pending command struct and initializes +// a client command ID if one hasn't been. A verified lease is supplied as a +// parameter if the command requires a lease; nil otherwise. It then evaluates +// the command and proposes it to Raft on success. // // Return values: // - a channel which receives a response or error upon application @@ -66,9 +66,17 @@ func (r *Replica) evalAndPropose( ctx context.Context, lease roachpb.Lease, ba *roachpb.BatchRequest, - endCmds *endCmds, spans *spanset.SpanSet, + ec endCmds, ) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) { + // Guarantee we release the latches that we acquired if we never make + // it to passing responsibility to a proposal. This is wrapped to + // delay pErr evaluation to its value when returning. + defer func() { + // No-op if we move ec into proposal.ec. + ec.done(ba, nil /* br */, pErr) + }() + // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? r.mu.RLock() if !r.mu.destroyStatus.IsAlive() { @@ -105,9 +113,19 @@ func (r *Replica) evalAndPropose( } idKey := makeIDKey() - proposal, pErr := r.requestToProposal(ctx, idKey, ba, endCmds, spans) + proposal, pErr := r.requestToProposal(ctx, idKey, ba, spans) log.Event(proposal.ctx, "evaluated request") + // Attach the endCmds to the proposal. This moves responsibility of + // releasing latches to "below Raft" machinery. However, we make sure + // we clean up this resource if the proposal doesn't make it to Raft. + proposal.ec = ec.move() + defer func() { + if pErr != nil { + proposal.ec.done(ba, nil /* br */, pErr) + } + }() + // Pull out proposal channel to return. proposal.doneCh may be set to // nil if it is signaled in this function. proposalCh := proposal.doneCh diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index aecd5f1c1991..fd6c008847a6 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -50,7 +50,7 @@ func (r *Replica) executeReadOnlyBatch( // Acquire latches to prevent overlapping commands from executing // until this command completes. log.Event(ctx, "acquire latches") - endCmds, err := r.beginCmds(ctx, ba, spans) + ec, err := r.beginCmds(ctx, ba, spans) if err != nil { return nil, roachpb.NewError(err) } @@ -64,7 +64,7 @@ func (r *Replica) executeReadOnlyBatch( // timestamp cache update is synchronized. This is wrapped to delay // pErr evaluation to its value when returning. defer func() { - endCmds.done(ba, br, pErr) + ec.done(ba, br, pErr) }() // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 688ae7795d61..4901063b0138 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -603,7 +603,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) exLease, _ := r.GetLease() - ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, &ba, nil, &allSpans) + ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, &ba, &allSpans, endCmds{}) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1384,7 +1384,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, &ba, nil, &allSpans) + ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, &ba, &allSpans, endCmds{}) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -7242,7 +7242,7 @@ func TestReplicaIDChangePending(t *testing.T) { }, Value: roachpb.MakeValueFromBytes([]byte("val")), }) - _, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, nil, &allSpans) + _, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, &allSpans, endCmds{}) if err != nil { t.Fatal(err) } @@ -7448,7 +7448,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans) + ch, _, idx, err := repl.evalAndPropose(ctx, lease, &ba, &allSpans, endCmds{}) if err != nil { t.Fatal(err) } @@ -7514,7 +7514,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, nil, &allSpans) + ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, &allSpans, endCmds{}) if err != nil { t.Fatal(err) } @@ -7639,7 +7639,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}}) lease, _ := r.GetLease() ctx := context.Background() - cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), &ba, nil, &allSpans) + cmd, pErr := r.requestToProposal(ctx, storagebase.CmdIDKey(id), &ba, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -7765,7 +7765,7 @@ func TestReplicaRefreshMultiple(t *testing.T) { incCmdID = makeIDKey() atomic.StoreInt32(&filterActive, 1) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, nil, &allSpans) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8761,7 +8761,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { exLease, _ := repl.GetLease() ch, _, _, pErr := repl.evalAndPropose( - context.Background(), exLease, &ba, nil /* endCmds */, &allSpans, + context.Background(), exLease, &ba, &allSpans, endCmds{}, ) if pErr != nil { t.Fatal(pErr) @@ -8808,7 +8808,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() ch, _, _, pErr := repl.evalAndPropose( - context.Background(), exLease, &ba, nil /* endCmds */, &allSpans, + context.Background(), exLease, &ba, &allSpans, endCmds{}, ) if pErr != nil { t.Fatal(pErr) @@ -8872,7 +8872,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - _, _, _, pErr := repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans) + _, _, _, pErr := repl.evalAndPropose(ctx, exLease, &ba, &allSpans, endCmds{}) if pErr != nil { t.Fatal(pErr) } @@ -8890,7 +8890,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, &ba, nil /* endCmds */, &allSpans) + ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, &ba, &allSpans, endCmds{}) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index aadb6927088f..80690c6441b0 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -79,7 +79,7 @@ func (r *Replica) executeWriteBatch( return nil, roachpb.NewError(err) } - var endCmds *endCmds + var ec endCmds if !ba.IsLeaseRequest() { // Acquire latches to prevent overlapping commands from executing until // this command completes. Note that this must be done before getting @@ -87,18 +87,18 @@ func (r *Replica) executeWriteBatch( // after preceding commands have been run to successful completion. log.Event(ctx, "acquire latches") var err error - endCmds, err = r.beginCmds(ctx, ba, spans) + ec, err = r.beginCmds(ctx, ba, spans) if err != nil { return nil, roachpb.NewError(err) } } - // Guarantee we release the latches that we just acquired. This is - // wrapped to delay pErr evaluation to its value when returning. + // Guarantee we release the latches that we just acquired if we never make + // it to passing responsibility to evalAndPropose. This is wrapped to delay + // pErr evaluation to its value when returning. defer func() { - if endCmds != nil { - endCmds.done(ba, br, pErr) - } + // No-op if we move ec into evalAndPropose. + ec.done(ba, br, pErr) }() var lease roachpb.Lease @@ -146,7 +146,9 @@ func (r *Replica) executeWriteBatch( log.Event(ctx, "applied timestamp cache") - ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, lease, ba, endCmds, spans) + // After the command is proposed to Raft, invoking endCmds.done is the + // responsibility of Raft, so move the endCmds into evalAndPropose. + ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, lease, ba, spans, ec.move()) if pErr != nil { if maxLeaseIndex != 0 { log.Fatalf( @@ -164,10 +166,6 @@ func (r *Replica) executeWriteBatch( untrack(ctx, ctpb.Epoch(lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex)) } - // After the command is proposed to Raft, invoking endCmds.done is now the - // responsibility of processRaftCommand. - endCmds = nil - // If the command was accepted by raft, wait for the range to apply it. ctxDone := ctx.Done() shouldQuiesce := r.store.stopper.ShouldQuiesce() @@ -258,10 +256,7 @@ and the following Raft status: %+v`, // re-executed in full. This allows it to lay down intents and return // an appropriate retryable error. func (r *Replica) evaluateWriteBatch( - ctx context.Context, - idKey storagebase.CmdIDKey, - ba *roachpb.BatchRequest, - spans *spanset.SpanSet, + ctx context.Context, idKey storagebase.CmdIDKey, ba *roachpb.BatchRequest, spans *spanset.SpanSet, ) (engine.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) { ms := enginepb.MVCCStats{} // If not transactional or there are indications that the batch's txn will diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 60fca73d95fd..51e248c7d5b3 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -727,7 +727,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { } if _, _, _, pErr := repl1.evalAndPropose( - context.Background(), lease, &roachpb.BatchRequest{}, nil, &allSpans, + context.Background(), lease, &roachpb.BatchRequest{}, &allSpans, endCmds{}, ); !pErr.Equal(expErr) { t.Fatalf("expected error %s, but got %v", expErr, pErr) }