diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index b6aef43fd271..c52cae2e8db6 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -2105,7 +2105,9 @@ func TestQuotaPool(t *testing.T) {
 
 	testutils.SucceedsSoon(t, assertEqualLastIndex)
 	leaderRepl := mtc.getRaftLeader(rangeID)
-	leaderRepl.InitQuotaPool(quota)
+	if err := leaderRepl.InitQuotaPool(quota); err != nil {
+		t.Fatalf("failed to initialize quota pool: %v", err)
+	}
 	followerRepl := func() *storage.Replica {
 		for _, store := range mtc.stores {
 			repl, err := store.GetReplica(rangeID)
@@ -2168,8 +2170,8 @@ func TestQuotaPool(t *testing.T) {
 	}
 
 	testutils.SucceedsSoon(t, func() error {
-		if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 {
-			return errors.Errorf("expected 1 queued quota release, found: %d", qLen)
+		if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen < 1 {
+			return errors.Errorf("expected at least 1 queued quota release, found: %d", qLen)
 		}
 		return nil
 	})
diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go
index bddaac9c63c2..c91831dbf348 100644
--- a/pkg/storage/helpers_test.go
+++ b/pkg/storage/helpers_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft"
 )
 
 // AddReplica adds the replica to the store's replica map and to the sorted
@@ -262,16 +263,25 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {
 // a given quota. Additionally it initializes the replica's quota release queue
 // and its command sizes map. Only safe to call on the replica that is both
 // lease holder and raft leader.
-func (r *Replica) InitQuotaPool(quota int64) {
+func (r *Replica) InitQuotaPool(quota int64) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
+	var appliedIndex uint64
+	err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) {
+		appliedIndex = r.BasicStatus().Applied
+		return false, nil
+	})
+	if err != nil {
+		return err
+	}
 
-	r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
+	r.mu.proposalQuotaBaseIndex = appliedIndex
 	if r.mu.proposalQuota != nil {
 		r.mu.proposalQuota.close()
 	}
 	r.mu.proposalQuota = newQuotaPool(quota)
 	r.mu.quotaReleaseQueue = nil
+	return nil
 }
 
 // QuotaAvailable returns the quota available in the replica's quota pool. Only
diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go
index 753a88ffdebd..79494a22cebe 100644
--- a/pkg/storage/replica_application.go
+++ b/pkg/storage/replica_application.go
@@ -172,45 +172,42 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
 			cmd.ctx = ctx
 		}
 	}
-	if !anyLocal {
+	if !anyLocal && r.mu.proposalQuota == nil {
 		// Fast-path.
 		return
 	}
 	for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
 		cmd := it.cur()
-		if !cmd.proposedLocally() {
-			continue
-		}
-		if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
+		toRelease := int64(0)
+		shouldRemove := cmd.proposedLocally() &&
			// If this entry does not have the most up-to-date view of the
			// corresponding proposal's maximum lease index then the proposal
			// must have been reproposed with a higher lease index. (see
			// tryReproposeWithNewLeaseIndex) In that case there's a newer
			// version of the proposal in the pipeline, so don't remove the
			// proposal from the map. We expect this entry to be rejected by
			// checkForcedErr.
-			continue
-		}
-		// Delete the proposal from the proposals map. There may be reproposals
-		// of the proposal in the pipeline, but those will all have the same max
-		// lease index, meaning that they will all be rejected after this entry
-		// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
-		// picks up the proposal on failure, it will re-add the proposal to the
-		// proposal map, but this won't affect anything in this cmdAppBatch.
-		//
-		// While here, add the proposal's quota size to the quota release queue.
-		// We check the proposal map again first to avoid double free-ing quota
-		// when reproposals from the same proposal end up in the same entry
-		// application batch.
-		if _, ok := r.mu.proposals[cmd.idKey]; !ok {
-			continue
-		}
-		delete(r.mu.proposals, cmd.idKey)
+			cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
+		if shouldRemove {
+			// Delete the proposal from the proposals map. There may be reproposals
+			// of the proposal in the pipeline, but those will all have the same max
+			// lease index, meaning that they will all be rejected after this entry
+			// applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex
+			// picks up the proposal on failure, it will re-add the proposal to the
+			// proposal map, but this won't affect anything in this cmdAppBatch.
+			//
+			// While here, add the proposal's quota size to the quota release queue.
+			// We check the proposal map again first to avoid double free-ing quota
+			// when reproposals from the same proposal end up in the same entry
+			// application batch.
+			delete(r.mu.proposals, cmd.idKey)
+			toRelease = cmd.proposal.quotaSize
+		}
 		// At this point we're not guaranteed to have proposalQuota initialized,
 		// the same is true for quotaReleaseQueues. Only queue the proposal's
 		// quota for release if the proposalQuota is initialized.
 		if r.mu.proposalQuota != nil {
-			r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize)
+			r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, toRelease)
 		}
 	}
 }
diff --git a/pkg/storage/replica_proposal_quota.go b/pkg/storage/replica_proposal_quota.go
index 0f06b01226fd..3286bd0fb443 100644
--- a/pkg/storage/replica_proposal_quota.go
+++ b/pkg/storage/replica_proposal_quota.go
@@ -102,11 +102,15 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 		return
 	}
 
+	status := r.mu.internalRaftGroup.BasicStatus()
 	if r.mu.leaderID != lastLeaderID {
 		if r.mu.replicaID == r.mu.leaderID {
 			// We're becoming the leader.
-			r.mu.proposalQuotaBaseIndex = r.mu.lastIndex
-
+			// Initialize the proposalQuotaBaseIndex at the applied index.
+			// After the proposal quota is enabled, all entries applied by this replica
+			// will be appended to the quotaReleaseQueue. The proposalQuotaBaseIndex
+			// and the quotaReleaseQueue together track status.Applied exactly.
+			r.mu.proposalQuotaBaseIndex = status.Applied
 			if r.mu.proposalQuota != nil {
 				log.Fatal(ctx, "proposalQuota was not nil before becoming the leader")
 			}
@@ -145,8 +149,14 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 
 	// Find the minimum index that active followers have acknowledged.
 	now := timeutil.Now()
-	status := r.mu.internalRaftGroup.BasicStatus()
-	commitIndex, minIndex := status.Commit, status.Commit
+	// commitIndex is used to determine whether a newly added replica has fully
+	// caught up.
+	commitIndex := status.Commit
+	// Initialize minIndex to the currently applied index. The below progress
+	// checks will only decrease the minIndex. Given that the quotaReleaseQueue
+	// cannot correspond to values beyond the applied index, there's no reason
+	// to consider progress beyond it as meaningful.
+	minIndex := status.Applied
 	r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) {
 		rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id))
 		if !ok {
@@ -214,30 +224,28 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 	})
 
 	if r.mu.proposalQuotaBaseIndex < minIndex {
-		// We've persisted minIndex - r.mu.proposalQuotaBaseIndex entries to
-		// the raft log on all 'active' replicas since last we checked,
-		// we 'should' be able to release the difference back to
-		// the quota pool. But consider the scenario where we have a single
-		// replica that we're writing to, we only construct the
-		// quotaReleaseQueue when entries 'come out' of Raft via
-		// raft.Ready.CommittedEntries. The minIndex computed above uses the
-		// replica's commit index which is independent of whether or not we've
-		// iterated over the entirety of raft.Ready.CommittedEntries and
-		// therefore may not have all minIndex - r.mu.proposalQuotaBaseIndex
-		// command sizes in our quotaReleaseQueue. Hence we only process
-		// min(minIndex - r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue))
-		// quota releases.
+		// We've persisted at least minIndex - r.mu.proposalQuotaBaseIndex
+		// entries to the raft log on all 'active' replicas and applied at
+		// least minIndex entries locally since last we checked, so we are
+		// able to release the difference back to the quota pool.
 		numReleases := minIndex - r.mu.proposalQuotaBaseIndex
-		if qLen := uint64(len(r.mu.quotaReleaseQueue)); qLen < numReleases {
-			numReleases = qLen
-		}
 		sum := int64(0)
 		for _, rel := range r.mu.quotaReleaseQueue[:numReleases] {
 			sum += rel
 		}
 		r.mu.proposalQuotaBaseIndex += numReleases
 		r.mu.quotaReleaseQueue = r.mu.quotaReleaseQueue[numReleases:]
-
 		r.mu.proposalQuota.add(sum)
 	}
+	// Assert the sanity of the base index and the queue: queue entries
+	// correspond to applied entries, so the base index plus the number of
+	// not-yet-released applied entries must always equal the applied
+	// index.
+	releasableIndex := r.mu.proposalQuotaBaseIndex + uint64(len(r.mu.quotaReleaseQueue))
+	if releasableIndex != status.Applied {
+		log.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+
+			" must equal the applied index (%d)",
+			r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex,
+			status.Applied)
+	}
 }
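
Reviewer note: the invariant this patch establishes is that, on the leader,
r.mu.proposalQuotaBaseIndex + len(r.mu.quotaReleaseQueue) always equals the
raft applied index. InitQuotaPool and the become-leader path now seed the base
index from status.Applied rather than r.mu.lastIndex, retrieveLocalProposals
appends a (possibly zero-sized) release for every applied entry rather than
only for live local proposals, and updateProposalQuotaRaftMuLocked asserts the
equality. The standalone Go sketch below is a toy model of that bookkeeping,
not code from this patch; toyReplica and its fields are illustrative stand-ins
for the corresponding Replica state.

package main

import "fmt"

// toyReplica models only the fields tied together by the new assertion:
// proposalQuotaBaseIndex + len(quotaReleaseQueue) == appliedIndex.
type toyReplica struct {
	appliedIndex           uint64  // highest raft log index applied locally
	proposalQuotaBaseIndex uint64  // applied index at which quota tracking began
	quotaReleaseQueue      []int64 // one element per applied entry past the base index
	quotaAvailable         int64   // stand-in for the real quota pool
}

// apply records one applied entry. Every applied entry contributes a queue
// element, zero-sized when the entry was not proposed locally, which is what
// keeps the queue in lockstep with the applied index.
func (r *toyReplica) apply(quotaSize int64) {
	r.appliedIndex++
	r.quotaReleaseQueue = append(r.quotaReleaseQueue, quotaSize)
}

// release frees quota for entries up to minIndex (the minimum index
// acknowledged by all active followers, capped at the local applied index),
// then checks the patch's assertion in miniature.
func (r *toyReplica) release(minIndex uint64) {
	if r.proposalQuotaBaseIndex < minIndex {
		numReleases := minIndex - r.proposalQuotaBaseIndex
		for _, rel := range r.quotaReleaseQueue[:numReleases] {
			r.quotaAvailable += rel
		}
		r.proposalQuotaBaseIndex += numReleases
		r.quotaReleaseQueue = r.quotaReleaseQueue[numReleases:]
	}
	if got := r.proposalQuotaBaseIndex + uint64(len(r.quotaReleaseQueue)); got != r.appliedIndex {
		panic(fmt.Sprintf("base (%d) + queued (%d) != applied (%d)",
			r.proposalQuotaBaseIndex, len(r.quotaReleaseQueue), r.appliedIndex))
	}
}

func main() {
	// Quota tracking starts at the current applied index, as InitQuotaPool
	// now does, rather than at the last log index.
	r := &toyReplica{appliedIndex: 10, proposalQuotaBaseIndex: 10}
	r.apply(512) // a locally proposed entry holding 512 units of quota
	r.apply(0)   // an entry proposed elsewhere still occupies a queue slot
	r.release(11)
	fmt.Println(r.quotaAvailable) // 512
	r.release(12)
	fmt.Println(r.quotaAvailable) // still 512: the second entry released 0
}

The sketch also shows why the old clamp numReleases = min(minIndex - base,
len(queue)) could be deleted: with minIndex initialized from status.Applied
rather than status.Commit, minIndex can never run ahead of
proposalQuotaBaseIndex + len(quotaReleaseQueue).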