Skip to content

Commit

Permalink
Merge #26979
Browse files Browse the repository at this point in the history
26979: storage: simplify quota pool deallocation for failed proposals r=benesch a=nvanbenschoten

This change simplifies the process of releasing quota back into the
quota pool on failed proposals. It does so by pushing all quota
management into Raft instead of having the request's goroutine
try to determine whether it needs to release quota based on the
error in the proposal result. Quota management used to only be
handled by Raft for successful proposals.

In doing so, the change removes a case where quota could be double
freed: proposals that apply and then return an error. This isn't
particularly important though, because this pattern is never
used anymore.

Besides being a simplification, this is important for #26599 because
transaction pipelining depends on requests pushing proposals into
Raft and then returning immediately. In that model (asyncConsensus)
the request's goroutine is not around to release quota resources on
failed proposals.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jun 29, 2018
2 parents c12a89c + 2850bae commit 3a60101
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 58 deletions.
96 changes: 50 additions & 46 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,15 +853,32 @@ func (r *Replica) destroyRaftMuLocked(

func (r *Replica) cancelPendingCommandsLocked() {
r.mu.AssertHeld()
pr := proposalResult{
Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
ProposalRetry: proposalRangeNoLongerExists,
}
for _, p := range r.mu.proposals {
resp := proposalResult{
Reply: &roachpb.BatchResponse{},
Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
ProposalRetry: proposalRangeNoLongerExists,
}
p.finishApplication(resp)
r.cleanupFailedProposalLocked(p)
p.finishApplication(pr)
}
}

// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
// clears any references to the proposal and releases associated quota.
func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// Clear the proposal from the proposals map. May be a no-op if the
// proposal has not yet been inserted into the map.
delete(r.mu.proposals, p.idKey)
// Release associated quota pool resources if we have been tracking
// this command.
//
// NB: We may be double free-ing here in cases where proposals are
// duplicated. To counter this our quota pool is capped at the initial
// quota size.
if cmdSize, ok := r.mu.commandSizes[p.idKey]; ok {
r.mu.proposalQuota.add(int64(cmdSize))
delete(r.mu.commandSizes, p.idKey)
}
r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
}

// setTombstoneKey writes a tombstone to disk to ensure that replica IDs never
Expand Down Expand Up @@ -2931,17 +2948,7 @@ func (r *Replica) tryExecuteWriteBatch(

log.Event(ctx, "applied timestamp cache")

ch, tryAbandon, undoQuotaAcquisition, pErr := r.propose(ctx, lease, ba, endCmds, spans)
defer func() {
// NB: We may be double free-ing here, consider the following cases:
// - The request was evaluated and the command resulted in an error, but a
// proposal is still sent.
// - Proposals get duplicated.
// To counter this our quota pool is capped at the initial quota size.
if pErr != nil {
undoQuotaAcquisition()
}
}()
ch, tryAbandon, pErr := r.propose(ctx, lease, ba, endCmds, spans)
if pErr != nil {
return nil, pErr, proposalNoRetry
}
Expand Down Expand Up @@ -3242,35 +3249,33 @@ func (r *Replica) propose(
ba roachpb.BatchRequest,
endCmds *endCmds,
spans *spanset.SpanSet,
) (chan proposalResult, func() bool, func(), *roachpb.Error) {
noop := func() {}

) (_ chan proposalResult, _ func() bool, pErr *roachpb.Error) {
r.mu.Lock()
if !r.mu.destroyStatus.IsAlive() {
err := r.mu.destroyStatus.err
r.mu.Unlock()
return nil, nil, noop, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}
r.mu.Unlock()

rSpan, err := keys.Range(ba)
if err != nil {
return nil, nil, noop, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}

// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
errStr := fmt.Sprintf("%s before proposing: %s", err, ba.Summary())
log.Warning(ctx, errStr)
return nil, nil, noop, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}

// Only need to check that the request is in bounds at proposal time,
// not at application time, because the command queue will synchronize
// all requests (notably EndTransaction with SplitTrigger) that may
// cause this condition to change.
if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, nil, noop, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}

idKey := makeIDKey()
Expand All @@ -3295,7 +3300,7 @@ func (r *Replica) propose(
EndTxns: endTxns,
}
proposal.finishApplication(pr)
return proposal.doneCh, func() bool { return false }, noop, nil
return proposal.doneCh, func() bool { return false }, nil
}

