diff --git a/pkg/storage/quota_pool.go b/pkg/storage/quota_pool.go
index f9229e91febe..4b55b78fda9f 100644
--- a/pkg/storage/quota_pool.go
+++ b/pkg/storage/quota_pool.go
@@ -25,6 +25,7 @@ package storage
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -99,12 +100,16 @@ func (qp *quotaPool) addLocked(v int64) {
 	qp.quota <- v
 }
 
-func logSlowQuota(ctx context.Context, v int64, start time.Time) func() {
-	log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota",
-		timeutil.Since(start), humanizeutil.IBytes(v))
+func logSlowQuota(ctx context.Context, want, have int64, start time.Time) func() {
+	var haveStr string
+	if have > 0 {
+		haveStr = fmt.Sprintf("; acquired %s so far", humanizeutil.IBytes(have))
+	}
+	log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota%s",
+		timeutil.Since(start), humanizeutil.IBytes(want), haveStr)
 	return func() {
 		log.Infof(ctx, "acquired %s of proposal quota after %s",
-			humanizeutil.IBytes(v), timeutil.Since(start))
+			humanizeutil.IBytes(want), timeutil.Since(start))
 	}
 }
 
@@ -138,7 +143,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
 		select {
 		case <-slowTimer.C:
 			slowTimer.Read = true
-			defer logSlowQuota(ctx, v, start)()
+			defer logSlowQuota(ctx, v, 0, start)()
 			continue
 		case <-ctx.Done():
 			qp.Lock()
@@ -183,7 +188,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
 		select {
 		case <-slowTimer.C:
 			slowTimer.Read = true
-			defer logSlowQuota(ctx, v, start)()
+			defer logSlowQuota(ctx, v, acquired, start)()
 		case <-ctx.Done():
 			qp.Lock()
 			if acquired > 0 {
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 07df7aba3df2..284e2fc8b361 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -571,8 +571,16 @@ func (r *Replica) String() string {
 	return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
 }
 
-// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
+// cleanupFailedProposal cleans up after a proposal that has failed. It
 // clears any references to the proposal and releases associated quota.
+func (r *Replica) cleanupFailedProposal(p *ProposalData) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.cleanupFailedProposalLocked(p)
+}
+
+// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
+// the Replica mutex to be exclusively held.
 func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
 	// Clear the proposal from the proposals map. May be a no-op if the
 	// proposal has not yet been inserted into the map.
@@ -894,7 +902,7 @@ func (r *Replica) State() storagepb.RangeInfo {
 	defer r.mu.RUnlock()
 	ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState)
 	ri.LastIndex = r.mu.lastIndex
-	ri.NumPending = uint64(len(r.mu.proposals))
+	ri.NumPending = uint64(r.numPendingProposalsRLocked())
 	ri.RaftLogSize = r.mu.raftLogSize
 	ri.RaftLogSizeTrusted = r.mu.raftLogSizeTrusted
 	ri.NumDropped = uint64(r.mu.droppedMessages)
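The replica.go hunk above follows the mutex-wrapper convention used throughout this package: a plain method acquires Replica.mu and delegates to a *Locked variant for callers that already hold the lock. A minimal, self-contained sketch of that pattern (the registry type and its names are illustrative, not CockroachDB code):

    package main

    import (
        "fmt"
        "sync"
    )

    type registry struct {
        mu      sync.Mutex
        pending map[int64]string
    }

    // remove acquires the mutex and delegates to removeLocked.
    func (r *registry) remove(id int64) {
        r.mu.Lock()
        defer r.mu.Unlock()
        r.removeLocked(id)
    }

    // removeLocked is like remove, but requires r.mu to be held. Callers
    // that already hold the mutex call it directly to avoid self-deadlock.
    func (r *registry) removeLocked(id int64) {
        delete(r.pending, id)
    }

    func main() {
        r := &registry{pending: map[int64]string{1: "cmd"}}
        r.remove(1)
        fmt.Println(len(r.pending)) // prints 0
    }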
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index f77fbf043d0a..93a7814af2ae 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -68,12 +68,17 @@ func makePropBufCntReq(incLeaseIndex bool) propBufCntReq {
 	return r
 }
 
+// arrayLen returns the number of elements in the proposal buffer's array.
+func (r propBufCntRes) arrayLen() int {
+	return int(r & (1<<32 - 1))
+}
+
 // arrayIndex returns the index into the proposal buffer that was reserved for
 // the request. The returned index will be -1 if no index was reserved (e.g. by
 // propBufCnt.read) and if the buffer is empty.
 func (r propBufCntRes) arrayIndex() int {
 	// NB: -1 because the array is 0-indexed.
-	return int(r&(1<<32-1)) - 1
+	return r.arrayLen() - 1
 }
 
 // leaseIndexOffset returns the offset from the proposal buffer's current lease
@@ -131,12 +136,12 @@ type propBuf struct {
 	testing struct {
 		// leaseIndexFilter can be used by tests to override the max lease index
 		// assigned to a proposal by returning a non-zero lease index.
-		leaseIndexFilter func(*ProposalData) (indexOverride uint64)
+		leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
 		// submitProposalFilter can be used by tests to observe and optionally
 		// drop Raft proposals before they are handed to etcd/raft to begin the
 		// process of replication. Dropped proposals are still eligible to be
 		// reproposed due to ticks.
-		submitProposalFilter func(*ProposalData) (drop bool)
+		submitProposalFilter func(*ProposalData) (drop bool, err error)
 	}
 }
 
@@ -162,7 +167,7 @@ func (b *propBuf) Init(p proposer) {
 
 // Len returns the number of proposals currently in the buffer.
 func (b *propBuf) Len() int {
-	return b.cnt.read().arrayIndex() + 1
+	return b.cnt.read().arrayLen()
 }
 
 // LastAssignedLeaseIndexRLocked returns the last assigned lease index.
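propBufCntRes packs the buffer's element count into the low 32 bits of a 64-bit word, so arrayLen is a single mask and arrayIndex is simply arrayLen() - 1 (hence -1 for an empty buffer). A runnable sketch of that arithmetic, assuming the same layout with unrelated state in the high bits:

    package main

    import "fmt"

    type cntRes uint64

    // arrayLen extracts the low 32 bits: the number of reserved elements.
    func (r cntRes) arrayLen() int {
        return int(r & (1<<32 - 1))
    }

    // arrayIndex is the last reserved index, or -1 when nothing was reserved.
    func (r cntRes) arrayIndex() int {
        return r.arrayLen() - 1
    }

    func main() {
        // The high 32 bits carry other state (the lease index offset in the
        // real buffer); the mask ignores them.
        r := cntRes(7<<32 | 3)
        fmt.Println(r.arrayLen(), r.arrayIndex()) // prints 3 2
        fmt.Println(cntRes(0).arrayIndex())       // prints -1
    }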
The + // rest of the proposals will still be registered with the proposer, so + // they will eventually be reproposed. + if raftGroup == nil || firstErr != nil { continue } @@ -411,7 +429,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { // preserve the correct ordering or proposals. Later proposals // will start a new batch. if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil { - return err + firstErr = err + continue } ents = ents[len(ents):] @@ -422,7 +441,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { } encodedCtx, err := protoutil.Marshal(&confChangeCtx) if err != nil { - return err + firstErr = err + continue } if err := raftGroup.ProposeConfChange(raftpb.ConfChange{ @@ -434,7 +454,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { // ignored prior to the introduction of ErrProposalDropped). // TODO(bdarnell): Handle ErrProposalDropped better. // https://github.com/cockroachdb/cockroach/issues/21849 - return err + firstErr = err + continue } } else { // Add to the batch of entries that will soon be proposed. It is @@ -451,6 +472,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error { }) } } + if firstErr != nil { + return firstErr + } return proposeBatch(raftGroup, b.p.replicaID(), ents) } diff --git a/pkg/storage/replica_proposal_buf_test.go b/pkg/storage/replica_proposal_buf_test.go index c70e91320d62..877d18359734 100644 --- a/pkg/storage/replica_proposal_buf_test.go +++ b/pkg/storage/replica_proposal_buf_test.go @@ -225,3 +225,36 @@ func TestProposalBufferConcurrentWithDestroy(t *testing.T) { require.Nil(t, g.Wait()) t.Logf("%d successful proposals before destroy", len(mlais)) } + +// TestProposalBufferRegistersAllOnProposalError tests that all proposals in the +// proposal buffer are registered with the proposer when the buffer is flushed, +// even if an error is seen when proposing a batch of entries. +func TestProposalBufferRegistersAllOnProposalError(t *testing.T) { + defer leaktest.AfterTest(t)() + + var p testProposer + var b propBuf + b.Init(&p) + + num := propBufArrayMinSize + for i := 0; i < num; i++ { + pd, data := newPropData(false) + _, err := b.Insert(pd, data) + require.Nil(t, err) + } + require.Equal(t, num, b.Len()) + + propNum := 0 + propErr := errors.New("failed proposal") + b.testing.submitProposalFilter = func(*ProposalData) (drop bool, err error) { + propNum++ + require.Equal(t, propNum, p.registered) + if propNum == 2 { + return false, propErr + } + return false, nil + } + err := b.flushLocked() + require.Equal(t, propErr, err) + require.Equal(t, num, p.registered) +} diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index b884337ff44d..e55999d4ca7a 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose( if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil { return nil, nil, 0, roachpb.NewError(err) } + // Make sure we clean up the proposal if we fail to insert it into the + // proposal buffer successfully. This ensures that we always release any + // quota that we acquire. 
diff --git a/pkg/storage/replica_proposal_buf_test.go b/pkg/storage/replica_proposal_buf_test.go
index c70e91320d62..877d18359734 100644
--- a/pkg/storage/replica_proposal_buf_test.go
+++ b/pkg/storage/replica_proposal_buf_test.go
@@ -225,3 +225,36 @@ func TestProposalBufferConcurrentWithDestroy(t *testing.T) {
 	require.Nil(t, g.Wait())
 	t.Logf("%d successful proposals before destroy", len(mlais))
 }
+
+// TestProposalBufferRegistersAllOnProposalError tests that all proposals in the
+// proposal buffer are registered with the proposer when the buffer is flushed,
+// even if an error is seen when proposing a batch of entries.
+func TestProposalBufferRegistersAllOnProposalError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var p testProposer
+	var b propBuf
+	b.Init(&p)
+
+	num := propBufArrayMinSize
+	for i := 0; i < num; i++ {
+		pd, data := newPropData(false)
+		_, err := b.Insert(pd, data)
+		require.Nil(t, err)
+	}
+	require.Equal(t, num, b.Len())
+
+	propNum := 0
+	propErr := errors.New("failed proposal")
+	b.testing.submitProposalFilter = func(*ProposalData) (drop bool, err error) {
+		propNum++
+		require.Equal(t, propNum, p.registered)
+		if propNum == 2 {
+			return false, propErr
+		}
+		return false, nil
+	}
+	err := b.flushLocked()
+	require.Equal(t, propErr, err)
+	require.Equal(t, num, p.registered)
+}
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index b884337ff44d..e55999d4ca7a 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose(
 	if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil {
 		return nil, nil, 0, roachpb.NewError(err)
 	}
+	// Make sure we clean up the proposal if we fail to insert it into the
+	// proposal buffer. This ensures that we always release any quota that
+	// we acquire.
+	defer func() {
+		if pErr != nil {
+			r.cleanupFailedProposal(proposal)
+		}
+	}()
 
 	if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
 		filterArgs := storagebase.ProposalFilterArgs{
@@ -230,11 +238,13 @@ func (r *Replica) evalAndPropose(
 	return proposalCh, abandon, maxLeaseIndex, nil
 }
 
-// propose starts tracking a command and proposes it to raft. If
-// this method succeeds, the caller is responsible for eventually
-// removing the proposal from the pending map (on success, in
-// processRaftCommand, or on failure via cleanupFailedProposalLocked).
-func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
+// propose encodes a command, starts tracking it, and proposes it to raft. The
+// method is also responsible for assigning the command its maximum lease index.
+//
+// The method hands ownership of the command over to the Raft machinery. After
+// the method returns, all access to the command must be performed while holding
+// Replica.mu and Replica.raftMu.
+func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) {
 	// Make sure the maximum lease index is unset. This field will be set in
 	// propBuf.Insert and its encoded bytes will be appended to the encoding
 	// buffer as a RaftCommandFooter.
@@ -258,7 +268,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
 		// leases can stay in such a state for a very long time when using epoch-
 		// based range leases). This shouldn't happen often, but has been seen
 		// before (#12591).
-		if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
+		replID := p.command.ProposerReplica.ReplicaID
+		if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
 			msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
 			log.Error(p.ctx, msg)
 			return 0, roachpb.NewErrorf("%s: %s", r, msg)
@@ -328,8 +339,12 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
 	return int64(maxLeaseIndex), nil
 }
 
+func (r *Replica) numPendingProposalsRLocked() int {
+	return len(r.mu.proposals) + r.mu.proposalBuf.Len()
+}
+
 func (r *Replica) hasPendingProposalsRLocked() bool {
-	return len(r.mu.proposals) > 0 || r.mu.proposalBuf.Len() > 0
+	return r.numPendingProposalsRLocked() > 0
 }
 
 // stepRaftGroup calls Step on the replica's RawNode with the provided request's
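The evalAndPropose hunk ties cleanup to the function's named return value: a deferred closure inspects pErr after the return value has been assigned and releases the acquired quota on any error path. A small sketch of the idiom, with a toy counter standing in for the quota pool:

    package main

    import (
        "errors"
        "fmt"
    )

    var available int64 = 100

    func acquire(n int64) { available -= n }
    func release(n int64) { available += n }

    func propose(n int64, fail bool) (err error) {
        acquire(n)
        // The deferred closure runs after err is assigned, so every error
        // return releases the quota while the success path keeps it.
        defer func() {
            if err != nil {
                release(n)
            }
        }()
        if fail {
            return errors.New("proposal failed")
        }
        return nil
    }

    func main() {
        _ = propose(10, true)
        fmt.Println(available) // prints 100: quota came back on failure
    }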
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 7a0e463f8552..99d9c9b9a230 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -6532,6 +6532,49 @@ func TestReplicaDestroy(t *testing.T) {
 	}
 }
 
+// TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by
+// proposals is released back into the quota pool if the proposal fails before
+// being submitted to Raft.
+func TestQuotaPoolReleasedOnFailedProposal(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	tc.Start(t, stopper)
+
+	// Flush a write all the way through the Raft proposal pipeline to ensure
+	// that the replica becomes the Raft leader and sets up its quota pool.
+	iArgs := incrementArgs([]byte("a"), 1)
+	if _, pErr := tc.SendWrapped(&iArgs); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	type magicKey struct{}
+	var minQuotaSize int64
+	propErr := errors.New("proposal error")
+
+	tc.repl.mu.Lock()
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
+		if v := p.ctx.Value(magicKey{}); v != nil {
+			minQuotaSize = tc.repl.mu.proposalQuota.approximateQuota() + p.quotaSize
+			return 0, propErr
+		}
+		return 0, nil
+	}
+	tc.repl.mu.Unlock()
+
+	var ba roachpb.BatchRequest
+	pArg := putArgs(roachpb.Key("a"), make([]byte, 1<<10))
+	ba.Add(&pArg)
+	ctx := context.WithValue(context.Background(), magicKey{}, "foo")
+	if _, pErr := tc.Sender().Send(ctx, ba); !testutils.IsPError(pErr, propErr.Error()) {
+		t.Fatalf("expected error %v, found %v", propErr, pErr)
+	}
+	if curQuota := tc.repl.QuotaAvailable(); curQuota < minQuotaSize {
+		t.Fatalf("proposal quota not released: found=%d, want=%d", curQuota, minQuotaSize)
+	}
+}
+
 // TestQuotaPoolAccessOnDestroyedReplica tests the occurrence of #17303 where
 // following a leader replica getting destroyed, the scheduling of
 // handleRaftReady twice on the replica would cause a panic when
@@ -6948,12 +6991,12 @@ func TestReplicaAbandonProposal(t *testing.T) {
 	// Cancel the request before it is proposed to Raft.
 	dropProp := int32(1)
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			cancel()
-			return atomic.LoadInt32(&dropProp) == 1
+			return atomic.LoadInt32(&dropProp) == 1, nil
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7172,12 +7215,12 @@ func TestReplicaIDChangePending(t *testing.T) {
 	// Stop the command from being proposed to the raft group and being removed.
 	proposedOnOld := make(chan struct{}, 1)
 	repl.mu.Lock()
-	repl.mu.proposalBuf.testing.submitProposalFilter = func(*ProposalData) (drop bool) {
+	repl.mu.proposalBuf.testing.submitProposalFilter = func(*ProposalData) (drop bool, _ error) {
 		select {
 		case proposedOnOld <- struct{}{}:
 		default:
 		}
-		return true
+		return true, nil
 	}
 	lease := *repl.mu.state.Lease
 	repl.mu.Unlock()
@@ -7202,14 +7245,14 @@ func TestReplicaIDChangePending(t *testing.T) {
 	// re-proposed.
 	proposedOnNew := make(chan struct{}, 1)
 	repl.mu.Lock()
-	repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if p.Request.Timestamp == magicTS {
 			select {
 			case proposedOnNew <- struct{}{}:
 			default:
 			}
 		}
-		return false
+		return false, nil
 	}
 	repl.mu.Unlock()
 
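All of these test hooks migrate from a single return value to a (value, error) pair, which is what lets tests like TestQuotaPoolReleasedOnFailedProposal inject a proposal-time error rather than merely dropping or overriding a proposal. A sketch of the hook shape, under illustrative names:

    package main

    import (
        "errors"
        "fmt"
    )

    type proposal struct{ id int }

    // submitFilter mirrors the new two-value shape: it may drop the
    // proposal, or fail it outright with an error.
    type submitFilter func(*proposal) (drop bool, err error)

    func submit(p *proposal, filter submitFilter) error {
        if filter != nil {
            if drop, err := filter(p); err != nil {
                return err // fail the proposal
            } else if drop {
                return nil // silently drop; it may be reproposed later
            }
        }
        fmt.Println("proposed", p.id)
        return nil
    }

    func main() {
        errInjected := errors.New("injected")
        filter := func(p *proposal) (bool, error) {
            if p.id == 2 {
                return false, errInjected
            }
            return false, nil
        }
        fmt.Println(submit(&proposal{id: 1}, filter)) // proposed 1, then <nil>
        fmt.Println(submit(&proposal{id: 2}, filter)) // injected
    }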
@@ -7280,13 +7323,13 @@ func TestReplicaRetryRaftProposal(t *testing.T) {
 
 	var wrongLeaseIndex uint64 // populated below
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 {
-				return wrongLeaseIndex
+				return wrongLeaseIndex, nil
 			}
 		}
-		return 0
+		return 0, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7379,12 +7422,12 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
 	tc.repl.mu.Lock()
 	lease := *repl.mu.state.Lease
 	abandoned := make(map[int64]struct{}) // protected by repl.mu
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if _, ok := abandoned[int64(p.command.MaxLeaseIndex)]; ok {
 			log.Infof(p.ctx, "abandoning command")
-			return true
+			return true, nil
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7439,14 +7482,14 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 	dropAll := int32(1)
 	tc.repl.mu.Lock()
 	lease := *tc.repl.mu.state.Lease
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if atomic.LoadInt32(&dropAll) == 1 {
-			return true
+			return true, nil
 		}
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			seenCmds = append(seenCmds, int(p.command.MaxLeaseIndex))
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()
 
@@ -7571,11 +7614,11 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 	dropProposals.m = make(map[*ProposalData]struct{})
 
 	r.mu.Lock()
-	r.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	r.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		dropProposals.Lock()
 		defer dropProposals.Unlock()
 		_, ok := dropProposals.m[p]
-		return ok
+		return ok, nil
 	}
 	r.mu.Unlock()
 
@@ -7731,12 +7774,12 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 		t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
 	}
 	assigned := false
-	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		if p == proposal && !assigned {
 			assigned = true
-			return ai - 1
+			return ai - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	repl.mu.Unlock()
 
@@ -7877,16 +7920,16 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
 
 	r := tc.repl
 	r.mu.Lock()
-	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
+	r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
 		// We're going to recognize the first time the command for the
 		// EndTransaction is proposed and we're going to hackily decrease its
 		// MaxLeaseIndex, so that the processing gets rejected further on.
 		ut := p.Local.UpdatedTxns
 		if atomic.LoadInt64(&proposalRecognized) == 0 && ut != nil && len(*ut) == 1 && (*ut)[0].ID == txn.ID {
 			atomic.StoreInt64(&proposalRecognized, 1)
-			return p.command.MaxLeaseIndex - 1
+			return p.command.MaxLeaseIndex - 1, nil
 		}
-		return 0
+		return 0, nil
 	}
 	r.mu.Unlock()
 
@@ -8366,16 +8409,16 @@ func TestCancelPendingCommands(t *testing.T) {
 	proposalDroppedCh := make(chan struct{})
 	proposalDropped := false
 	tc.repl.mu.Lock()
-	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool) {
+	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if _, ok := p.Request.GetArg(roachpb.Increment); ok {
 			if !proposalDropped {
 				// Notify the main thread the first time we drop a proposal.
 				close(proposalDroppedCh)
 				proposalDropped = true
 			}
-			return true
+			return true, nil
 		}
-		return false
+		return false, nil
 	}
 	tc.repl.mu.Unlock()