From 23b1fb99171a24489692ae2742578e0880f2dd91 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 2 Jul 2019 19:51:31 -0400
Subject: [PATCH] storage: release quota on failed Raft proposals

Fixes #34180.
Fixes #35493.
Fixes #36983.
Fixes #37108.
Fixes #37371.
Fixes #37384.
Fixes #37551.
Fixes #37879.
Fixes #38095.
Fixes #38131.
Fixes #38136.
Fixes #38549.
Fixes #38552.
Fixes #38555.
Fixes #38560.
Fixes #38562.
Fixes #38563.
Fixes #38569.
Fixes #38578.
Fixes #38600.

_A lot of the early issues fixed by this had previous failures, but nothing
very recent or actionable. I think it's worth closing them now that they
should be fixed in the short term._

This fixes a bug introduced in 1ff3556 where Raft proposal quota is no
longer released when Replica.propose fails. This used to happen
[here](https://github.com/cockroachdb/cockroach/commit/1ff355691022f0fbe5ca9c3704d2deeb244bb2f5#diff-4315c7ebf8b8bf7bda469e1e7be82690L316),
but that code was accidentally lost in the rewrite.

I tracked this down by running a series of `import/tpch/nodes=4` and
`scrub/all-checks/tpcc/w=100` roachtests. About half the time, the import
would stall after a few hours and the roachtest health reports would start
logging lines like: `n1/s1 2.00 metrics requests.slow.latch`. I tracked the
stalled latch acquisition to a stalled proposal quota acquisition by a
conflicting command. The range debug page showed that the leaseholder of
the Range had no pending commands but also no available proposal quota.
This indicates a proposal quota leak, which led me to find the lost release
in this error case.

The (now confirmed) theory for what went wrong in these roachtests is that
they are performing imports, which generate a large number of
AddSSTRequests. These requests are typically larger than the available
proposal quota for a range, meaning that they request all of its available
quota. The effect of this is that if even a single byte of quota is leaked,
the entire range will seize up and stall when an AddSSTRequest is issued.
Instrumentation revealed that a ChangeReplicas request with a quota size
equal to the leaked amount was failing due to the error:
```
received invalid ChangeReplicasTrigger REMOVE_REPLICA((n3,s3):3): updated=[(n1,s1):1 (n4,s4):2 (n2,s2):4] next=5 to remove self (leaseholder)
```
Because of the missing error handling, this quota was not being released
back into the pool, causing future requests to get stuck indefinitely
waiting for leaked quota, stalling the entire import.

Release note: None
---
 pkg/storage/replica.go              | 10 ++++-
 pkg/storage/replica_proposal_buf.go |  6 ++-
 pkg/storage/replica_raft.go         | 12 +++++-
 pkg/storage/replica_test.go         | 61 ++++++++++++++++++++++++-----
 4 files changed, 76 insertions(+), 13 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 4c199f2d1d1c..284e2fc8b361 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -571,8 +571,16 @@ func (r *Replica) String() string {
 	return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
 }
 
-// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
+// cleanupFailedProposal cleans up after a proposal that has failed. It
 // clears any references to the proposal and releases associated quota.
+func (r *Replica) cleanupFailedProposal(p *ProposalData) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.cleanupFailedProposalLocked(p)
+}
+
+// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
+// the Replica mutex to be exclusively held.
 func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
 	// Clear the proposal from the proposals map. May be a no-op if the
 	// proposal has not yet been inserted into the map.
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index 91c646c65a2c..e283e6f62009 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -136,7 +136,7 @@ type propBuf struct {
 	testing struct {
 		// leaseIndexFilter can be used by tests to override the max lease index
 		// assigned to a proposal by returning a non-zero lease index.
-		leaseIndexFilter func(*ProposalData) (indexOverride uint64)
+		leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
 		// submitProposalFilter can be used by tests to observe and optionally
 		// drop Raft proposals before they are handed to etcd/raft to begin the
 		// process of replication. Dropped proposals are still eligible to be
@@ -208,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) {
 	// Assign the command's maximum lease index.
 	p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset()
 	if filter := b.testing.leaseIndexFilter; filter != nil {
-		if override := filter(p); override != 0 {
+		if override, err := filter(p); err != nil {
+			return 0, err
+		} else if override != 0 {
 			p.command.MaxLeaseIndex = override
 		}
 	}
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 25c81a445d53..cbbf8e9c183f 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -235,6 +235,15 @@ func (r *Replica) evalAndPropose(
 // removing the proposal from the pending map (on success, in
 // processRaftCommand, or on failure via cleanupFailedProposalLocked).
 func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
+	// Make sure we clean up the proposal if we fail to insert it into the
+	// proposal buffer successfully. This ensures that we always release any
+	// quota that we acquire.
+	defer func() {
+		if pErr != nil {
+			r.cleanupFailedProposal(p)
+		}
+	}()
+
 	// Make sure the maximum lease index is unset. This field will be set in
 	// propBuf.Insert and its encoded bytes will be appended to the encoding
 	// buffer as a RaftCommandFooter.
@@ -258,7 +267,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
 	// leases can stay in such a state for a very long time when using epoch-
 	// based range leases). This shouldn't happen often, but has been seen
 	// before (#12591).
-	if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
+	replID := p.command.ProposerReplica.ReplicaID
+	if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
 		msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
 		log.Error(p.ctx, msg)
 		return 0, roachpb.NewErrorf("%s: %s", r, msg)
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 1db9346563e0..99d9c9b9a230 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -6532,6 +6532,49 @@ func TestReplicaDestroy(t *testing.T) {
 	}
 }
 
+// TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by
+// proposals is released back into the quota pool if the proposal fails before
+// being submitted to Raft.
+func TestQuotaPoolReleasedOnFailedProposal(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	tc.Start(t, stopper)
+
+	// Flush a write all the way through the Raft proposal pipeline to ensure
+	// that the replica becomes the Raft leader and sets up its quota pool.
+	iArgs := incrementArgs([]byte("a"), 1)
+	if _, pErr := tc.SendWrapped(&iArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	type magicKey struct{}
+	var minQuotaSize int64
+	propErr := errors.New("proposal error")
+
+	tc.repl.mu.Lock()
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
+		if v := p.ctx.Value(magicKey{}); v != nil {
+			minQuotaSize = tc.repl.mu.proposalQuota.approximateQuota() + p.quotaSize
+			return 0, propErr
+		}
+		return 0, nil
+	}
+	tc.repl.mu.Unlock()
+
+	var ba roachpb.BatchRequest
+	pArg := putArgs(roachpb.Key("a"), make([]byte, 1<<10))
+	ba.Add(&pArg)
+	ctx := context.WithValue(context.Background(), magicKey{}, "foo")
+	if _, pErr := tc.Sender().Send(ctx, ba); !testutils.IsPError(pErr, propErr.Error()) {
+		t.Fatalf("expected error %v, found %v", propErr, pErr)
+	}
+	if curQuota := tc.repl.QuotaAvailable(); curQuota < minQuotaSize {
+		t.Fatalf("proposal quota not released: found=%d, want=%d", curQuota, minQuotaSize)
+	}
+}
+
 // TestQuotaPoolAccessOnDestroyedReplica tests the occurrence of #17303 where
 // following a leader replica getting destroyed, the scheduling of
 // handleRaftReady twice on the replica would cause a panic when
@@ -7280,13 +7323,13 @@ func TestReplicaRetryRaftProposal(t *testing.T) {
 
 	var wrongLeaseIndex uint64 // populated below
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 {
-				return wrongLeaseIndex
+				return wrongLeaseIndex, nil
 			}
 		}
-		return 0
+		return 0, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7731,12 +7774,12 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 		t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
 	}
 	assigned := false
-	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if p == proposal && !assigned {
 			assigned = true
-			return ai - 1
+			return ai - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	repl.mu.Unlock()
 
@@ -7877,16 +7920,16 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
 	r := tc.repl
 	r.mu.Lock()
-	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		// We're going to recognize the first time the command for the
 		// EndTransaction is proposed and we're going to hackily decrease its
 		// MaxLeaseIndex, so that the processing gets rejected further on.
 		ut := p.Local.UpdatedTxns
 		if atomic.LoadInt64(&proposalRecognized) == 0 && ut != nil && len(*ut) == 1 && (*ut)[0].ID == txn.ID {
 			atomic.StoreInt64(&proposalRecognized, 1)
-			return p.command.MaxLeaseIndex - 1
+			return p.command.MaxLeaseIndex - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	r.mu.Unlock()
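
For readers skimming the diff, the essence of the fix is the deferred cleanup
installed at the top of `Replica.propose`: quota acquired for a proposal must
be returned to the pool on every error path, or the pool drains permanently.
Below is a minimal, self-contained sketch of that release-on-failure pattern.
It is not CockroachDB code; `quotaPool`, `proposal`, and `replica` here are
hypothetical stand-ins for the real quota pool and `Replica` machinery.
```
package main

import (
	"errors"
	"fmt"
)

// quotaPool is a toy stand-in for the real proposal quota pool.
type quotaPool struct{ available int64 }

// acquire takes n bytes of quota. The real pool blocks until enough quota
// is available; this sketch just fails fast to stay short.
func (q *quotaPool) acquire(n int64) bool {
	if n > q.available {
		return false
	}
	q.available -= n
	return true
}

func (q *quotaPool) release(n int64) { q.available += n }

type proposal struct{ quotaSize int64 }

type replica struct{ quota *quotaPool }

// propose mirrors the fixed control flow: a deferred check on the named
// error return guarantees that the proposal's quota is released on any
// failure, no matter which return path produced the error.
func (r *replica) propose(p *proposal, fail bool) (err error) {
	defer func() {
		if err != nil {
			r.quota.release(p.quotaSize) // the release the rewrite had lost
		}
	}()
	if fail {
		// Stands in for any pre-Raft rejection, e.g. the invalid
		// ChangeReplicasTrigger error quoted in the commit message.
		return errors.New("proposal rejected before reaching Raft")
	}
	return nil
}

func main() {
	r := &replica{quota: &quotaPool{available: 1 << 10}}
	p := &proposal{quotaSize: 512}
	if !r.quota.acquire(p.quotaSize) {
		panic("pool starts with enough quota for this sketch")
	}
	if err := r.propose(p, true /* fail */); err != nil {
		fmt.Println("propose failed:", err)
	}
	// Without the deferred release this would print 512, and a later
	// request needing the full pool (e.g. a large AddSSTRequest) would
	// stall forever.
	fmt.Println("quota available after failed proposal:", r.quota.available)
}
```
Keying the deferred cleanup on the named return value is what lets a single
defer cover every error return that follows it, which is why the actual patch
installs the defer before any of propose's early returns.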