// TODO(irfansharif): This int cast indicates that if someone configures a
Expand All @@ -3306,14 +3311,14 @@ func (r *Replica) propose(
// Once a command is written to the raft log, it must be loaded
// into memory and replayed on all replicas. If a command is
// too big, stop it here.
return nil, nil, noop, roachpb.NewError(errors.Errorf(
return nil, nil, roachpb.NewError(errors.Errorf(
"command is too large: %d bytes (max: %d)",
proposalSize, MaxCommandSize.Get(&r.store.cfg.Settings.SV),
))
}

if err := r.maybeAcquireProposalQuota(ctx, int64(proposalSize)); err != nil {
return nil, nil, noop, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}

// submitProposalLocked calls withRaftGroupLocked which requires that
Expand All @@ -3335,26 +3340,27 @@ func (r *Replica) propose(
if r.mu.commandSizes != nil {
r.mu.commandSizes[proposal.idKey] = proposalSize
}
undoQuotaAcquisition := func() {
r.mu.Lock()
if r.mu.commandSizes != nil && r.mu.proposalQuota != nil {
delete(r.mu.commandSizes, proposal.idKey)
r.mu.proposalQuota.add(int64(proposalSize))
// Make sure we clean up the proposal if we fail to submit it successfully.
// This is important both to ensure that that the proposals map doesn't
// grow without bound and to ensure that we always release any quota that
// we acquire.
defer func() {
if pErr != nil {
r.cleanupFailedProposalLocked(proposal)
}
r.mu.Unlock()
}
}()

// NB: We need to check Replica.mu.destroyStatus again in case the Replica has
// been destroyed between the initial check at the beginning of this method
// and the acquisition of Replica.mu. Failure to do so will leave pending
// proposals that never get cleared.
if !r.mu.destroyStatus.IsAlive() {
return nil, nil, undoQuotaAcquisition, roachpb.NewError(r.mu.destroyStatus.err)
return nil, nil, roachpb.NewError(r.mu.destroyStatus.err)
}

repDesc, err := r.getReplicaDescriptorRLocked()
if err != nil {
return nil, nil, undoQuotaAcquisition, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}
r.insertProposalLocked(proposal, repDesc, lease)

Expand All @@ -3366,8 +3372,7 @@ func (r *Replica) propose(
Req: ba,
}
if pErr := filter(filterArgs); pErr != nil {
delete(r.mu.proposals, idKey)
return nil, nil, undoQuotaAcquisition, pErr
return nil, nil, pErr
}
}

Expand All @@ -3377,8 +3382,7 @@ func (r *Replica) propose(
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
} else if err != nil {
delete(r.mu.proposals, proposal.idKey)
return nil, nil, undoQuotaAcquisition, roachpb.NewError(err)
return nil, nil, roachpb.NewError(err)
}
// Must not use `proposal` in the closure below as a proposal which is not
// present in r.mu.proposals is no longer protected by the mutex. Abandoning
Expand All @@ -3399,7 +3403,7 @@ func (r *Replica) propose(
r.mu.Unlock()
return ok
}
return proposal.doneCh, tryAbandon, undoQuotaAcquisition, nil
return proposal.doneCh, tryAbandon, nil
}

// submitProposalLocked proposes or re-proposes a command in r.mu.proposals.
Expand Down Expand Up @@ -4342,12 +4346,12 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR

numShouldRetry := 0
var reproposals pendingCmdSlice
for idKey, p := range r.mu.proposals {
for _, p := range r.mu.proposals {
if p.command.MaxLeaseIndex == 0 {
// Commands without a MaxLeaseIndex cannot be reproposed, as they might
// apply twice. We also don't want to ask the proposer to retry these
// special commands.
delete(r.mu.proposals, idKey)
r.cleanupFailedProposalLocked(p)
log.VEventf(p.ctx, 2, "refresh (reason: %s) returning AmbiguousResultError for command "+
"without MaxLeaseIndex: %v", reason, p.command)
p.finishApplication(proposalResult{Err: roachpb.NewError(
Expand Down Expand Up @@ -4382,7 +4386,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
// little ahead), because a pending command is removed only as it
// applies. Thus we'd risk reproposing a command that has been committed
// but not yet applied.
delete(r.mu.proposals, idKey)
r.cleanupFailedProposalLocked(p)
log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason)
if reason == reasonSnapshotApplied {
p.finishApplication(proposalResult{ProposalRetry: proposalAmbiguousShouldBeReevaluated})
Expand Down Expand Up @@ -4437,7 +4441,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
} else if err != nil {
delete(r.mu.proposals, p.idKey)
r.cleanupFailedProposalLocked(p)
p.finishApplication(proposalResult{Err: roachpb.NewError(err), ProposalRetry: proposalErrorReproposing})
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ type ProposalData struct {
Request *roachpb.BatchRequest
}

// finishApplication is when a command application has finished. This will be
// called downstream of Raft if the command required consensus, but can be
// called upstream of Raft if the command did not and was never proposed.
// proposal.doneCh is signaled with pr so that the proposer is unblocked.
// finishApplication is called when a command application has finished. The
// method will be called downstream of Raft if the command required consensus,
// but can be called upstream of Raft if the command did not and was never
// proposed. proposal.doneCh is signaled with pr so that the proposer is
// unblocked.
//
// It first invokes the endCmds function and then sends the specified
// proposalResult on the proposal's done channel. endCmds is invoked here in
Expand Down
14 changes: 7 additions & 7 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *l})
exLease, _ := r.GetLease()
ch, _, _, pErr := r.propose(context.TODO(), exLease, ba, nil, &allSpans)
ch, _, pErr := r.propose(context.TODO(), exLease, ba, nil, &allSpans)
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
Expand Down Expand Up @@ -1236,7 +1236,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
ba := roachpb.BatchRequest{}
ba.Timestamp = tc.repl.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
ch, _, _, pErr := tc.repl.propose(context.Background(), exLease, ba, nil, &allSpans)
ch, _, pErr := tc.repl.propose(context.Background(), exLease, ba, nil, &allSpans)
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor to a more conventional error-handling pattern.
Expand Down Expand Up @@ -1968,7 +1968,7 @@ func TestLeaseConcurrent(t *testing.T) {
// When we complete the command, we have to remove it from the map;
// otherwise its context (and tracing span) may be used after the
// client cleaned up.
delete(tc.repl.mu.proposals, proposal.idKey)
tc.repl.cleanupFailedProposalLocked(proposal)
proposal.finishApplication(proposalResult{Err: roachpb.NewErrorf(origMsg)})
return
}
Expand Down Expand Up @@ -4450,7 +4450,7 @@ func TestRaftRetryProtectionInTxn(t *testing.T) {
// also avoid updating the timestamp cache.
ba.Timestamp = txn.OrigTimestamp
lease, _ := tc.repl.GetLease()
ch, _, _, err := tc.repl.propose(context.Background(), lease, ba, nil, &allSpans)
ch, _, err := tc.repl.propose(context.Background(), lease, ba, nil, &allSpans)
if err != nil {
t.Fatalf("%d: unexpected error: %s", i, err)
}
Expand Down Expand Up @@ -7685,7 +7685,7 @@ func TestReplicaIDChangePending(t *testing.T) {
},
Value: roachpb.MakeValueFromBytes([]byte("val")),
})
_, _, _, err := repl.propose(context.Background(), lease, ba, nil, &allSpans)
_, _, err := repl.propose(context.Background(), lease, ba, nil, &allSpans)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -7910,7 +7910,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
// client abandoning it.
if rand.Intn(2) == 0 {
log.Infof(context.Background(), "abandoning command %d", i)
delete(repl.mu.proposals, proposal.idKey)
repl.cleanupFailedProposalLocked(proposal)
} else if err := repl.submitProposalLocked(proposal); err != nil {
t.Error(err)
} else {
Expand Down Expand Up @@ -9219,7 +9219,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
}

exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.propose(
ch, _, pErr := repl.propose(
context.Background(), exLease, ba, nil /* endCmds */, &allSpans,
)
if pErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
t.Fatal("replica was not marked as destroyed")
}

if _, _, _, pErr := repl1.propose(
if _, _, pErr := repl1.propose(
context.Background(), lease, roachpb.BatchRequest{}, nil, &allSpans,
); !pErr.Equal(expErr) {
t.Fatalf("expected error %s, but got %v", expErr, pErr)
Expand Down

0 comments on commit 3a60101

Please sign in to comment.