From 43949c605c2a4d0086efef61fb475a61085e4b20 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 26 Jul 2019 20:54:59 -0400 Subject: [PATCH] storage: initialize the proposalQuotaBaseIndex from Applied This commit changes the initialization of `proposalQuotaBaseIndex` from `lastIndex`, which may include entries that are not yet committed, to `status.Applied`, the highest applied index. Given that the `proposalQuotaBaseIndex` should account for all committed proposals whose quota has been released, and that proposals add their quota to the release queue after they have been committed, it's important that the base index not be too high lest we leave quota in the queue. This commit also adds an assertion that the `proposalQuotaBaseIndex` plus the length of the queue is exactly equal to the applied index. In order to maintain this invariant, the commit ensures that we enqueue a zero-value release to the release queue for empty commands and commands proposed on another node. See https://github.com/cockroachdb/cockroach/issues/39022#issuecomment-515638973 for more details. Fixes #39022. Release note (bug fix): Properly initialize proposal quota tracking to prevent a quota leak which can hang imports or other AddSSTable operations. 
--- pkg/storage/client_raft_test.go | 4 +-- pkg/storage/helpers_test.go | 8 ++++- pkg/storage/replica_application.go | 45 +++++++++++++-------------- pkg/storage/replica_proposal_quota.go | 17 ++++++++-- 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index b6aef43fd271..9fc3948e607b 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -2168,8 +2168,8 @@ func TestQuotaPool(t *testing.T) { } testutils.SucceedsSoon(t, func() error { - if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 { - return errors.Errorf("expected 1 queued quota release, found: %d", qLen) + if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen < 1 { + return errors.Errorf("expected at least 1 queued quota release, found: %d", qLen) } return nil }) diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index bddaac9c63c2..94e44f847cc3 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" + "go.etcd.io/etcd/raft" ) // AddReplica adds the replica to the store's replica map and to the sorted @@ -265,8 +266,13 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 { func (r *Replica) InitQuotaPool(quota int64) { r.mu.Lock() defer r.mu.Unlock() + var appliedIndex uint64 + _ = r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) { + appliedIndex = r.BasicStatus().Applied + return false, nil + }) - r.mu.proposalQuotaBaseIndex = r.mu.lastIndex + r.mu.proposalQuotaBaseIndex = appliedIndex if r.mu.proposalQuota != nil { r.mu.proposalQuota.close() } diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go index fc86a6bffc0e..0f1e3ff72f83 100644 --- a/pkg/storage/replica_application.go +++ b/pkg/storage/replica_application.go @@ -172,16 
+172,14 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) { cmd.ctx = ctx } } - if !anyLocal { + if !anyLocal && r.mu.proposalQuota == nil { // Fast-path. return } for ok := it.init(&b.cmdBuf); ok; ok = it.next() { cmd := it.cur() - if !cmd.proposedLocally() { - continue - } - if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { + var toRelease int64 + shouldRemove := anyLocal && cmd.proposedLocally() && // If this entry does not have the most up-to-date view of the // corresponding proposal's maximum lease index then the proposal // must have been reproposed with a higher lease index. (see @@ -189,28 +187,27 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) { // version of the proposal in the pipeline, so don't remove the // proposal from the map. We expect this entry to be rejected by // checkForcedErr. - continue - } - // Delete the proposal from the proposals map. There may be reproposals - // of the proposal in the pipeline, but those will all have the same max - // lease index, meaning that they will all be rejected after this entry - // applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex - // picks up the proposal on failure, it will re-add the proposal to the - // proposal map, but this won't affect anything in this cmdAppBatch. - // - // While here, add the proposal's quota size to the quota release queue. - // We check the proposal map again first to avoid double free-ing quota - // when reproposals from the same proposal end up in the same entry - // application batch. - if _, ok := r.mu.proposals[cmd.idKey]; !ok { - continue - } - delete(r.mu.proposals, cmd.idKey) + cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex + if shouldRemove { + // Delete the proposal from the proposals map. 
There may be reproposals + // of the proposal in the pipeline, but those will all have the same max + // lease index, meaning that they will all be rejected after this entry + // applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex + // picks up the proposal on failure, it will re-add the proposal to the + // proposal map, but this won't affect anything in this cmdAppBatch. + // + // While here, add the proposal's quota size to the quota release queue. + // We check the proposal map again first to avoid double free-ing quota + // when reproposals from the same proposal end up in the same entry + // application batch. + delete(r.mu.proposals, cmd.idKey) + toRelease = cmd.proposal.quotaSize + } // At this point we're not guaranteed to have proposalQuota initialized, // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized. + // quota for release if the proposalQuota is initialized if r.mu.proposalQuota != nil { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize) + r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, toRelease) } } } diff --git a/pkg/storage/replica_proposal_quota.go b/pkg/storage/replica_proposal_quota.go index 0f06b01226fd..2f0ea6e538c0 100644 --- a/pkg/storage/replica_proposal_quota.go +++ b/pkg/storage/replica_proposal_quota.go @@ -102,11 +102,12 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( return } + status := r.mu.internalRaftGroup.BasicStatus() if r.mu.leaderID != lastLeaderID { if r.mu.replicaID == r.mu.leaderID { // We're becoming the leader. - r.mu.proposalQuotaBaseIndex = r.mu.lastIndex - + // Initialize the proposalQuotaBaseIndex as the current committed index. 
+ r.mu.proposalQuotaBaseIndex = status.Applied if r.mu.proposalQuota != nil { log.Fatal(ctx, "proposalQuota was not nil before becoming the leader") } @@ -145,7 +146,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // Find the minimum index that active followers have acknowledged. now := timeutil.Now() - status := r.mu.internalRaftGroup.BasicStatus() commitIndex, minIndex := status.Commit, status.Commit r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) { rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id)) @@ -240,4 +240,15 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( r.mu.proposalQuota.add(sum) } + // Assert the sanity of the base index and the queue. Queue entries should + // correspond to applied entries. It should not be possible for the base + // index and the not yet released applied entries to not equal the applied + // index. + releasableIndex := r.mu.proposalQuotaBaseIndex + uint64(len(r.mu.quotaReleaseQueue)) + if releasableIndex != status.Applied { + log.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+ + " must equal the applied index (%d)", + r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex, + status.Applied) + } }