From 0af5e4485411a3e70b6118897e55fc61f269db42 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Fri, 26 Jul 2019 20:54:59 -0400
Subject: [PATCH] storage: initialize the proposalQuotaBaseIndex from Applied

This commit changes the initialization of `proposalQuotaBaseIndex` from
`lastIndex`, which may include entries that are not yet committed, to
`status.Applied`, the highest applied index. Given that the
`proposalQuotaBaseIndex` should account for all committed proposals whose
quota has been released, and that proposals add their quota to the release
queue after they have been committed, it's important that the base index
not be too high lest we leave quota stranded in the queue.

This commit also adds an assertion that the `proposalQuotaBaseIndex` plus
the length of the queue is exactly equal to the applied index. In order to
maintain this invariant, the commit ensures that we enqueue a zero-value
release to the release queue for empty commands and for commands proposed
on another node.

See https://github.com/cockroachdb/cockroach/issues/39022#issuecomment-515638973
for more details.

Fixes #39022.

Release note (bug fix): Properly initialize proposal quota tracking to
prevent a quota leak that can hang imports or other AddSSTable operations.
---
 pkg/storage/client_raft_test.go       |  8 +++--
 pkg/storage/helpers_test.go           | 14 ++++++--
 pkg/storage/replica_application.go    | 45 +++++++++++-------------
 pkg/storage/replica_proposal_quota.go | 50 ++++++++++++++++-----------
 4 files changed, 67 insertions(+), 50 deletions(-)

diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index b6aef43fd271..c52cae2e8db6 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -2105,7 +2105,9 @@ func TestQuotaPool(t *testing.T) {
 	testutils.SucceedsSoon(t, assertEqualLastIndex)

 	leaderRepl := mtc.getRaftLeader(rangeID)
-	leaderRepl.InitQuotaPool(quota)
+	if err := leaderRepl.InitQuotaPool(quota); err != nil {
+		t.Fatalf("failed to initialize quota pool: %v", err)
+	}
 	followerRepl := func() *storage.Replica {
 		for _, store := range mtc.stores {
 			repl, err := store.GetReplica(rangeID)
@@ -2168,8 +2170,8 @@ func TestQuotaPool(t *testing.T) {
 	}

 	testutils.SucceedsSoon(t, func() error {
-		if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 {
-			return errors.Errorf("expected 1 queued quota release, found: %d", qLen)
+		if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen < 1 {
+			return errors.Errorf("expected at least 1 queued quota release, found: %d", qLen)
 		}
 		return nil
 	})
diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go
index bddaac9c63c2..c91831dbf348 100644
--- a/pkg/storage/helpers_test.go
+++ b/pkg/storage/helpers_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft"
 )

 // AddReplica adds the replica to the store's replica map and to the sorted
@@ -262,16 +263,25 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {
 // a given quota. Additionally it initializes the replica's quota release queue
 // and its command sizes map. Only safe to call on the replica that is both
 // lease holder and raft leader.
-func (r *Replica) InitQuotaPool(quota int64) {
+func (r *Replica) InitQuotaPool(quota int64) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
+	var appliedIndex uint64
+	err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) {
+		appliedIndex = r.BasicStatus().Applied
+		return false, nil
+	})
+	if err != nil {
+		return err
+	}

-	r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
+	r.mu.proposalQuotaBaseIndex = appliedIndex
 	if r.mu.proposalQuota != nil {
 		r.mu.proposalQuota.close()
 	}
 	r.mu.proposalQuota = newQuotaPool(quota)
 	r.mu.quotaReleaseQueue = nil
+	return nil
 }

 // QuotaAvailable returns the quota available in the replica's quota pool. Only
diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go
index 753a88ffdebd..79494a22cebe 100644
--- a/pkg/storage/replica_application.go
+++ b/pkg/storage/replica_application.go
@@ -172,16 +172,14 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
 			cmd.ctx = ctx
 		}
 	}
-	if !anyLocal {
+	if !anyLocal && r.mu.proposalQuota == nil {
 		// Fast-path.
 		return
 	}
 	for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
 		cmd := it.cur()
-		if !cmd.proposedLocally() {
-			continue
-		}
-		if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
+		toRelease := int64(0)
+		shouldRemove := cmd.proposedLocally() &&
 			// If this entry does not have the most up-to-date view of the
 			// corresponding proposal's maximum lease index then the proposal
 			// must have been reproposed with a higher lease index. (see
@@ -189,28 +187,27 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
 			// version of the proposal in the pipeline, so don't remove the
 			// proposal from the map. We expect this entry to be rejected by
 			// checkForcedErr.
-			continue
-		}
-		// Delete the proposal from the proposals map. There may be reproposals
-		// of the proposal in the pipeline, but those will all have the same max
-		// lease index, meaning that they will all be rejected after this entry
-		// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
-		// picks up the proposal on failure, it will re-add the proposal to the
-		// proposal map, but this won't affect anything in this cmdAppBatch.
-		//
-		// While here, add the proposal's quota size to the quota release queue.
-		// We check the proposal map again first to avoid double free-ing quota
-		// when reproposals from the same proposal end up in the same entry
-		// application batch.
-		if _, ok := r.mu.proposals[cmd.idKey]; !ok {
-			continue
-		}
-		delete(r.mu.proposals, cmd.idKey)
+			cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
+		if shouldRemove {
+			// Delete the proposal from the proposals map. There may be reproposals
+			// of the proposal in the pipeline, but those will all have the same max
+			// lease index, meaning that they will all be rejected after this entry
+			// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
+			// picks up the proposal on failure, it will re-add the proposal to the
+			// proposal map, but this won't affect anything in this cmdAppBatch.
+			//
+			// While here, add the proposal's quota size to the quota release queue.
+			// We check the proposal map again first to avoid double free-ing quota
+			// when reproposals from the same proposal end up in the same entry
+			// application batch.
+			delete(r.mu.proposals, cmd.idKey)
+			toRelease = cmd.proposal.quotaSize
+		}
 		// At this point we're not guaranteed to have proposalQuota initialized,
 		// the same is true for quotaReleaseQueues. Only queue the proposal's
-		// quota for release if the proposalQuota is initialized.
+		// quota for release if the proposalQuota is initialized
 		if r.mu.proposalQuota != nil {
-			r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
+			r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, toRelease)
 		}
 	}
 }
diff --git a/pkg/storage/replica_proposal_quota.go b/pkg/storage/replica_proposal_quota.go
index 0f06b01226fd..3286bd0fb443 100644
--- a/pkg/storage/replica_proposal_quota.go
+++ b/pkg/storage/replica_proposal_quota.go
@@ -102,11 +102,15 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 		return
 	}

+	status := r.mu.internalRaftGroup.BasicStatus()
 	if r.mu.leaderID != lastLeaderID {
 		if r.mu.replicaID == r.mu.leaderID {
 			// We're becoming the leader.
-			r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
-
+			// Initialize the proposalQuotaBaseIndex at the applied index.
+			// After the proposal quota is enabled all entries applied by this replica
+			// will be appended to the quotaReleaseQueue. The proposalQuotaBaseIndex
+			// and the quotaReleaseQueue together track status.Applied exactly.
+			r.mu.proposalQuotaBaseIndex = status.Applied
 			if r.mu.proposalQuota != nil {
 				log.Fatal(ctx, "proposalQuota was not nil before becoming the leader")
 			}
@@ -145,8 +149,14 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

 	// Find the minimum index that active followers have acknowledged.
 	now := timeutil.Now()
-	status := r.mu.internalRaftGroup.BasicStatus()
-	commitIndex, minIndex := status.Commit, status.Commit
+	// commitIndex is used to determine whether a newly added replica has fully
+	// caught up.
+	commitIndex := status.Commit
+	// Initialize minIndex to the currently applied index. The below progress
+	// checks will only decrease the minIndex. Given that the quotaReleaseQueue
+	// cannot correspond to values beyond the applied index there's no reason
+	// to consider progress beyond it as meaningful.
+	minIndex := status.Applied
 	r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) {
 		rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id))
 		if !ok {
@@ -214,30 +224,28 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 	})

 	if r.mu.proposalQuotaBaseIndex < minIndex {
-		// We've persisted minIndex - r.mu.proposalQuotaBaseIndex entries to
-		// the raft log on all 'active' replicas since last we checked,
-		// we 'should' be able to release the difference back to
-		// the quota pool. But consider the scenario where we have a single
-		// replica that we're writing to, we only construct the
-		// quotaReleaseQueue when entries 'come out' of Raft via
-		// raft.Ready.CommittedEntries. The minIndex computed above uses the
-		// replica's commit index which is independent of whether or we've
-		// iterated over the entirety of raft.Ready.CommittedEntries and
-		// therefore may not have all minIndex - r.mu.proposalQuotaBaseIndex
-		// command sizes in our quotaReleaseQueue. Hence we only process
-		// min(minIndex - r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue))
-		// quota releases.
+		// We've persisted at least minIndex-r.mu.proposalQuotaBaseIndex entries
+		// to the raft log on all 'active' replicas and applied at least minIndex
+		// entries locally since last we checked, so we are able to release the
+		// difference back to the quota pool.
 		numReleases := minIndex - r.mu.proposalQuotaBaseIndex
-		if qLen := uint64(len(r.mu.quotaReleaseQueue)); qLen < numReleases {
-			numReleases = qLen
-		}
 		sum := int64(0)
 		for _, rel := range r.mu.quotaReleaseQueue[:numReleases] {
 			sum += rel
 		}
 		r.mu.proposalQuotaBaseIndex += numReleases
 		r.mu.quotaReleaseQueue = r.mu.quotaReleaseQueue[numReleases:]
-
 		r.mu.proposalQuota.add(sum)
 	}
+	// Assert the sanity of the base index and the queue: queue entries
+	// correspond to applied entries, so the base index plus the number of
+	// entries not yet released from the queue must always equal the
+	// applied index.
+	releasableIndex := r.mu.proposalQuotaBaseIndex + uint64(len(r.mu.quotaReleaseQueue))
+	if releasableIndex != status.Applied {
+		log.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+
+			" must equal the applied index (%d)",
+			r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex,
+			status.Applied)
+	}
 }
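
To make the enforced invariant concrete, here is a minimal, self-contained Go
sketch of the bookkeeping this patch describes: the base index plus the length
of the release queue must always equal the applied index, so every applied
entry, including empty commands and commands proposed on another node,
enqueues a release (possibly zero). This is not CockroachDB code; the
quotaTracker type and its methods are invented for illustration, and quota
acquisition at proposal time is omitted.

package main

import "fmt"

// quotaTracker is a toy model of the three fields the patch keeps in
// lockstep: proposalQuotaBaseIndex, quotaReleaseQueue, and raft's applied
// index.
type quotaTracker struct {
	baseIndex    uint64  // releases already returned to the pool
	releaseQueue []int64 // one entry per applied-but-unreleased command
	applied      uint64  // highest applied raft index
	available    int64   // quota currently sitting in the pool
}

// apply records one applied entry. Empty commands and commands proposed on
// another node must still enqueue a zero-value release so the queue stays
// aligned with the applied index.
func (q *quotaTracker) apply(quotaSize int64) {
	q.applied++
	q.releaseQueue = append(q.releaseQueue, quotaSize)
	q.assertInvariant()
}

// releaseTo returns quota for all entries up to minIndex, the lowest index
// acknowledged by every active follower. Because the patch initializes
// minIndex from status.Applied, minIndex never exceeds applied, so the
// slice below cannot run past the queue.
func (q *quotaTracker) releaseTo(minIndex uint64) {
	if minIndex <= q.baseIndex {
		return
	}
	numReleases := minIndex - q.baseIndex
	for _, rel := range q.releaseQueue[:numReleases] {
		q.available += rel
	}
	q.baseIndex += numReleases
	q.releaseQueue = q.releaseQueue[numReleases:]
	q.assertInvariant()
}

// assertInvariant is the toy analogue of the patch's new log.Fatalf check.
func (q *quotaTracker) assertInvariant() {
	if got := q.baseIndex + uint64(len(q.releaseQueue)); got != q.applied {
		panic(fmt.Sprintf("baseIndex (%d) + queueLen (%d) = %d, want applied index %d",
			q.baseIndex, len(q.releaseQueue), got, q.applied))
	}
}

func main() {
	// A new leader starts with baseIndex taken from the applied index, per
	// this patch; the old code used lastIndex, which can be higher.
	q := &quotaTracker{baseIndex: 10, applied: 10}
	q.apply(512) // locally proposed command holding 512 bytes of quota
	q.apply(0)   // empty command, or a command proposed on another node
	q.releaseTo(12)
	fmt.Println(q.available, q.baseIndex, len(q.releaseQueue)) // 512 12 0
}

If the base index starts too high, as with the old lastIndex-based
initialization, the oldest queue entries are never covered by
minIndex - baseIndex and their quota is never returned to the pool, which is
the leak described in #39022; the new assertion turns any such drift into an
immediate, diagnosable failure instead of a stalled quota pool.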