Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: release quota on failed Raft proposals #38632

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions pkg/storage/quota_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package storage

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -99,12 +100,16 @@ func (qp *quotaPool) addLocked(v int64) {
qp.quota <- v
}

func logSlowQuota(ctx context.Context, v int64, start time.Time) func() {
log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota",
timeutil.Since(start), humanizeutil.IBytes(v))
func logSlowQuota(ctx context.Context, want, have int64, start time.Time) func() {
var haveStr string
if have > 0 {
haveStr = fmt.Sprintf("; acquired %s so far", humanizeutil.IBytes(have))
}
log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota%s",
timeutil.Since(start), humanizeutil.IBytes(want), haveStr)
return func() {
log.Infof(ctx, "acquired %s of proposal quota after %s",
humanizeutil.IBytes(v), timeutil.Since(start))
humanizeutil.IBytes(want), timeutil.Since(start))
}
}

Expand Down Expand Up @@ -138,7 +143,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
select {
case <-slowTimer.C:
slowTimer.Read = true
defer logSlowQuota(ctx, v, start)()
defer logSlowQuota(ctx, v, 0, start)()
continue
case <-ctx.Done():
qp.Lock()
Expand Down Expand Up @@ -183,7 +188,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
select {
case <-slowTimer.C:
slowTimer.Read = true
defer logSlowQuota(ctx, v, start)()
defer logSlowQuota(ctx, v, acquired, start)()
case <-ctx.Done():
qp.Lock()
if acquired > 0 {
Expand Down
12 changes: 10 additions & 2 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,16 @@ func (r *Replica) String() string {
return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
}

// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
// cleanupFailedProposal cleans up after a proposal that has failed. It
// clears any references to the proposal and releases associated quota.
func (r *Replica) cleanupFailedProposal(p *ProposalData) {
r.mu.Lock()
defer r.mu.Unlock()
r.cleanupFailedProposalLocked(p)
}

// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
// the Replica mutex to be exclusively held.
func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// Clear the proposal from the proposals map. May be a no-op if the
// proposal has not yet been inserted into the map.
Expand Down Expand Up @@ -894,7 +902,7 @@ func (r *Replica) State() storagepb.RangeInfo {
defer r.mu.RUnlock()
ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState)
ri.LastIndex = r.mu.lastIndex
ri.NumPending = uint64(len(r.mu.proposals))
ri.NumPending = uint64(r.numPendingProposalsRLocked())
ri.RaftLogSize = r.mu.raftLogSize
ri.RaftLogSizeTrusted = r.mu.raftLogSizeTrusted
ri.NumDropped = uint64(r.mu.droppedMessages)
Expand Down
48 changes: 36 additions & 12 deletions pkg/storage/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,17 @@ func makePropBufCntReq(incLeaseIndex bool) propBufCntReq {
return r
}

// arrayLen returns the number of elements in the proposal buffer's array.
func (r propBufCntRes) arrayLen() int {
return int(r & (1<<32 - 1))
}

// arrayIndex returns the index into the proposal buffer that was reserved for
// the request. The returned index will be -1 if no index was reserved (e.g. by
// propBufCnt.read) and if the buffer is empty.
func (r propBufCntRes) arrayIndex() int {
// NB: -1 because the array is 0-indexed.
return int(r&(1<<32-1)) - 1
return r.arrayLen() - 1
}

// leaseIndexOffset returns the offset from the proposal buffer's current lease
Expand Down Expand Up @@ -131,12 +136,12 @@ type propBuf struct {
testing struct {
// leaseIndexFilter can be used by tests to override the max lease index
// assigned to a proposal by returning a non-zero lease index.
leaseIndexFilter func(*ProposalData) (indexOverride uint64)
leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
// submitProposalFilter can be used by tests to observe and optionally
// drop Raft proposals before they are handed to etcd/raft to begin the
// process of replication. Dropped proposals are still eligible to be
// reproposed due to ticks.
submitProposalFilter func(*ProposalData) (drop bool)
submitProposalFilter func(*ProposalData) (drop bool, err error)
}
}

Expand All @@ -162,7 +167,7 @@ func (b *propBuf) Init(p proposer) {

// Len returns the number of proposals currently in the buffer.
func (b *propBuf) Len() int {
return b.cnt.read().arrayIndex() + 1
return b.cnt.read().arrayLen()
}

// LastAssignedLeaseIndexRLocked returns the last assigned lease index.
Expand Down Expand Up @@ -203,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) {
// Assign the command's maximum lease index.
p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset()
if filter := b.testing.leaseIndexFilter; filter != nil {
if override := filter(p); override != 0 {
if override, err := filter(p); err != nil {
return 0, err
} else if override != 0 {
p.command.MaxLeaseIndex = override
}
}
Expand Down Expand Up @@ -361,7 +368,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// buffer. This ensures that we synchronize with all producers and other
// consumers.
res := b.cnt.clear()
used := res.arrayIndex() + 1
used := res.arrayLen()
// Before returning, consider resizing the proposal buffer's array,
// depending on how much of it was used before the current flush.
defer b.arr.adjustSize(used)
Expand All @@ -386,6 +393,11 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// and apply them.
buf := b.arr.asSlice()[:used]
ents := make([]raftpb.Entry, 0, used)
// Remember the first error that we see when proposing the batch. We don't
// immediately return this error because we want to finish clearing out the
// buffer and registering each of the proposals with the proposer, but we
// stop trying to propose commands to raftGroup.
var firstErr error
for i, p := range buf {
buf[i] = nil // clear buffer

Expand All @@ -395,13 +407,19 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// Potentially drop the proposal before passing it to etcd/raft, but
// only after performing necessary bookkeeping.
if filter := b.testing.submitProposalFilter; filter != nil {
if drop := filter(p); drop {
if drop, err := filter(p); drop || err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
}

// If we don't have a raft group, we can't propose the command.
if raftGroup == nil {
// If we don't have a raft group or if the raft group has rejected one
// of the proposals, we don't try to propose any more proposals. The
// rest of the proposals will still be registered with the proposer, so
// they will eventually be reproposed.
if raftGroup == nil || firstErr != nil {
continue
}

Expand All @@ -411,7 +429,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// preserve the correct ordering or proposals. Later proposals
// will start a new batch.
if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil {
return err
firstErr = err
continue
}
ents = ents[len(ents):]

Expand All @@ -422,7 +441,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
}
encodedCtx, err := protoutil.Marshal(&confChangeCtx)
if err != nil {
return err
firstErr = err
continue
}

if err := raftGroup.ProposeConfChange(raftpb.ConfChange{
Expand All @@ -434,7 +454,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// ignored prior to the introduction of ErrProposalDropped).
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
return err
firstErr = err
continue
}
} else {
// Add to the batch of entries that will soon be proposed. It is
Expand All @@ -451,6 +472,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
})
}
}
if firstErr != nil {
return firstErr
}
return proposeBatch(raftGroup, b.p.replicaID(), ents)
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/storage/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,36 @@ func TestProposalBufferConcurrentWithDestroy(t *testing.T) {
require.Nil(t, g.Wait())
t.Logf("%d successful proposals before destroy", len(mlais))
}

// TestProposalBufferRegistersAllOnProposalError tests that all proposals in the
// proposal buffer are registered with the proposer when the buffer is flushed,
// even if an error is seen when proposing a batch of entries.
func TestProposalBufferRegistersAllOnProposalError(t *testing.T) {
defer leaktest.AfterTest(t)()

var p testProposer
var b propBuf
b.Init(&p)

num := propBufArrayMinSize
for i := 0; i < num; i++ {
pd, data := newPropData(false)
_, err := b.Insert(pd, data)
require.Nil(t, err)
}
require.Equal(t, num, b.Len())

propNum := 0
propErr := errors.New("failed proposal")
b.testing.submitProposalFilter = func(*ProposalData) (drop bool, err error) {
propNum++
require.Equal(t, propNum, p.registered)
if propNum == 2 {
return false, propErr
}
return false, nil
}
err := b.flushLocked()
require.Equal(t, propErr, err)
require.Equal(t, num, p.registered)
}
29 changes: 22 additions & 7 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose(
if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
// Make sure we clean up the proposal if we fail to insert it into the
// proposal buffer successfully. This ensures that we always release any
// quota that we acquire.
defer func() {
if pErr != nil {
r.cleanupFailedProposal(proposal)
}
}()

if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
filterArgs := storagebase.ProposalFilterArgs{
Expand Down Expand Up @@ -230,11 +238,13 @@ func (r *Replica) evalAndPropose(
return proposalCh, abandon, maxLeaseIndex, nil
}

// propose starts tracking a command and proposes it to raft. If
// this method succeeds, the caller is responsible for eventually
// removing the proposal from the pending map (on success, in
// processRaftCommand, or on failure via cleanupFailedProposalLocked).
func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
// propose encodes a command, starts tracking it, and proposes it to raft. The
// method is also responsible for assigning the command its maximum lease index.
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) {
// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a RaftCommandFooter.
Expand All @@ -258,7 +268,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
// leases can stay in such a state for a very long time when using epoch-
// based range leases). This shouldn't happen often, but has been seen
// before (#12591).
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
replID := p.command.ProposerReplica.ReplicaID
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
log.Error(p.ctx, msg)
return 0, roachpb.NewErrorf("%s: %s", r, msg)
Expand Down Expand Up @@ -328,8 +339,12 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
return int64(maxLeaseIndex), nil
}

func (r *Replica) numPendingProposalsRLocked() int {
return len(r.mu.proposals) + r.mu.proposalBuf.Len()
}

func (r *Replica) hasPendingProposalsRLocked() bool {
return len(r.mu.proposals) > 0 || r.mu.proposalBuf.Len() > 0
return r.numPendingProposalsRLocked() > 0
}

// stepRaftGroup calls Step on the replica's RawNode with the provided request's
Expand Down
Loading