Skip to content

Commit

Permalink
storage: initialize the proposalQuotaBaseIndex from Applied
Browse files Browse the repository at this point in the history
This commit changes the initialization of `proposalQuotaBaseIndex` from
`lastIndex` which may include entries which are not yet committed to
`status.Applied`, the highest applied index. Given the
`proposalQuotaBaseIndex` should account for all committed proposals whose quota
has been released and proposals add their quota to the release queue after they
have been committed, it's important that the that base index not be too high
lest we leave quota in the queue.

This commit also adds an assertion that the `proposalQuotaBaseIndex` plus the
length of the queue is exactly equal to the applied index. In order to maintain
this invariant, the commit ensures that we enqueue a zero-value release to the
release queue for empty commands and commands proposed on another node.

See cockroachdb#39022 (comment)
for more details.

Fixes cockroachdb#39022.

Release note (bug fix): Properly initialize proposal quota tracking to prevent
quota leak which can hang imports or other AddSSTable operations.
  • Loading branch information
ajwerner committed Jul 30, 2019
1 parent 69f874d commit 43949c6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2168,8 +2168,8 @@ func TestQuotaPool(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 {
return errors.Errorf("expected 1 queued quota release, found: %d", qLen)
if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen < 1 {
return errors.Errorf("expected at least 1 queued quota release, found: %d", qLen)
}
return nil
})
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
)

// AddReplica adds the replica to the store's replica map and to the sorted
Expand Down Expand Up @@ -265,8 +266,13 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {
func (r *Replica) InitQuotaPool(quota int64) {
r.mu.Lock()
defer r.mu.Unlock()
var appliedIndex uint64
_ = r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) {
appliedIndex = r.BasicStatus().Applied
return false, nil
})

r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
r.mu.proposalQuotaBaseIndex = appliedIndex
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
Expand Down
45 changes: 21 additions & 24 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,45 +172,42 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
cmd.ctx = ctx
}
}
if !anyLocal {
if !anyLocal && r.mu.proposalQuota == nil {
// Fast-path.
return
}
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
if !cmd.proposedLocally() {
continue
}
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
var toRelease int64
shouldRemove := anyLocal && cmd.proposedLocally() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
continue
}
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect anything in this cmdAppBatch.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
if _, ok := r.mu.proposals[cmd.idKey]; !ok {
continue
}
delete(r.mu.proposals, cmd.idKey)
cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect anything in this cmdAppBatch.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
delete(r.mu.proposals, cmd.idKey)
toRelease = cmd.proposal.quotaSize
}
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
// quota for release if the proposalQuota is initialized
if r.mu.proposalQuota != nil {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, toRelease)
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/storage/replica_proposal_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,12 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
return
}

status := r.mu.internalRaftGroup.BasicStatus()
if r.mu.leaderID != lastLeaderID {
if r.mu.replicaID == r.mu.leaderID {
// We're becoming the leader.
r.mu.proposalQuotaBaseIndex = r.mu.lastIndex

// Initialize the proposalQuotaBaseIndex as the current committed index.
r.mu.proposalQuotaBaseIndex = status.Applied
if r.mu.proposalQuota != nil {
log.Fatal(ctx, "proposalQuota was not nil before becoming the leader")
}
Expand Down Expand Up @@ -145,7 +146,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

// Find the minimum index that active followers have acknowledged.
now := timeutil.Now()
status := r.mu.internalRaftGroup.BasicStatus()
commitIndex, minIndex := status.Commit, status.Commit
r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) {
rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id))
Expand Down Expand Up @@ -240,4 +240,15 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

r.mu.proposalQuota.add(sum)
}
// Assert the sanity of the base index and the queue. Queue entries should
// correspond to applied entries. It should not be possible for the base
// index and the not yet released applied entries to not equal the applied
// index.
releasableIndex := r.mu.proposalQuotaBaseIndex + uint64(len(r.mu.quotaReleaseQueue))
if releasableIndex != status.Applied {
log.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+
" must equal the applied index (%d)",
r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex,
status.Applied)
}
}

0 comments on commit 43949c6

Please sign in to comment.