Merge #38632
38632: storage: release quota on failed Raft proposals r=tbg a=nvanbenschoten

Fixes #34180.
Fixes #35493.
Fixes #36983.
Fixes #37108.
Fixes #37371.
Fixes #37384.
Fixes #37551.
Fixes #37879.
Fixes #38095.
Fixes #38131.
Fixes #38136.
Fixes #38549.
Fixes #38552.
Fixes #38555.
Fixes #38560.
Fixes #38562.
Fixes #38563.
Fixes #38569.
Fixes #38578.
Fixes #38600.

_Many of the older issues fixed by this had prior failures, but nothing very recent or actionable. I think it's worth closing them now that the underlying leak should be fixed._

This fixes a bug introduced in 1ff3556 where Raft proposal quota is no longer released when `Replica.propose` fails. This used to happen [here](1ff3556#diff-4315c7ebf8b8bf7bda469e1e7be82690L316), but that code was accidentally lost in the rewrite.
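As a standalone illustration of the pattern the fix restores, here is a minimal Go sketch (the `quotaPool`, `proposal`, and `evalAndPropose` names below are simplified stand-ins, not the real CockroachDB types): quota acquired before proposing is handed back by a deferred cleanup whenever a later step returns an error.

```go
package main

import (
	"errors"
	"fmt"
)

// quotaPool is a toy stand-in for the per-range proposal quota pool.
type quotaPool struct{ available int64 }

func (qp *quotaPool) acquire(v int64) { qp.available -= v }
func (qp *quotaPool) release(v int64) { qp.available += v }

// proposal is a simplified stand-in for ProposalData.
type proposal struct{ quotaSize int64 }

// propose stands in for Replica.propose; like the real thing, it can fail
// after quota has already been acquired.
func propose(p *proposal) error {
	return errors.New("received invalid ChangeReplicasTrigger to remove self (leaseholder)")
}

// evalAndPropose mirrors the shape of the fix: quota is acquired up front,
// and a deferred cleanup releases it again if any later step fails.
func evalAndPropose(qp *quotaPool, p *proposal) (err error) {
	qp.acquire(p.quotaSize)
	defer func() {
		if err != nil {
			qp.release(p.quotaSize) // the release that was missing on the error path
		}
	}()
	return propose(p)
}

func main() {
	qp := &quotaPool{available: 1000}
	_ = evalAndPropose(qp, &proposal{quotaSize: 1000})
	fmt.Println("available quota after a failed proposal:", qp.available) // prints 1000, not 0
}
```

The actual change wires this up via the new `cleanupFailedProposal` helper deferred in `evalAndPropose` (see the `replica_raft.go` hunk below).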

I tracked this down by running a series of `import/tpch/nodes=4` and `scrub/all-checks/tpcc/w=100` roachtests. About half the time, the import would stall after a few hours and the roachtest health reports would start logging lines like `n1/s1  2.00  metrics  requests.slow.latch`. I traced the stalled latch acquisition to a stalled proposal quota acquisition by a conflicting command. The range debug page showed the following:

![Screenshot_2019-07-01 r56 Range Debug Cockroach Console](https://user-images.githubusercontent.com/5438456/60554197-8519c780-9d04-11e9-8cf5-6c46ffbcf820.png)

We see that the leaseholder of the range has no pending commands but also no available proposal quota. This indicates a proposal quota leak, which led me to the missing quota release on this error path.

The (now confirmed) theory for what went wrong in these roachtests is that they perform imports, which generate a large number of AddSSTRequests. These requests are typically larger than the available proposal quota for a range, meaning that they request all of its available quota. The effect of this is that if even a single byte of quota is leaked, the entire range will seize up and stall the next time an AddSSTRequest is issued. Instrumentation revealed that a ChangeReplicas request with a quota size equal to the leaked amount was failing due to the error:
```
received invalid ChangeReplicasTrigger REMOVE_REPLICA((n3,s3):3): updated=[(n1,s1):1 (n4,s4):2 (n2,s2):4] next=5 to remove self (leaseholder)
```
Because of the missing error handling, this quota was not being released back into the pool, causing future requests to get stuck indefinitely waiting for leaked quota, stalling the entire import.
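To see why a leak of even a few bytes wedges the whole range, here is a toy sketch (a hypothetical blocking quota pool, not the real `quotaPool` implementation): once a little quota has leaked, a request that needs the entire pool can never be satisfied.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool is a toy blocking quota pool: acquire waits until the requested
// number of bytes is available, then takes all of them at once.
type pool struct {
	mu        sync.Mutex
	cond      *sync.Cond
	available int64
}

func newPool(size int64) *pool {
	p := &pool{available: size}
	p.cond = sync.NewCond(&p.mu)
	return p
}

func (p *pool) acquire(v int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.available < v {
		p.cond.Wait()
	}
	p.available -= v
}

func (p *pool) release(v int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.available += v
	p.cond.Broadcast()
}

func main() {
	const rangeQuota = 1 << 20 // pretend the range has 1 MiB of proposal quota
	p := newPool(rangeQuota)

	// A failed proposal acquired a little quota and, because of the bug,
	// never released it.
	p.acquire(512)

	// An import-sized proposal wants the entire pool, so a 512-byte leak
	// means it can never be satisfied.
	stuck := make(chan struct{})
	go func() {
		p.acquire(rangeQuota)
		close(stuck)
	}()

	select {
	case <-stuck:
		fmt.Println("acquired full quota")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("still waiting on proposal quota: the range is stalled")
	}
}
```

In the roachtests above, the AddSSTRequests played the role of the full-pool acquirer, so the import stalled behind the quota leaked by the failed ChangeReplicas proposal.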

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Jul 3, 2019
2 parents 04bb2d7 + ba3813c commit 40aac81
Showing 6 changed files with 182 additions and 54 deletions.
17 changes: 11 additions & 6 deletions pkg/storage/quota_pool.go
@@ -25,6 +25,7 @@ package storage

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -99,12 +100,16 @@ func (qp *quotaPool) addLocked(v int64) {
qp.quota <- v
}

func logSlowQuota(ctx context.Context, v int64, start time.Time) func() {
log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota",
timeutil.Since(start), humanizeutil.IBytes(v))
func logSlowQuota(ctx context.Context, want, have int64, start time.Time) func() {
var haveStr string
if have > 0 {
haveStr = fmt.Sprintf("; acquired %s so far", humanizeutil.IBytes(have))
}
log.Warningf(ctx, "have been waiting %s attempting to acquire %s of proposal quota%s",
timeutil.Since(start), humanizeutil.IBytes(want), haveStr)
return func() {
log.Infof(ctx, "acquired %s of proposal quota after %s",
humanizeutil.IBytes(v), timeutil.Since(start))
humanizeutil.IBytes(want), timeutil.Since(start))
}
}

@@ -138,7 +143,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
select {
case <-slowTimer.C:
slowTimer.Read = true
defer logSlowQuota(ctx, v, start)()
defer logSlowQuota(ctx, v, 0, start)()
continue
case <-ctx.Done():
qp.Lock()
@@ -183,7 +188,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
select {
case <-slowTimer.C:
slowTimer.Read = true
defer logSlowQuota(ctx, v, start)()
defer logSlowQuota(ctx, v, acquired, start)()
case <-ctx.Done():
qp.Lock()
if acquired > 0 {
12 changes: 10 additions & 2 deletions pkg/storage/replica.go
@@ -571,8 +571,16 @@ func (r *Replica) String() string {
return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
}

// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
// cleanupFailedProposal cleans up after a proposal that has failed. It
// clears any references to the proposal and releases associated quota.
func (r *Replica) cleanupFailedProposal(p *ProposalData) {
r.mu.Lock()
defer r.mu.Unlock()
r.cleanupFailedProposalLocked(p)
}

// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
// the Replica mutex to be exclusively held.
func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// Clear the proposal from the proposals map. May be a no-op if the
// proposal has not yet been inserted into the map.
@@ -894,7 +902,7 @@ func (r *Replica) State() storagepb.RangeInfo {
defer r.mu.RUnlock()
ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState)
ri.LastIndex = r.mu.lastIndex
ri.NumPending = uint64(len(r.mu.proposals))
ri.NumPending = uint64(r.numPendingProposalsRLocked())
ri.RaftLogSize = r.mu.raftLogSize
ri.RaftLogSizeTrusted = r.mu.raftLogSizeTrusted
ri.NumDropped = uint64(r.mu.droppedMessages)
48 changes: 36 additions & 12 deletions pkg/storage/replica_proposal_buf.go
@@ -68,12 +68,17 @@ func makePropBufCntReq(incLeaseIndex bool) propBufCntReq {
return r
}

// arrayLen returns the number of elements in the proposal buffer's array.
func (r propBufCntRes) arrayLen() int {
return int(r & (1<<32 - 1))
}

// arrayIndex returns the index into the proposal buffer that was reserved for
// the request. The returned index will be -1 if no index was reserved (e.g. by
// propBufCnt.read) and if the buffer is empty.
func (r propBufCntRes) arrayIndex() int {
// NB: -1 because the array is 0-indexed.
return int(r&(1<<32-1)) - 1
return r.arrayLen() - 1
}

// leaseIndexOffset returns the offset from the proposal buffer's current lease
@@ -131,12 +136,12 @@ type propBuf struct {
testing struct {
// leaseIndexFilter can be used by tests to override the max lease index
// assigned to a proposal by returning a non-zero lease index.
leaseIndexFilter func(*ProposalData) (indexOverride uint64)
leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
// submitProposalFilter can be used by tests to observe and optionally
// drop Raft proposals before they are handed to etcd/raft to begin the
// process of replication. Dropped proposals are still eligible to be
// reproposed due to ticks.
submitProposalFilter func(*ProposalData) (drop bool)
submitProposalFilter func(*ProposalData) (drop bool, err error)
}
}

@@ -162,7 +167,7 @@ func (b *propBuf) Init(p proposer) {

// Len returns the number of proposals currently in the buffer.
func (b *propBuf) Len() int {
return b.cnt.read().arrayIndex() + 1
return b.cnt.read().arrayLen()
}

// LastAssignedLeaseIndexRLocked returns the last assigned lease index.
@@ -203,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) {
// Assign the command's maximum lease index.
p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset()
if filter := b.testing.leaseIndexFilter; filter != nil {
if override := filter(p); override != 0 {
if override, err := filter(p); err != nil {
return 0, err
} else if override != 0 {
p.command.MaxLeaseIndex = override
}
}
@@ -361,7 +368,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// buffer. This ensures that we synchronize with all producers and other
// consumers.
res := b.cnt.clear()
used := res.arrayIndex() + 1
used := res.arrayLen()
// Before returning, consider resizing the proposal buffer's array,
// depending on how much of it was used before the current flush.
defer b.arr.adjustSize(used)
@@ -386,6 +393,11 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// and apply them.
buf := b.arr.asSlice()[:used]
ents := make([]raftpb.Entry, 0, used)
// Remember the first error that we see when proposing the batch. We don't
// immediately return this error because we want to finish clearing out the
// buffer and registering each of the proposals with the proposer, but we
// stop trying to propose commands to raftGroup.
var firstErr error
for i, p := range buf {
buf[i] = nil // clear buffer

@@ -395,13 +407,19 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// Potentially drop the proposal before passing it to etcd/raft, but
// only after performing necessary bookkeeping.
if filter := b.testing.submitProposalFilter; filter != nil {
if drop := filter(p); drop {
if drop, err := filter(p); drop || err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
}

// If we don't have a raft group, we can't propose the command.
if raftGroup == nil {
// If we don't have a raft group or if the raft group has rejected one
// of the proposals, we don't try to propose any more proposals. The
// rest of the proposals will still be registered with the proposer, so
// they will eventually be reproposed.
if raftGroup == nil || firstErr != nil {
continue
}

@@ -411,7 +429,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// preserve the correct ordering of proposals. Later proposals
// will start a new batch.
if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil {
return err
firstErr = err
continue
}
ents = ents[len(ents):]

@@ -422,7 +441,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
}
encodedCtx, err := protoutil.Marshal(&confChangeCtx)
if err != nil {
return err
firstErr = err
continue
}

if err := raftGroup.ProposeConfChange(raftpb.ConfChange{
@@ -434,7 +454,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// ignored prior to the introduction of ErrProposalDropped).
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
return err
firstErr = err
continue
}
} else {
// Add to the batch of entries that will soon be proposed. It is
@@ -451,6 +472,9 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
})
}
}
if firstErr != nil {
return firstErr
}
return proposeBatch(raftGroup, b.p.replicaID(), ents)
}

33 changes: 33 additions & 0 deletions pkg/storage/replica_proposal_buf_test.go
@@ -225,3 +225,36 @@ func TestProposalBufferConcurrentWithDestroy(t *testing.T) {
require.Nil(t, g.Wait())
t.Logf("%d successful proposals before destroy", len(mlais))
}

// TestProposalBufferRegistersAllOnProposalError tests that all proposals in the
// proposal buffer are registered with the proposer when the buffer is flushed,
// even if an error is seen when proposing a batch of entries.
func TestProposalBufferRegistersAllOnProposalError(t *testing.T) {
defer leaktest.AfterTest(t)()

var p testProposer
var b propBuf
b.Init(&p)

num := propBufArrayMinSize
for i := 0; i < num; i++ {
pd, data := newPropData(false)
_, err := b.Insert(pd, data)
require.Nil(t, err)
}
require.Equal(t, num, b.Len())

propNum := 0
propErr := errors.New("failed proposal")
b.testing.submitProposalFilter = func(*ProposalData) (drop bool, err error) {
propNum++
require.Equal(t, propNum, p.registered)
if propNum == 2 {
return false, propErr
}
return false, nil
}
err := b.flushLocked()
require.Equal(t, propErr, err)
require.Equal(t, num, p.registered)
}
29 changes: 22 additions & 7 deletions pkg/storage/replica_raft.go
@@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose(
if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
// Make sure we clean up the proposal if we fail to insert it into the
// proposal buffer successfully. This ensures that we always release any
// quota that we acquire.
defer func() {
if pErr != nil {
r.cleanupFailedProposal(proposal)
}
}()

if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
filterArgs := storagebase.ProposalFilterArgs{
@@ -230,11 +238,13 @@ return proposalCh, abandon, maxLeaseIndex, nil
return proposalCh, abandon, maxLeaseIndex, nil
}

// propose starts tracking a command and proposes it to raft. If
// this method succeeds, the caller is responsible for eventually
// removing the proposal from the pending map (on success, in
// processRaftCommand, or on failure via cleanupFailedProposalLocked).
func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
// propose encodes a command, starts tracking it, and proposes it to raft. The
// method is also responsible for assigning the command its maximum lease index.
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) {
// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a RaftCommandFooter.
@@ -258,7 +268,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
// leases can stay in such a state for a very long time when using epoch-
// based range leases). This shouldn't happen often, but has been seen
// before (#12591).
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
replID := p.command.ProposerReplica.ReplicaID
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
log.Error(p.ctx, msg)
return 0, roachpb.NewErrorf("%s: %s", r, msg)
@@ -328,8 +339,12 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *
return int64(maxLeaseIndex), nil
}

func (r *Replica) numPendingProposalsRLocked() int {
return len(r.mu.proposals) + r.mu.proposalBuf.Len()
}

func (r *Replica) hasPendingProposalsRLocked() bool {
return len(r.mu.proposals) > 0 || r.mu.proposalBuf.Len() > 0
return r.numPendingProposalsRLocked() > 0
}

// stepRaftGroup calls Step on the replica's RawNode with the provided request's