diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 4c199f2d1d1c..284e2fc8b361 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -571,8 +571,16 @@ func (r *Replica) String() string {
 	return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
 }
 
-// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
+// cleanupFailedProposal cleans up after a proposal that has failed. It
 // clears any references to the proposal and releases associated quota.
+func (r *Replica) cleanupFailedProposal(p *ProposalData) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.cleanupFailedProposalLocked(p)
+}
+
+// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
+// the Replica mutex to be exclusively held.
 func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
 	// Clear the proposal from the proposals map. May be a no-op if the
 	// proposal has not yet been inserted into the map.
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index 91c646c65a2c..e283e6f62009 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -136,7 +136,7 @@ type propBuf struct {
 	testing struct {
 		// leaseIndexFilter can be used by tests to override the max lease index
 		// assigned to a proposal by returning a non-zero lease index.
-		leaseIndexFilter func(*ProposalData) (indexOverride uint64)
+		leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
 		// submitProposalFilter can be used by tests to observe and optionally
 		// drop Raft proposals before they are handed to etcd/raft to begin the
 		// process of replication. Dropped proposals are still eligible to be
@@ -208,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) {
 	// Assign the command's maximum lease index.
 	p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset()
 	if filter := b.testing.leaseIndexFilter; filter != nil {
-		if override := filter(p); override != 0 {
+		if override, err := filter(p); err != nil {
+			return 0, err
+		} else if override != 0 {
 			p.command.MaxLeaseIndex = override
 		}
 	}
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 25c81a445d53..cbbf8e9c183f 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -235,6 +235,15 @@ func (r *Replica) evalAndPropose(
 // removing the proposal from the pending map (on success, in
 // processRaftCommand, or on failure via cleanupFailedProposalLocked).
 func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
+	// Make sure we clean up the proposal if we fail to insert it into the
+	// proposal buffer. This ensures that we always release any quota that
+	// we acquire.
+	defer func() {
+		if pErr != nil {
+			r.cleanupFailedProposal(p)
+		}
+	}()
+
 	// Make sure the maximum lease index is unset. This field will be set in
 	// propBuf.Insert and its encoded bytes will be appended to the encoding
 	// buffer as a RaftCommandFooter.
@@ -258,7 +267,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
 	// leases can stay in such a state for a very long time when using epoch-
 	// based range leases). This shouldn't happen often, but has been seen
 	// before (#12591).
-	if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
+	replID := p.command.ProposerReplica.ReplicaID
+	if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
 		msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
 		log.Error(p.ctx, msg)
 		return 0, roachpb.NewErrorf("%s: %s", r, msg)
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 1db9346563e0..99d9c9b9a230 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -6532,6 +6532,49 @@ func TestReplicaDestroy(t *testing.T) {
 	}
 }
 
+// TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by
+// proposals is released back into the quota pool if the proposal fails before
+// being submitted to Raft.
+func TestQuotaPoolReleasedOnFailedProposal(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	tc.Start(t, stopper)
+
+	// Flush a write all the way through the Raft proposal pipeline to ensure
+	// that the replica becomes the Raft leader and sets up its quota pool.
+	iArgs := incrementArgs([]byte("a"), 1)
+	if _, pErr := tc.SendWrapped(&iArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	type magicKey struct{}
+	var minQuotaSize int64
+	propErr := errors.New("proposal error")
+
+	tc.repl.mu.Lock()
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
+		if v := p.ctx.Value(magicKey{}); v != nil {
+			minQuotaSize = tc.repl.mu.proposalQuota.approximateQuota() + p.quotaSize
+			return 0, propErr
+		}
+		return 0, nil
+	}
+	tc.repl.mu.Unlock()
+
+	var ba roachpb.BatchRequest
+	pArg := putArgs(roachpb.Key("a"), make([]byte, 1<<10))
+	ba.Add(&pArg)
+	ctx := context.WithValue(context.Background(), magicKey{}, "foo")
+	if _, pErr := tc.Sender().Send(ctx, ba); !testutils.IsPError(pErr, propErr.Error()) {
+		t.Fatalf("expected error %v, found %v", propErr, pErr)
+	}
+	if curQuota := tc.repl.QuotaAvailable(); curQuota < minQuotaSize {
+		t.Fatalf("proposal quota not released: found=%d, want=%d", curQuota, minQuotaSize)
+	}
+}
+
 // TestQuotaPoolAccessOnDestroyedReplica tests the occurrence of #17303 where
 // following a leader replica getting destroyed, the scheduling of
 // handleRaftReady twice on the replica would cause a panic when
@@ -7280,13 +7323,13 @@ func TestReplicaRetryRaftProposal(t *testing.T) {
 
 	var wrongLeaseIndex uint64 // populated below
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 {
-				return wrongLeaseIndex
+				return wrongLeaseIndex, nil
 			}
 		}
-		return 0
+		return 0, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7731,12 +7774,12 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 		t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
 	}
 	assigned := false
-	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if p == proposal && !assigned {
 			assigned = true
-			return ai - 1
+			return ai - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	repl.mu.Unlock()
 
@@ -7877,16 +7920,16 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
 
 	r := tc.repl
 	r.mu.Lock()
-	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		// We're going to recognize the first time the command for the
 		// EndTransaction is proposed and we're going to hackily decrease its
 		// MaxLeaseIndex, so that the processing gets rejected further on.
 		ut := p.Local.UpdatedTxns
 		if atomic.LoadInt64(&proposalRecognized) == 0 && ut != nil && len(*ut) == 1 && (*ut)[0].ID == txn.ID {
 			atomic.StoreInt64(&proposalRecognized, 1)
-			return p.command.MaxLeaseIndex - 1
+			return p.command.MaxLeaseIndex - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	r.mu.Unlock()
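
Note: the heart of this patch is the deferred cleanup at the top of Replica.propose, which keys off the named pErr return value so that every error path, including ones added later, releases the proposal's quota. Below is a minimal, self-contained Go sketch of that pattern; the quotaPool, proposal, and replica types are invented for illustration and are not CockroachDB's actual API. In the real patch the quota is acquired earlier (in evalAndPropose) and released via cleanupFailedProposal; the sketch compresses both sides into one function to keep it runnable.

package main

import (
	"errors"
	"fmt"
)

// quotaPool is a toy stand-in for the replica's proposal quota pool.
type quotaPool struct{ available int64 }

func (q *quotaPool) acquire(n int64) { q.available -= n }
func (q *quotaPool) release(n int64) { q.available += n }

type proposal struct{ quotaSize int64 }

type replica struct{ quota quotaPool }

// propose mirrors the structure of the patched Replica.propose: because the
// error return value is named, a single deferred closure observes every exit
// path and undoes the quota acquisition on any failure.
func (r *replica) propose(p *proposal, insert func(*proposal) error) (err error) {
	r.quota.acquire(p.quotaSize)
	defer func() {
		if err != nil {
			r.quota.release(p.quotaSize) // runs on every error return
		}
	}()
	return insert(p)
}

func main() {
	r := &replica{quota: quotaPool{available: 1024}}
	p := &proposal{quotaSize: 256}

	// A failed insert must leave the pool where it started.
	_ = r.propose(p, func(*proposal) error { return errors.New("buffer full") })
	fmt.Println(r.quota.available) // 1024: the quota was released
}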