From 67b57a095c70c5c249b86c11b337e0d88ed534a5 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 1 Jul 2019 22:20:15 -0400
Subject: [PATCH 1/5] storage: include proposals in proposal buffer in RangeInfo.NumPending

Release note: None
---
 pkg/storage/replica.go      | 2 +-
 pkg/storage/replica_raft.go | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 07df7aba3df2..4c199f2d1d1c 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -894,7 +894,7 @@ func (r *Replica) State() storagepb.RangeInfo {
 	defer r.mu.RUnlock()
 	ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState)
 	ri.LastIndex = r.mu.lastIndex
-	ri.NumPending = uint64(len(r.mu.proposals))
+	ri.NumPending = uint64(r.numPendingProposalsRLocked())
 	ri.RaftLogSize = r.mu.raftLogSize
 	ri.RaftLogSizeTrusted = r.mu.raftLogSizeTrusted
 	ri.NumDropped = uint64(r.mu.droppedMessages)
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index b884337ff44d..25c81a445d53 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -328,8 +328,12 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
 	return int64(maxLeaseIndex), nil
 }
 
+func (r *Replica) numPendingProposalsRLocked() int {
+	return len(r.mu.proposals) + r.mu.proposalBuf.Len()
+}
+
 func (r *Replica) hasPendingProposalsRLocked() bool {
-	return len(r.mu.proposals) > 0 || r.mu.proposalBuf.Len() > 0
+	return r.numPendingProposalsRLocked() > 0
 }
 
 // stepRaftGroup calls Step on the replica's RawNode with the provided request's

From 03a984c4ef9a52545219bab2ca621b40dc2c9fe8 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 2 Jul 2019 16:36:55 -0400
Subject: [PATCH 2/5] storage: log amount of proposal quota acquired so far in logSlowQuota

Release note: None
---
 pkg/storage/quota_pool.go | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/pkg/storage/quota_pool.go b/pkg/storage/quota_pool.go
index f9229e91febe..4b55b78fda9f 100644
--- a/pkg/storage/quota_pool.go
+++ b/pkg/storage/quota_pool.go
@@ -25,6 +25,7 @@ package storage
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -99,12 +100,16 @@ func (qp *quotaPool) addLocked(v int64) {
 	qp.quota <- v
 }
 
-func logSlowQuota(ctx context.Context, v int64, start time.Time) func() {
-	log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota",
-		timeutil.Since(start), humanizeutil.IBytes(v))
+func logSlowQuota(ctx context.Context, want, have int64, start time.Time) func() {
+	var haveStr string
+	if have > 0 {
+		haveStr = fmt.Sprintf("; acquired %s so far", humanizeutil.IBytes(have))
+	}
+	log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota%s",
+		timeutil.Since(start), humanizeutil.IBytes(want), haveStr)
 	return func() {
 		log.Infof(ctx, "acquired %s of proposal quota after %s",
-			humanizeutil.IBytes(v), timeutil.Since(start))
+			humanizeutil.IBytes(want), timeutil.Since(start))
 	}
 }
 
@@ -138,7 +143,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
 		select {
 		case <-slowTimer.C:
 			slowTimer.Read = true
-			defer logSlowQuota(ctx, v, start)()
+			defer logSlowQuota(ctx, v, 0, start)()
 			continue
 		case <-ctx.Done():
 			qp.Lock()
@@ -183,7 +188,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
 		select {
 		case <-slowTimer.C:
 			slowTimer.Read = true
-			defer logSlowQuota(ctx, v, start)()
+			defer logSlowQuota(ctx, v, acquired, start)()
 		case <-ctx.Done():
 			qp.Lock()
 			if acquired > 0 {

From f991d762d60e9acabc3a8fb4b1a2df6d3906385c Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 2 Jul 2019 16:50:13 -0400
Subject: [PATCH 3/5] storage: pull out propBufCntRes.arrayLen method

This avoids confusing arithmetic in a few spots.

Release note: None
---
 pkg/storage/replica_proposal_buf.go | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index f77fbf043d0a..b9b135e41ceb 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -68,12 +68,17 @@ func makePropBufCntReq(incLeaseIndex bool) propBufCntReq {
 	return r
 }
 
+// arrayLen returns the number of elements in the proposal buffer's array.
+func (r propBufCntRes) arrayLen() int {
+	return int(r & (1<<32 - 1))
+}
+
 // arrayIndex returns the index into the proposal buffer that was reserved for
 // the request. The returned index will be -1 if no index was reserved (e.g. by
 // propBufCnt.read) and if the buffer is empty.
 func (r propBufCntRes) arrayIndex() int {
 	// NB: -1 because the array is 0-indexed.
-	return int(r&(1<<32-1)) - 1
+	return r.arrayLen() - 1
 }
 
 // leaseIndexOffset returns the offset from the proposal buffer's current lease
@@ -162,7 +167,7 @@ func (b *propBuf) Init(p proposer) {
 
 // Len returns the number of proposals currently in the buffer.
 func (b *propBuf) Len() int {
-	return b.cnt.read().arrayIndex() + 1
+	return b.cnt.read().arrayLen()
 }
 
 // LastAssignedLeaseIndexRLocked returns the last assigned lease index.
@@ -361,7 +366,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 	// buffer. This ensures that we synchronize with all producers and other
 	// consumers.
 	res := b.cnt.clear()
-	used := res.arrayIndex() + 1
+	used := res.arrayLen()
 	// Before returning, consider resizing the proposal buffer's array,
 	// depending on how much of it was used before the current flush.
 	defer b.arr.adjustSize(used)

From ef021001ffc8d44b47e13a96b558611723caddd1 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 2 Jul 2019 18:16:50 -0400
Subject: [PATCH 4/5] storage: be extra paranoid about dropped proposals when flushing propBuf

This commit makes propBuf.FlushLockedWithRaftGroup more careful about
accidentally dropping proposals when flushing its array. In most cases,
any error will result in a log.Fatal call further up the stack anyway,
but it's good to be careful about this stuff or we risk leaking
proposals. We now make sure to register all proposals with the proposer
after an error, even though we stop attempting to propose entries with
the Raft group.

Release note: None
---
 pkg/storage/replica_proposal_buf.go      | 31 +++++++++++++++-----
 pkg/storage/replica_proposal_buf_test.go | 33 ++++++++++++++++++++++
 pkg/storage/replica_test.go              | 36 ++++++++++++------------
 3 files changed, 75 insertions(+), 25 deletions(-)

diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index b9b135e41ceb..4efcd73a1a12 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -141,7 +141,7 @@ type propBuf struct {
 		// drop Raft proposals before they are handed to etcd/raft to begin the
 		// process of replication. Dropped proposals are still eligible to be
 		// reproposed due to ticks.
-		submitProposalFilter func(*ProposalData) (drop bool)
+		submitProposalFilter func(*ProposalData) (drop bool, err error)
 	}
 }
 
@@ -391,6 +391,11 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 	// and apply them.
 	buf := b.arr.asSlice()[:used]
 	ents := make([]raftpb.Entry, 0, used)
+	// Remember the first error that we see when proposing the batch. We don't
+	// immediately return this error because we want to finish clearing out the
+	// buffer and registering each of the proposals with the proposer, but we
+	// stop trying to propose commands to raftGroup.
+	var firstErr error
 	for i, p := range buf {
 		buf[i] = nil // clear buffer
 
@@ -400,13 +405,19 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 		// Potentially drop the proposal before passing it to etcd/raft, but
 		// only after performing necessary bookkeeping.
 		if filter := b.testing.submitProposalFilter; filter != nil {
-			if drop := filter(p); drop {
+			if drop, err := filter(p); drop || err != nil {
+				if firstErr == nil {
+					firstErr = err
+				}
 				continue
 			}
 		}
 
-		// If we don't have a raft group, we can't propose the command.
-		if raftGroup == nil {
+		// If we don't have a raft group or if the raft group has rejected one
+		// of the proposals, we don't try to propose any more proposals. The
+		// rest of the proposals will still be registered with the proposer, so
+		// they will eventually be reproposed.
+		if raftGroup == nil || firstErr != nil {
 			continue
 		}
 
@@ -416,7 +427,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 			// preserve the correct ordering or proposals. Later proposals
 			// will start a new batch.
 			if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil {
-				return err
+				firstErr = err
+				continue
 			}
 			ents = ents[len(ents):]
 
@@ -427,7 +439,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 			}
 			encodedCtx, err := protoutil.Marshal(&confChangeCtx)
 			if err != nil {
-				return err
+				firstErr = err
+				continue
 			}
 
 			if err := raftGroup.ProposeConfChange(raftpb.ConfChange{
@@ -439,7 +452,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 				// ignored prior to the introduction of ErrProposalDropped).
 				// TODO(bdarnell): Handle ErrProposalDropped better.
 				// https://github.com/cockroachdb/cockroach/issues/21849
-				return err
+				firstErr = err
+				continue
 			}
 		} else {
 			// Add to the batch of entries that will soon be proposed. It is
@@ -456,6 +470,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 			})
 		}
 	}
+	if firstErr != nil {
+		return firstErr
+	}
 	return proposeBatch(raftGroup, b.p.replicaID(), ents)
 }
 
diff --git a/pkg/storage/replica_proposal_buf_test.go b/pkg/storage/replica_proposal_buf_test.go
index c70e91320d62..877d18359734 100644
--- a/pkg/storage/replica_proposal_buf_test.go
+++ b/pkg/storage/replica_proposal_buf_test.go
@@ -225,3 +225,36 @@ func TestProposalBufferConcurrentWithDestroy(t *testing.T) {
 	require.Nil(t, g.Wait())
 	t.Logf("%d successful proposals before destroy", len(mlais))
 }
+
+// TestProposalBufferRegistersAllOnProposalError tests that all proposals in the
+// proposal buffer are registered with the proposer when the buffer is flushed,
+// even if an error is seen when proposing a batch of entries.
+func TestProposalBufferRegistersAllOnProposalError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var p testProposer
+	var b propBuf
+	b.Init(&p)
+
+	num := propBufArrayMinSize
+	for i := 0; i < num; i++ {
+		pd, data := newPropData(false)
+		_, err := b.Insert(pd, data)
+		require.Nil(t, err)
+	}
+	require.Equal(t, num, b.Len())
+
+	propNum := 0
+	propErr := errors.New("failed proposal")
+	b.testing.submitProposalFilter = func(*ProposalData) (drop bool, err error) {
+		propNum++
+		require.Equal(t, propNum, p.registered)
+		if propNum == 2 {
+			return false, propErr
+		}
+		return false, nil
+	}
+	err := b.flushLocked()
+	require.Equal(t, propErr, err)
+	require.Equal(t, num, p.registered)
+}
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 7a0e463f8552..1db9346563e0 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -6948,12 +6948,12 @@ func TestReplicaAbandonProposal(t *testing.T) {
 	// Cancel the request before it is proposed to Raft.
 	dropProp := int32(1)
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			cancel()
-			return atomic.LoadInt32(&dropProp) == 1
+			return atomic.LoadInt32(&dropProp) == 1, nil
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7172,12 +7172,12 @@ func TestReplicaIDChangePending(t *testing.T) {
 	// Stop the command from being proposed to the raft group and being removed.
 	proposedOnOld := make(chan struct{}, 1)
 	repl.mu.Lock()
-	repl.mu.proposalBuf.testing.submitProposalFilter = func(*ProposalData) (drop bool) {
+	repl.mu.proposalBuf.testing.submitProposalFilter = func(*ProposalData) (drop bool, _ error) {
 		select {
 		case proposedOnOld <- struct{}{}:
 		default:
 		}
-		return true
+		return true, nil
 	}
 	lease := *repl.mu.state.Lease
 	repl.mu.Unlock()
@@ -7202,14 +7202,14 @@ func TestReplicaIDChangePending(t *testing.T) {
 	// re-proposed.
 	proposedOnNew := make(chan struct{}, 1)
 	repl.mu.Lock()
-	repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if p.Request.Timestamp == magicTS {
 			select {
 			case proposedOnNew <- struct{}{}:
 			default:
 			}
 		}
-		return false
+		return false, nil
 	}
 	repl.mu.Unlock()
 
@@ -7379,12 +7379,12 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
 	tc.repl.mu.Lock()
 	lease := *repl.mu.state.Lease
 	abandoned := make(map[int64]struct{}) // protected by repl.mu
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if _, ok := abandoned[int64(p.command.MaxLeaseIndex)]; ok {
 			log.Infof(p.ctx, "abandoning command")
-			return true
+			return true, nil
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7439,14 +7439,14 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 	dropAll := int32(1)
 	tc.repl.mu.Lock()
 	lease := *tc.repl.mu.state.Lease
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if atomic.LoadInt32(&dropAll) == 1 {
-			return true
+			return true, nil
 		}
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			seenCmds = append(seenCmds, int(p.command.MaxLeaseIndex))
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7571,11 +7571,11 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 	dropProposals.m = make(map[*ProposalData]struct{})
 
 	r.mu.Lock()
-	r.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	r.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		dropProposals.Lock()
 		defer dropProposals.Unlock()
 		_, ok := dropProposals.m[p]
-		return ok
+		return ok, nil
 	}
 	r.mu.Unlock()
 
@@ -8366,16 +8366,16 @@ func TestCancelPendingCommands(t *testing.T) {
 	proposalDroppedCh := make(chan struct{})
 	proposalDropped := false
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if _, ok := p.Request.GetArg(roachpb.Increment); ok {
 			if !proposalDropped {
 				// Notify the main thread the first time we drop a proposal.
 				close(proposalDroppedCh)
 				proposalDropped = true
 			}
-			return true
+			return true, nil
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 

From ba3813cf2db73ecff36d2cf9613e273f472c9d2d Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 2 Jul 2019 19:51:31 -0400
Subject: [PATCH 5/5] storage: release quota on failed Raft proposals

Fixes #34180.
Fixes #35493.
Fixes #36983.
Fixes #37108.
Fixes #37371.
Fixes #37384.
Fixes #37551.
Fixes #37879.
Fixes #38095.
Fixes #38131.
Fixes #38136.
Fixes #38549.
Fixes #38552.
Fixes #38555.
Fixes #38560.
Fixes #38562.
Fixes #38563.
Fixes #38569.
Fixes #38578.
Fixes #38600.

_A lot of the early issues fixed by this had previous failures, but
nothing very recent or actionable. I think it's worth closing them now
that they should be fixed in the short term._

This fixes a bug introduced in 1ff3556 where Raft proposal quota is no
longer released when Replica.propose fails.
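To make the failure mode concrete, here is a small, self-contained Go sketch
of the pattern this fix adopts. The `quotaPool` and `propose` below are toy,
hypothetical stand-ins (not the real `storage.quotaPool` API): quota is
acquired up front, and a deferred cleanup returns it whenever the proposal
fails before being handed to Raft.

```go
package main

import (
	"errors"
	"fmt"
)

// quotaPool is a toy stand-in for the range's proposal quota pool: a counter.
type quotaPool struct{ available int64 }

func (qp *quotaPool) acquire(v int64) { qp.available -= v }
func (qp *quotaPool) release(v int64) { qp.available += v }

var errRejected = errors.New("proposal rejected")

// propose mimics the fixed control flow: quota is acquired first, and a
// deferred cleanup returns it whenever the proposal ends in an error.
func propose(qp *quotaPool, quotaSize int64, fail bool) (err error) {
	qp.acquire(quotaSize)
	defer func() {
		if err != nil {
			// Without this release the quota leaks, and later requests that
			// need the full pool (e.g. large AddSSTable proposals) stall.
			qp.release(quotaSize)
		}
	}()
	if fail {
		return errRejected
	}
	return nil
}

func main() {
	qp := &quotaPool{available: 1 << 20}
	_ = propose(qp, 512, true)         // a failed proposal
	fmt.Println(qp.available == 1<<20) // true: quota was returned to the pool
}
```

The actual change below achieves the same thing by deferring
`cleanupFailedProposal` in `evalAndPropose`, which both clears the proposal
and releases its quota.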
This used to happen
[here](https://github.com/cockroachdb/cockroach/commit/1ff355691022f0fbe5ca9c3704d2deeb244bb2f5#diff-4315c7ebf8b8bf7bda469e1e7be82690L316),
but that code was accidentally lost in the rewrite.

I tracked this down by running a series of `import/tpch/nodes=4` and
`scrub/all-checks/tpcc/w=100` roachtests. About half the time, the import
would stall after a few hours and the roachtest health reports would start
logging lines like: `n1/s1 2.00 metrics requests.slow.latch`. I tracked the
stalled latch acquisition to a stalled proposal quota acquisition by a
conflicting command. The range debug page showed that the leaseholder of the
Range had no pending commands but also no available proposal quota. This
indicates a proposal quota leak, which led me to find the lost release in
this error case.

The (now confirmed) theory for what went wrong in these roachtests is that
they are performing imports, which generate a large number of AddSSTRequests.
These requests are typically larger than the available proposal quota for a
range, meaning that they request all of its available quota. The effect of
this is that if even a single byte of quota is leaked, the entire range will
seize up and stall when an AddSSTRequest is issued. Instrumentation revealed
that a ChangeReplicas request with a quota size equal to the leaked amount
was failing due to the error:
```
received invalid ChangeReplicasTrigger REMOVE_REPLICA((n3,s3):3): updated=[(n1,s1):1 (n4,s4):2 (n2,s2):4] next=5 to remove self (leaseholder)
```
Because of the missing error handling, this quota was not being released back
into the pool, causing future requests to get stuck indefinitely waiting for
leaked quota, stalling the entire import.

Release note: None
---
 pkg/storage/replica.go              | 10 ++++-
 pkg/storage/replica_proposal_buf.go |  6 ++-
 pkg/storage/replica_raft.go         | 23 ++++++++---
 pkg/storage/replica_test.go         | 61 ++++++++++++++++++++++++-----
 4 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 4c199f2d1d1c..284e2fc8b361 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -571,8 +571,16 @@ func (r *Replica) String() string {
 	return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
 }
 
-// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
+// cleanupFailedProposal cleans up after a proposal that has failed. It
 // clears any references to the proposal and releases associated quota.
+func (r *Replica) cleanupFailedProposal(p *ProposalData) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.cleanupFailedProposalLocked(p)
+}
+
+// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
+// the Replica mutex to be exclusively held.
 func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
 	// Clear the proposal from the proposals map. May be a no-op if the
 	// proposal has not yet been inserted into the map.
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index 4efcd73a1a12..93a7814af2ae 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -136,7 +136,7 @@ type propBuf struct {
 	testing struct {
 		// leaseIndexFilter can be used by tests to override the max lease index
 		// assigned to a proposal by returning a non-zero lease index.
-		leaseIndexFilter func(*ProposalData) (indexOverride uint64)
+		leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
 		// submitProposalFilter can be used by tests to observe and optionally
 		// drop Raft proposals before they are handed to etcd/raft to begin the
 		// process of replication. Dropped proposals are still eligible to be
@@ -208,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) {
 	// Assign the command's maximum lease index.
 	p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset()
 	if filter := b.testing.leaseIndexFilter; filter != nil {
-		if override := filter(p); override != 0 {
+		if override, err := filter(p); err != nil {
+			return 0, err
+		} else if override != 0 {
 			p.command.MaxLeaseIndex = override
 		}
 	}
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 25c81a445d53..e55999d4ca7a 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose(
 	if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil {
 		return nil, nil, 0, roachpb.NewError(err)
 	}
+	// Make sure we clean up the proposal if we fail to insert it into the
+	// proposal buffer successfully. This ensures that we always release any
+	// quota that we acquire.
+	defer func() {
+		if pErr != nil {
+			r.cleanupFailedProposal(proposal)
+		}
+	}()
 
 	if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
 		filterArgs := storagebase.ProposalFilterArgs{
@@ -230,11 +238,13 @@ func (r *Replica) evalAndPropose(
 	return proposalCh, abandon, maxLeaseIndex, nil
 }
 
-// propose starts tracking a command and proposes it to raft. If
-// this method succeeds, the caller is responsible for eventually
-// removing the proposal from the pending map (on success, in
-// processRaftCommand, or on failure via cleanupFailedProposalLocked).
-func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
+// propose encodes a command, starts tracking it, and proposes it to raft. The
+// method is also responsible for assigning the command its maximum lease index.
+//
+// The method hands ownership of the command over to the Raft machinery. After
+// the method returns, all access to the command must be performed while holding
+// Replica.mu and Replica.raftMu.
+func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) {
 	// Make sure the maximum lease index is unset. This field will be set in
 	// propBuf.Insert and its encoded bytes will be appended to the encoding
 	// buffer as a RaftCommandFooter.
@@ -258,7 +268,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
 		// leases can stay in such a state for a very long time when using epoch-
 		// based range leases). This shouldn't happen often, but has been seen
 		// before (#12591).
-		if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
+		replID := p.command.ProposerReplica.ReplicaID
+		if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
 			msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
 			log.Error(p.ctx, msg)
 			return 0, roachpb.NewErrorf("%s: %s", r, msg)
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 1db9346563e0..99d9c9b9a230 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -6532,6 +6532,49 @@ func TestReplicaDestroy(t *testing.T) {
 	}
 }
 
+// TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by
+// proposals is released back into the quota pool if the proposal fails before
+// being submitted to Raft.
+func TestQuotaPoolReleasedOnFailedProposal(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	tc.Start(t, stopper)
+
+	// Flush a write all the way through the Raft proposal pipeline to ensure
+	// that the replica becomes the Raft leader and sets up its quota pool.
+	iArgs := incrementArgs([]byte("a"), 1)
+	if _, pErr := tc.SendWrapped(&iArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	type magicKey struct{}
+	var minQuotaSize int64
+	propErr := errors.New("proposal error")
+
+	tc.repl.mu.Lock()
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
+		if v := p.ctx.Value(magicKey{}); v != nil {
+			minQuotaSize = tc.repl.mu.proposalQuota.approximateQuota() + p.quotaSize
+			return 0, propErr
+		}
+		return 0, nil
+	}
+	tc.repl.mu.Unlock()
+
+	var ba roachpb.BatchRequest
+	pArg := putArgs(roachpb.Key("a"), make([]byte, 1<<10))
+	ba.Add(&pArg)
+	ctx := context.WithValue(context.Background(), magicKey{}, "foo")
+	if _, pErr := tc.Sender().Send(ctx, ba); !testutils.IsPError(pErr, propErr.Error()) {
+		t.Fatalf("expected error %v, found %v", propErr, pErr)
+	}
+	if curQuota := tc.repl.QuotaAvailable(); curQuota < minQuotaSize {
+		t.Fatalf("proposal quota not released: found=%d, want=%d", curQuota, minQuotaSize)
+	}
+}
+
 // TestQuotaPoolAccessOnDestroyedReplica tests the occurrence of #17303 where
 // following a leader replica getting destroyed, the scheduling of
 // handleRaftReady twice on the replica would cause a panic when
@@ -7280,13 +7323,13 @@ func TestReplicaRetryRaftProposal(t *testing.T) {
 
 	var wrongLeaseIndex uint64 // populated below
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 {
-				return wrongLeaseIndex
+				return wrongLeaseIndex, nil
 			}
 		}
-		return 0
+		return 0, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7731,12 +7774,12 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 		t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
 	}
 	assigned := false
-	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if p == proposal && !assigned {
 			assigned = true
-			return ai - 1
+			return ai - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	repl.mu.Unlock()
 
@@ -7877,16 +7920,16 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
 
 	r := tc.repl
 	r.mu.Lock()
-	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		// We're going to recognize the first time the commnand for the
 		// EndTransaction is proposed and we're going to hackily decrease its
 		// MaxLeaseIndex, so that the processing gets rejected further on.
 		ut := p.Local.UpdatedTxns
 		if atomic.LoadInt64(&proposalRecognized) == 0 && ut != nil && len(*ut) == 1 && (*ut)[0].ID == txn.ID {
 			atomic.StoreInt64(&proposalRecognized, 1)
-			return p.command.MaxLeaseIndex - 1
+			return p.command.MaxLeaseIndex - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	r.mu.Unlock()
 