Skip to content

Commit

Permalink
Merge #39135
Browse files Browse the repository at this point in the history
39135: storage: initialize the proposalQuotaBaseIndex from Applied r=nvanbenschoten a=ajwerner

This commit changes the initialization of `proposalQuotaBaseIndex` from
`lastIndex` which may include entries which are not yet committed to
`status.Commit`, the highest committed index. Given the
`proposalQuotaBaseIndex` should account for all committed proposals whose quota
has been released and proposals add their quota to the release queue after they
have been committed, it's important that the that base index not be too high
lest we leave quota in the queue.

This commit also adds an assertion that the `proposalQuotaBaseIndex` plus the
length of the queue does not exceed the current committed index.

See #39022 (comment)
for more details.

This change did not hit this assertion in 10 runs of an import of TPC-C whereas
without it, the assertion was hit roughly 30% of the time.

Fixes #39022.

Release note (bug fix): Properly initialize proposal quota tracking to prevent
quota leak which can hang imports or other AddSSTable operations.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jul 30, 2019
2 parents 8a08c17 + 0af5e44 commit 940d2f2
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 50 deletions.
8 changes: 5 additions & 3 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2105,7 +2105,9 @@ func TestQuotaPool(t *testing.T) {
testutils.SucceedsSoon(t, assertEqualLastIndex)

leaderRepl := mtc.getRaftLeader(rangeID)
leaderRepl.InitQuotaPool(quota)
if err := leaderRepl.InitQuotaPool(quota); err != nil {
t.Fatalf("failed to initialize quota pool: %v", err)
}
followerRepl := func() *storage.Replica {
for _, store := range mtc.stores {
repl, err := store.GetReplica(rangeID)
Expand Down Expand Up @@ -2168,8 +2170,8 @@ func TestQuotaPool(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 {
return errors.Errorf("expected 1 queued quota release, found: %d", qLen)
if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen < 1 {
return errors.Errorf("expected at least 1 queued quota release, found: %d", qLen)
}
return nil
})
Expand Down
14 changes: 12 additions & 2 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
)

// AddReplica adds the replica to the store's replica map and to the sorted
Expand Down Expand Up @@ -262,16 +263,25 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {
// a given quota. Additionally it initializes the replica's quota release queue
// and its command sizes map. Only safe to call on the replica that is both
// lease holder and raft leader.
func (r *Replica) InitQuotaPool(quota int64) {
func (r *Replica) InitQuotaPool(quota int64) error {
r.mu.Lock()
defer r.mu.Unlock()
var appliedIndex uint64
err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) {
appliedIndex = r.BasicStatus().Applied
return false, nil
})
if err != nil {
return err
}

r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
r.mu.proposalQuotaBaseIndex = appliedIndex
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.close()
}
r.mu.proposalQuota = newQuotaPool(quota)
r.mu.quotaReleaseQueue = nil
return nil
}

// QuotaAvailable returns the quota available in the replica's quota pool. Only
Expand Down
45 changes: 21 additions & 24 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,45 +172,42 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
cmd.ctx = ctx
}
}
if !anyLocal {
if !anyLocal && r.mu.proposalQuota == nil {
// Fast-path.
return
}
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
if !cmd.proposedLocally() {
continue
}
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
toRelease := int64(0)
shouldRemove := cmd.proposedLocally() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
continue
}
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect anything in this cmdAppBatch.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
if _, ok := r.mu.proposals[cmd.idKey]; !ok {
continue
}
delete(r.mu.proposals, cmd.idKey)
cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
// picks up the proposal on failure, it will re-add the proposal to the
// proposal map, but this won't affect anything in this cmdAppBatch.
//
// While here, add the proposal's quota size to the quota release queue.
// We check the proposal map again first to avoid double free-ing quota
// when reproposals from the same proposal end up in the same entry
// application batch.
delete(r.mu.proposals, cmd.idKey)
toRelease = cmd.proposal.quotaSize
}
// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
// quota for release if the proposalQuota is initialized
if r.mu.proposalQuota != nil {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, toRelease)
}
}
}
Expand Down
50 changes: 29 additions & 21 deletions pkg/storage/replica_proposal_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,15 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
return
}

status := r.mu.internalRaftGroup.BasicStatus()
if r.mu.leaderID != lastLeaderID {
if r.mu.replicaID == r.mu.leaderID {
// We're becoming the leader.
r.mu.proposalQuotaBaseIndex = r.mu.lastIndex

// Initialize the proposalQuotaBaseIndex at the applied index.
// After the proposal quota is enabled all entries applied by this replica
// will be appended to the quotaReleaseQueue. The proposalQuotaBaseIndex
// and the quotaReleaseQueue together track status.Applied exactly.
r.mu.proposalQuotaBaseIndex = status.Applied
if r.mu.proposalQuota != nil {
log.Fatal(ctx, "proposalQuota was not nil before becoming the leader")
}
Expand Down Expand Up @@ -145,8 +149,14 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

// Find the minimum index that active followers have acknowledged.
now := timeutil.Now()
status := r.mu.internalRaftGroup.BasicStatus()
commitIndex, minIndex := status.Commit, status.Commit
// commitIndex is used to determine whether a newly added replica has fully
// caught up.
commitIndex := status.Commit
// Initialize minIndex to the currently applied index. The below progress
// checks will only decrease the minIndex. Given that the quotaReleaseQueue
// cannot correspond to values beyond the applied index there's no reason
// to consider progress beyond it as meaningful.
minIndex := status.Applied
r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) {
rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id))
if !ok {
Expand Down Expand Up @@ -214,30 +224,28 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
})

if r.mu.proposalQuotaBaseIndex < minIndex {
// We've persisted minIndex - r.mu.proposalQuotaBaseIndex entries to
// the raft log on all 'active' replicas since last we checked,
// we 'should' be able to release the difference back to
// the quota pool. But consider the scenario where we have a single
// replica that we're writing to, we only construct the
// quotaReleaseQueue when entries 'come out' of Raft via
// raft.Ready.CommittedEntries. The minIndex computed above uses the
// replica's commit index which is independent of whether or we've
// iterated over the entirety of raft.Ready.CommittedEntries and
// therefore may not have all minIndex - r.mu.proposalQuotaBaseIndex
// command sizes in our quotaReleaseQueue. Hence we only process
// min(minIndex - r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue))
// quota releases.
// We've persisted at least minIndex-r.mu.proposalQuotaBaseIndex entries
// to the raft log on all 'active' replicas and applied at least minIndex
// entries locally since last we checked, so we are able to release the
// difference back to the quota pool.
numReleases := minIndex - r.mu.proposalQuotaBaseIndex
if qLen := uint64(len(r.mu.quotaReleaseQueue)); qLen < numReleases {
numReleases = qLen
}
sum := int64(0)
for _, rel := range r.mu.quotaReleaseQueue[:numReleases] {
sum += rel
}
r.mu.proposalQuotaBaseIndex += numReleases
r.mu.quotaReleaseQueue = r.mu.quotaReleaseQueue[numReleases:]

r.mu.proposalQuota.add(sum)
}
// Assert the sanity of the base index and the queue. Queue entries should
// correspond to applied entries. It should not be possible for the base
// index and the not yet released applied entries to not equal the applied
// index.
releasableIndex := r.mu.proposalQuotaBaseIndex + uint64(len(r.mu.quotaReleaseQueue))
if releasableIndex != status.Applied {
log.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+
" must equal the applied index (%d)",
r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex,
status.Applied)
}
}

0 comments on commit 940d2f2

Please sign in to comment.