diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
index 14832207520b..8a024676e0d8 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
     deps = [
         "//pkg/kv/kvserver/kvflowcontrol",
         "//pkg/raft/raftpb",
+        "//pkg/raft/tracker",
         "//pkg/roachpb",
         "//pkg/settings/cluster",
         "//pkg/util/admission/admissionpb",
@@ -28,6 +29,7 @@ go_test(
     name = "rac2_test",
     srcs = [
         "priority_test.go",
+        "range_controller_test.go",
         "token_counter_test.go",
     ],
     data = glob(["testdata/**"]),
@@ -35,8 +37,11 @@ go_test(
     deps = [
         "//pkg/kv/kvserver/kvflowcontrol",
         "//pkg/raft/raftpb",
+        "//pkg/raft/tracker",
+        "//pkg/roachpb",
         "//pkg/settings/cluster",
         "//pkg/util/admission/admissionpb",
+        "//pkg/util/humanizeutil",
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/syncutil",
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
index 01657e220d28..11a9894817ae 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -13,11 +13,15 @@ package rac2
 import (
 	"cmp"
 	"context"
+	"reflect"
 	"slices"
 
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
+	"github.com/cockroachdb/cockroach/pkg/raft/tracker"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/redact"
 )
 
@@ -59,6 +63,43 @@ type RangeController interface {
 	CloseRaftMuLocked(ctx context.Context)
 }
 
+// TODO(pav-kv): This interface is a placeholder for the interface containing
+// raft methods. Replace this as part of #128019.
+type RaftInterface interface {
+	// FollowerState returns the current state of a follower. The values of
+	// Match, Next, Admitted are populated iff in StateReplicate. All entries >=
+	// Next have not had MsgApps constructed during the lifetime of this
+	// StateReplicate (they may have been constructed previously).
+	//
+	// When a follower transitions from {StateProbe,StateSnapshot} =>
+	// StateReplicate, we start trying to send MsgApps. We should
+	// notice such transitions both in HandleRaftEvent and SetReplicasLocked.
+	//
+	// RACv1 also cared about three other cases where the follower behaved as if
+	// it were disconnected (a) paused follower, (b) follower is behind, (c)
+	// follower is inactive (see
+	// replicaFlowControlIntegrationImpl.notActivelyReplicatingTo). (b) and (c)
+	// were needed since it paced at rate of slowest replica, while for regular
+	// work we will in v2 pace at slowest in quorum (and we don't care about
+	// elastic experiencing a hiccup, given it paces at rate of slowest). For
+	// (a), we plan to remove follower pausing. So the v2 code will be
+	// simplified.
+	//
+	// Requires Replica.raftMu to be held, Replica.mu is not held.
+	FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo
+}
+
+type FollowerStateInfo struct {
+	State tracker.StateType
+
+	// The remaining fields are only populated in StateReplicate.
+	// Entries in (Match, Next) are in-flight.
+	Match uint64
+	Next  uint64
+	// Invariant: Admitted[i] <= Match.
+	Admitted [raftpb.NumPriorities]uint64
+}
+
 // TODO(pav-kv): This struct is a placeholder for the interface or struct
 // containing raft entries. Replace this as part of #128019.
type RaftEvent struct { @@ -99,3 +140,342 @@ func (rs ReplicaSet) SafeFormat(w redact.SafePrinter, _ rune) { func (rs ReplicaSet) String() string { return redact.StringWithoutMarkers(rs) } + +type RangeControllerOptions struct { + RangeID roachpb.RangeID + TenantID roachpb.TenantID + // LocalReplicaID is the ReplicaID of the local replica, which is the + // leader. + LocalReplicaID roachpb.ReplicaID + // SSTokenCounter provides access to all the TokenCounters that will be + // needed (keyed by (tenantID, storeID)). + SSTokenCounter *StreamTokenCounterProvider + RaftInterface RaftInterface +} + +// RangeControllerInitState is the initial state at the time of creation. +type RangeControllerInitState struct { + // Must include RangeControllerOptions.ReplicaID. + ReplicaSet ReplicaSet + // Leaseholder may be set to NoReplicaID, in which case the leaseholder is + // unknown. + Leaseholder roachpb.ReplicaID +} + +type rangeController struct { + opts RangeControllerOptions + replicaSet ReplicaSet + // leaseholder can be NoReplicaID or not be in ReplicaSet, i.e., it is + // eventually consistent with the set of replicas. + leaseholder roachpb.ReplicaID + + mu struct { + syncutil.Mutex + + // State for waiters. When anything in voterSets changes, voterSetRefreshCh + // is closed, and replaced with a new channel. The voterSets is + // copy-on-write, so waiters make a shallow copy. + voterSets []voterSet + voterSetRefreshCh chan struct{} + } + + replicaMap map[roachpb.ReplicaID]*replicaState +} + +// voterStateForWaiters informs whether WaitForEval is required to wait for +// eval-tokens for a voter. +type voterStateForWaiters struct { + replicaID roachpb.ReplicaID + isLeader bool + isLeaseHolder bool + isStateReplicate bool + evalTokenCounter TokenCounter +} + +type voterSet []voterStateForWaiters + +var _ RangeController = &rangeController{} + +func NewRangeController( + ctx context.Context, o RangeControllerOptions, init RangeControllerInitState, +) *rangeController { + rc := &rangeController{ + opts: o, + leaseholder: init.Leaseholder, + replicaMap: make(map[roachpb.ReplicaID]*replicaState), + } + rc.mu.voterSetRefreshCh = make(chan struct{}) + rc.updateReplicaSet(ctx, init.ReplicaSet) + rc.updateVoterSets() + return rc +} + +// This blocks until there are positive tokens available for the request to +// be admitted for evaluation. Note the number of tokens required by the +// request is not considered, only the priority of the request, as the number +// of tokens is not known until eval. +// +// No mutexes should be held. +func (rc *rangeController) WaitForEval(ctx context.Context, pri admissionpb.WorkPriority) error { + wc := admissionpb.WorkClassFromPri(pri) + waitForAllReplicateHandles := false + if wc == admissionpb.ElasticWorkClass { + waitForAllReplicateHandles = true + } + var handles []tokenWaitingHandleInfo + var scratch []reflect.SelectCase + +retry: + // Snapshot the voterSets and voterSetRefreshCh. + rc.mu.Lock() + vss := rc.mu.voterSets + vssRefreshCh := rc.mu.voterSetRefreshCh + rc.mu.Unlock() + + if vssRefreshCh == nil { + // RangeControllerImpl is closed. + return nil + } + for _, vs := range vss { + quorumCount := (len(vs) + 2) / 2 + haveEvalTokensCount := 0 + handles = handles[:0] + requiredWait := false + for _, v := range vs { + available, handle := v.evalTokenCounter.TokensAvailable(wc) + if available { + haveEvalTokensCount++ + continue + } + + // Don't have eval tokens, and have a handle. 
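+			// The wait on this handle is required, rather than merely counted
+			// toward the quorum, when this replica is the leader or the
+			// leaseholder, or when the work is elastic and the replica is in
+			// StateReplicate.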
+ handleInfo := tokenWaitingHandleInfo{ + handle: handle, + requiredWait: v.isLeader || v.isLeaseHolder || + (waitForAllReplicateHandles && v.isStateReplicate), + } + handles = append(handles, handleInfo) + if !requiredWait && handleInfo.requiredWait { + requiredWait = true + } + } + remainingForQuorum := quorumCount - haveEvalTokensCount + if remainingForQuorum < 0 { + remainingForQuorum = 0 + } + if remainingForQuorum > 0 || requiredWait { + var state WaitEndState + state, scratch = WaitForEval(ctx, vssRefreshCh, handles, remainingForQuorum, scratch) + switch state { + case WaitSuccess: + continue + case ContextCanceled: + return ctx.Err() + case RefreshWaitSignaled: + goto retry + } + } + } + return nil +} + +// HandleRaftEventRaftMuLocked handles the provided raft event for the range. +// +// Requires replica.raftMu to be held. +func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error { + panic("unimplemented") +} + +// HandleSchedulerEventRaftMuLocked processes an event scheduled by the +// controller. +// +// Requires replica.raftMu to be held. +func (rc *rangeController) HandleSchedulerEventRaftMuLocked(ctx context.Context) error { + panic("unimplemented") +} + +// SetReplicasRaftMuLocked sets the replicas of the range. The caller will +// never mutate replicas, and neither should the callee. +// +// Requires replica.raftMu to be held. +func (rc *rangeController) SetReplicasRaftMuLocked(ctx context.Context, replicas ReplicaSet) error { + rc.updateReplicaSet(ctx, replicas) + rc.updateVoterSets() + return nil +} + +// SetLeaseholderRaftMuLocked sets the leaseholder of the range. +// +// Requires raftMu to be held. +func (rc *rangeController) SetLeaseholderRaftMuLocked( + ctx context.Context, replica roachpb.ReplicaID, +) { + if replica == rc.leaseholder { + return + } + rc.leaseholder = replica + rc.updateVoterSets() +} + +// CloseRaftMuLocked closes the range controller. +// +// Requires replica.raftMu to be held. +func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.mu.voterSets = nil + close(rc.mu.voterSetRefreshCh) + rc.mu.voterSetRefreshCh = nil +} + +func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaSet) { + prevSet := rc.replicaSet + for r := range prevSet { + desc, ok := newSet[r] + if !ok { + delete(rc.replicaMap, r) + } else { + rs := rc.replicaMap[r] + rs.desc = desc + } + } + for r, desc := range newSet { + _, ok := prevSet[r] + if ok { + // Already handled above. + continue + } + rc.replicaMap[r] = NewReplicaState(ctx, rc, desc) + } + rc.replicaSet = newSet +} + +func (rc *rangeController) updateVoterSets() { + rc.mu.Lock() + defer rc.mu.Unlock() + + setCount := 1 + for _, r := range rc.replicaSet { + isOld := r.IsVoterOldConfig() + isNew := r.IsVoterNewConfig() + if !isOld && !isNew { + continue + } + if !isOld && isNew { + setCount++ + break + } + } + var voterSets []voterSet + for len(voterSets) < setCount { + voterSets = append(voterSets, voterSet{}) + } + for _, r := range rc.replicaSet { + isOld := r.IsVoterOldConfig() + isNew := r.IsVoterNewConfig() + if !isOld && !isNew { + continue + } + // Is a voter. + rs := rc.replicaMap[r.ReplicaID] + vsfw := voterStateForWaiters{ + replicaID: r.ReplicaID, + isLeader: r.ReplicaID == rc.opts.LocalReplicaID, + isLeaseHolder: r.ReplicaID == rc.leaseholder, + // TODO(rac2): Once the send stream is added, check that the send stream + // is initialized here as well. 
+ isStateReplicate: rs.connectedState.shouldWaitForElasticEvalTokens(), + evalTokenCounter: rs.evalTokenCounter, + } + if isOld { + voterSets[0] = append(voterSets[0], vsfw) + } + if isNew && setCount == 2 { + voterSets[1] = append(voterSets[1], vsfw) + } + } + rc.mu.voterSets = voterSets + close(rc.mu.voterSetRefreshCh) + rc.mu.voterSetRefreshCh = make(chan struct{}) +} + +type replicaState struct { + parent *rangeController + // stream aggregates across the streams for the same (tenant, store). This + // is the identity that is used to deduct tokens or wait for tokens to be + // positive. + stream kvflowcontrol.Stream + evalTokenCounter TokenCounter + desc roachpb.ReplicaDescriptor + connectedState connectedState +} + +func NewReplicaState( + ctx context.Context, parent *rangeController, desc roachpb.ReplicaDescriptor, +) *replicaState { + stream := kvflowcontrol.Stream{TenantID: parent.opts.TenantID, StoreID: desc.StoreID} + rs := &replicaState{ + parent: parent, + stream: stream, + evalTokenCounter: parent.opts.SSTokenCounter.Eval(stream), + desc: desc, + } + + // TODO(rac2): Construct the sendStream state here if the replica is in state + // replicate. + state := parent.opts.RaftInterface.FollowerState(desc.ReplicaID) + switch state.State { + case tracker.StateReplicate: + rs.connectedState = replicate + case tracker.StateProbe: + rs.connectedState = probeRecentlyReplicate + case tracker.StateSnapshot: + rs.connectedState = snapshot + } + return rs +} + +type connectedState uint32 + +// Local replicas are always in state replicate. +// +// Initial state for a replicaSendStream is always replicate, since it is +// created in StateReplicate. We don't care about whether the transport is +// connected or disconnected, since there is buffering capacity in the +// RaftTransport, which allows for some buffering and immediate sending when +// the RaftTransport stream reconnects (which may happen before the next +// HandleRaftEvent), which is desirable. +// +// The first false return value from SendRaftMessage will trigger a +// notification to Raft that the replica is unreachable (see +// Replica.sendRaftMessage calling Replica.addUnreachableRemoteReplica), and +// that raftpb.MsgUnreachable will cause the transition out of StateReplicate +// to StateProbe. The false return value happens either when the (generous) +// RaftTransport buffer is full, or when the circuit breaker opens. The +// circuit breaker opens 3-6s after no more TCP packets are flowing. +// +// A single transient message drop, and nack, can also cause a transition to +// StateProbe. At this layer we don't bother distinguishing on why this +// transition happened and first transition to probeRecentlyReplicate. We stay +// in this state for 1 second, and then close the replicaSendStream. +// +// The only difference in behavior between replicate and +// probeRecentlyReplicate is that we don't try to construct MsgApps in the +// latter. 
+// +// Initial states: replicate +// State transitions: +// +// replicate <=> {probeRecentlyReplicate, snapshot} +// snapshot => replicaSendStream closed (when observe StateProbe) +// probeRecentlyReplicate => replicaSendStream closed (after short delay) +const ( + replicate connectedState = iota + probeRecentlyReplicate + snapshot +) + +func (cs connectedState) shouldWaitForElasticEvalTokens() bool { + return cs == replicate || cs == probeRecentlyReplicate +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go new file mode 100644 index 000000000000..a1eec860d16e --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -0,0 +1,495 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rac2 + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/raft/tracker" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +type testingRCEval struct { + pri admissionpb.WorkPriority + done bool + err error + cancel context.CancelFunc + refreshCh chan struct{} +} + +type testingRCRange struct { + rc *rangeController + + mu struct { + syncutil.Mutex + r testingRange + evals map[string]*testingRCEval + } +} + +func (r *testingRCRange) FollowerState(replicaID roachpb.ReplicaID) FollowerStateInfo { + r.mu.Lock() + defer r.mu.Unlock() + + replica, ok := r.mu.r.replicaSet[replicaID] + if !ok { + return FollowerStateInfo{} + } + return replica.info +} + +func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) { + r.mu.Lock() + defer r.mu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + refreshCh := make(chan struct{}) + r.mu.evals[name] = &testingRCEval{ + err: nil, + cancel: cancel, + refreshCh: refreshCh, + pri: pri, + } + + go func() { + err := r.rc.WaitForEval(ctx, pri) + + r.mu.Lock() + defer r.mu.Unlock() + r.mu.evals[name].err = err + r.mu.evals[name].done = true + }() +} + +type testingRange struct { + rangeID roachpb.RangeID + tenantID roachpb.TenantID + localReplicaID roachpb.ReplicaID + replicaSet map[roachpb.ReplicaID]testingReplica +} + +func (t testingRange) replicas() ReplicaSet { + replicas := make(ReplicaSet, len(t.replicaSet)) + for i, replica := range t.replicaSet { + replicas[i] = replica.desc + } + return replicas +} + +const invalidTrackerState = tracker.StateSnapshot + 1 + +type testingReplica struct { + desc roachpb.ReplicaDescriptor + info FollowerStateInfo +} + +func scanRanges(t *testing.T, input string) []testingRange { + replicas := []testingRange{} + + for _, line := range strings.Split(input, "\n") { + parts := strings.Fields(line) + parts[0] = strings.TrimSpace(parts[0]) + if strings.HasPrefix(parts[0], "range_id=") { + // Create a new 
range, any replicas which follow until the next range_id + // line will be added to this replica set. + var rangeID, tenantID, localReplicaID int + var err error + + require.True(t, strings.HasPrefix(parts[0], "range_id=")) + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "range_id=") + rangeID, err = strconv.Atoi(parts[0]) + require.NoError(t, err) + + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "tenant_id=")) + parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "tenant_id=") + tenantID, err = strconv.Atoi(parts[1]) + require.NoError(t, err) + + parts[2] = strings.TrimSpace(parts[2]) + require.True(t, strings.HasPrefix(parts[2], "local_replica_id=")) + parts[2] = strings.TrimPrefix(strings.TrimSpace(parts[2]), "local_replica_id=") + localReplicaID, err = strconv.Atoi(parts[2]) + require.NoError(t, err) + + replicas = append(replicas, testingRange{ + rangeID: roachpb.RangeID(rangeID), + tenantID: roachpb.MustMakeTenantID(uint64(tenantID)), + localReplicaID: roachpb.ReplicaID(localReplicaID), + replicaSet: make(map[roachpb.ReplicaID]testingReplica), + }) + } else { + // Otherwise, add the replica to the last replica set created. + replica := scanReplica(t, line) + replicas[len(replicas)-1].replicaSet[replica.desc.ReplicaID] = replica + } + } + + return replicas +} + +func scanReplica(t *testing.T, line string) testingReplica { + var storeID, replicaID int + var replicaType roachpb.ReplicaType + // Default to an invalid state when no state is specified, this will be + // converted to the prior state or StateReplicate if the replica doesn't yet + // exist. + state := invalidTrackerState + var err error + + parts := strings.Fields(line) + parts[0] = strings.TrimSpace(parts[0]) + + require.True(t, strings.HasPrefix(parts[0], "store_id=")) + parts[0] = strings.TrimPrefix(strings.TrimSpace(parts[0]), "store_id=") + storeID, err = strconv.Atoi(parts[0]) + require.NoError(t, err) + + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "replica_id=")) + parts[1] = strings.TrimPrefix(strings.TrimSpace(parts[1]), "replica_id=") + replicaID, err = strconv.Atoi(parts[1]) + require.NoError(t, err) + + parts[2] = strings.TrimSpace(parts[2]) + require.True(t, strings.HasPrefix(parts[2], "type=")) + parts[2] = strings.TrimPrefix(strings.TrimSpace(parts[2]), "type=") + switch parts[2] { + case "VOTER_FULL": + replicaType = roachpb.VOTER_FULL + case "VOTER_INCOMING": + replicaType = roachpb.VOTER_INCOMING + case "VOTER_DEMOTING_LEARNER": + replicaType = roachpb.VOTER_DEMOTING_LEARNER + case "LEARNER": + replicaType = roachpb.LEARNER + case "NON_VOTER": + replicaType = roachpb.NON_VOTER + case "VOTER_DEMOTING_NON_VOTER": + replicaType = roachpb.VOTER_DEMOTING_NON_VOTER + default: + panic("unknown replica type") + } + + // The fourth field is optional, if set it contains the tracker state of the + // replica on the leader replica (localReplicaID). The valid states are + // Probe, Replicate, and Snapshot. 
+	if len(parts) > 3 {
+		parts[3] = strings.TrimSpace(parts[3])
+		require.True(t, strings.HasPrefix(parts[3], "state="))
+		parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "state=")
+		switch parts[3] {
+		case "StateProbe":
+			state = tracker.StateProbe
+		case "StateReplicate":
+			state = tracker.StateReplicate
+		case "StateSnapshot":
+			state = tracker.StateSnapshot
+		default:
+			panic("unknown replica state")
+		}
+	}
+
+	return testingReplica{
+		desc: roachpb.ReplicaDescriptor{
+			NodeID:    roachpb.NodeID(storeID),
+			StoreID:   roachpb.StoreID(storeID),
+			ReplicaID: roachpb.ReplicaID(replicaID),
+			Type:      replicaType,
+		},
+		info: FollowerStateInfo{State: state},
+	}
+}
+
+func parsePriority(t *testing.T, input string) admissionpb.WorkPriority {
+	switch input {
+	case "LowPri":
+		return admissionpb.LowPri
+	case "NormalPri":
+		return admissionpb.NormalPri
+	case "HighPri":
+		return admissionpb.HighPri
+	default:
+		require.Fail(t, "unknown work priority")
+		return admissionpb.WorkPriority(-1)
+	}
+}
+
+// TestRangeControllerWaitForEval tests the RangeController WaitForEval method.
+//
+//   - init: Initializes the range controller with the given ranges.
+//     range_id=<range_id> tenant_id=<tenant_id> local_replica_id=<replica_id>
+//     store_id=<store_id> replica_id=<replica_id> type=<type> [state=<state>]
+//     ...
+//
+//   - wait_for_eval: Starts a WaitForEval call on the given range.
+//     range_id=<range_id> name=<name> pri=<pri>
+//
+//   - check_state: Prints the current state of all ranges.
+//
+//   - adjust_tokens: Adjusts the token count for the given store and priority.
+//     store_id=<store_id> pri=<pri> tokens=<tokens>
+//     ...
+//
+//   - cancel_context: Cancels the context for the given range.
+//     range_id=<range_id> name=<name>
+//
+//   - set_replicas: Sets the replicas for the given range.
+//     range_id=<range_id> tenant_id=<tenant_id> local_replica_id=<replica_id>
+//     store_id=<store_id> replica_id=<replica_id> type=<type> [state=<state>]
+//     ...
+//
+//   - set_leaseholder: Sets the leaseholder for the given range.
+//     range_id=<range_id> replica_id=<replica_id>
+func TestRangeControllerWaitForEval(t *testing.T) {
+	ctx := context.Background()
+	settings := cluster.MakeTestingClusterSettings()
+	ranges := make(map[roachpb.RangeID]*testingRCRange)
+	ssTokenCounter := NewStreamTokenCounterProvider(settings)
+
+	// Eval will only wait on a positive token amount; set the limit to 1 in
+	// order to simplify testing.
+	kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 1)
+	kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 1)
+	// We initialize each token counter to 0 tokens. The map is used to do so
+	// exactly once per stream.
+	zeroedTokenCounters := make(map[kvflowcontrol.Stream]struct{})
+
+	rangeStateString := func() string {
+		var b strings.Builder
+
+		// Sort the ranges by rangeID to ensure deterministic output.
+		sortedRanges := make([]*testingRCRange, 0, len(ranges))
+		for _, testRC := range ranges {
+			sortedRanges = append(sortedRanges, testRC)
+			// We retain the lock until the end of the function call.
+ testRC.mu.Lock() + defer testRC.mu.Unlock() + } + sort.Slice(sortedRanges, func(i, j int) bool { + return sortedRanges[i].mu.r.rangeID < sortedRanges[j].mu.r.rangeID + }) + + for _, testRC := range sortedRanges { + replicaIDs := make([]int, 0, len(testRC.mu.r.replicaSet)) + for replicaID := range testRC.mu.r.replicaSet { + replicaIDs = append(replicaIDs, int(replicaID)) + } + sort.Ints(replicaIDs) + + fmt.Fprintf(&b, "r%d: [", testRC.mu.r.rangeID) + for i, replicaID := range replicaIDs { + replica := testRC.mu.r.replicaSet[roachpb.ReplicaID(replicaID)] + if i > 0 { + fmt.Fprintf(&b, ",") + } + fmt.Fprintf(&b, "%v", replica.desc) + if replica.desc.ReplicaID == testRC.rc.leaseholder { + fmt.Fprint(&b, "*") + } + } + fmt.Fprintf(&b, "]\n") + } + return b.String() + } + + tokenCountsString := func() string { + var b strings.Builder + streams := make([]kvflowcontrol.Stream, 0, len(ssTokenCounter.mu.evalCounters)) + for stream := range ssTokenCounter.mu.evalCounters { + streams = append(streams, stream) + } + sort.Slice(streams, func(i, j int) bool { + return streams[i].StoreID < streams[j].StoreID + }) + for _, stream := range streams { + fmt.Fprintf(&b, "%v: %v\n", stream, ssTokenCounter.Eval(stream)) + } + + return b.String() + } + + evalStateString := func() string { + time.Sleep(100 * time.Millisecond) + var b strings.Builder + + // Sort the ranges by rangeID to ensure deterministic output. + sortedRanges := make([]*testingRCRange, 0, len(ranges)) + for _, testRC := range ranges { + sortedRanges = append(sortedRanges, testRC) + // We retain the lock until the end of the function call. + testRC.mu.Lock() + defer testRC.mu.Unlock() + } + sort.Slice(sortedRanges, func(i, j int) bool { + return sortedRanges[i].mu.r.rangeID < sortedRanges[j].mu.r.rangeID + }) + + for _, testRC := range sortedRanges { + fmt.Fprintf(&b, "range_id=%d tenant_id=%d local_replica_id=%d\n", + testRC.mu.r.rangeID, testRC.mu.r.tenantID, testRC.mu.r.localReplicaID) + // Sort the evals by name to ensure deterministic output. 
+ evals := make([]string, 0, len(testRC.mu.evals)) + for name := range testRC.mu.evals { + evals = append(evals, name) + } + sort.Strings(evals) + for _, name := range evals { + eval := testRC.mu.evals[name] + fmt.Fprintf(&b, " name=%s pri=%-8v done=%-5t err=%v\n", name, eval.pri, eval.done, eval.err) + } + } + return b.String() + } + + maybeZeroTokenCounters := func(r testingRange) { + for _, replica := range r.replicaSet { + stream := kvflowcontrol.Stream{ + StoreID: replica.desc.StoreID, + TenantID: r.tenantID, + } + if _, ok := zeroedTokenCounters[stream]; !ok { + zeroedTokenCounters[stream] = struct{}{} + ssTokenCounter.Eval(stream).(*tokenCounter).adjust(ctx, admissionpb.RegularWorkClass, -1) + } + } + } + + getOrInitRange := func(r testingRange) *testingRCRange { + testRC, ok := ranges[r.rangeID] + if !ok { + testRC = &testingRCRange{} + testRC.mu.r = r + testRC.mu.evals = make(map[string]*testingRCEval) + options := RangeControllerOptions{ + RangeID: r.rangeID, + TenantID: r.tenantID, + LocalReplicaID: r.localReplicaID, + SSTokenCounter: ssTokenCounter, + RaftInterface: testRC, + } + + init := RangeControllerInitState{ + ReplicaSet: r.replicas(), + Leaseholder: r.localReplicaID, + } + testRC.rc = NewRangeController(ctx, options, init) + ranges[r.rangeID] = testRC + } + maybeZeroTokenCounters(r) + return testRC + } + + datadriven.RunTest(t, "testdata/range_controller_wait_for_eval", func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + for _, r := range scanRanges(t, d.Input) { + getOrInitRange(r) + } + return rangeStateString() + tokenCountsString() + + case "wait_for_eval": + var rangeID int + var name, priString string + d.ScanArgs(t, "range_id", &rangeID) + d.ScanArgs(t, "name", &name) + d.ScanArgs(t, "pri", &priString) + testRC := ranges[roachpb.RangeID(rangeID)] + testRC.startWaitForEval(name, parsePriority(t, priString)) + return evalStateString() + + case "check_state": + return evalStateString() + + case "adjust_tokens": + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + parts[0] = strings.TrimSpace(parts[0]) + require.True(t, strings.HasPrefix(parts[0], "store_id=")) + parts[0] = strings.TrimPrefix(parts[0], "store_id=") + store, err := strconv.Atoi(parts[0]) + require.NoError(t, err) + + parts[1] = strings.TrimSpace(parts[1]) + require.True(t, strings.HasPrefix(parts[1], "pri=")) + pri := parsePriority(t, strings.TrimPrefix(parts[1], "pri=")) + + parts[2] = strings.TrimSpace(parts[2]) + require.True(t, strings.HasPrefix(parts[2], "tokens=")) + tokenString := strings.TrimPrefix(parts[2], "tokens=") + tokens, err := humanizeutil.ParseBytes(tokenString) + require.NoError(t, err) + + ssTokenCounter.Eval(kvflowcontrol.Stream{ + StoreID: roachpb.StoreID(store), + TenantID: roachpb.SystemTenantID, + }).(*tokenCounter).adjust(ctx, + admissionpb.WorkClassFromPri(pri), + kvflowcontrol.Tokens(tokens)) + } + + return tokenCountsString() + + case "cancel_context": + var rangeID int + var name string + + d.ScanArgs(t, "range_id", &rangeID) + d.ScanArgs(t, "name", &name) + testRC := ranges[roachpb.RangeID(rangeID)] + func() { + testRC.mu.Lock() + defer testRC.mu.Unlock() + testRC.mu.evals[name].cancel() + }() + + return evalStateString() + + case "set_replicas": + for _, r := range scanRanges(t, d.Input) { + testRC := getOrInitRange(r) + func() { + testRC.mu.Lock() + defer testRC.mu.Unlock() + testRC.mu.r = r + }() + err := testRC.rc.SetReplicasRaftMuLocked(ctx, r.replicas()) + require.NoError(t, err) + } + return 
rangeStateString()
+
+		case "set_leaseholder":
+			var rangeID, replicaID int
+			d.ScanArgs(t, "range_id", &rangeID)
+			d.ScanArgs(t, "replica_id", &replicaID)
+			testRC := ranges[roachpb.RangeID(rangeID)]
+			testRC.rc.SetLeaseholderRaftMuLocked(ctx, roachpb.ReplicaID(replicaID))
+			return rangeStateString()
+
+		default:
+			panic(fmt.Sprintf("unknown command: %s", d.Cmd))
+		}
+	})
+}
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go
index af373c0542f9..175397579f13 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go
@@ -34,9 +34,10 @@ type StreamTokenCounterProvider struct {
 
 // NewStreamTokenCounterProvider creates a new StreamTokenCounterProvider.
 func NewStreamTokenCounterProvider(settings *cluster.Settings) *StreamTokenCounterProvider {
-	return &StreamTokenCounterProvider{
-		settings: settings,
-	}
+	p := StreamTokenCounterProvider{settings: settings}
+	p.mu.evalCounters = make(map[kvflowcontrol.Stream]TokenCounter)
+	p.mu.sendCounters = make(map[kvflowcontrol.Stream]TokenCounter)
+	return &p
 }
 
 // Eval returns the evaluation token counter for the given stream.
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller_wait_for_eval b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller_wait_for_eval
new file mode 100644
index 000000000000..2f74fb04427b
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller_wait_for_eval
@@ -0,0 +1,342 @@
+# Initialize a range with voters on s1, s2 and s3. The local replica and
+# leaseholder will be s1. The leaseholder is denoted by the '*' suffix.
+init
+range_id=1 tenant_id=1 local_replica_id=1
+  store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate
+  store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate
+  store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate
+----
+r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
+t1/s1: reg=+0 B/+1 B ela=+0 B/+1 B
+t1/s2: reg=+0 B/+1 B ela=+0 B/+1 B
+t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B
+
+# Start a high priority evaluation. It should not complete due to lack of
+# tokens.
+wait_for_eval name=a range_id=1 pri=HighPri
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=high-pri done=false err=
+
+# Start a low priority evaluation. It should also not complete.
+wait_for_eval name=b range_id=1 pri=LowPri
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=high-pri done=false err=
+  name=b pri=low-pri done=false err=
+
+# Add high priority tokens to the first store. This is not enough for quorum.
+adjust_tokens
+  store_id=1 pri=HighPri tokens=1
+----
+t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B
+t1/s2: reg=+0 B/+1 B ela=+0 B/+1 B
+t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B
+
+# Cancel the context for the high priority evaluation 'a'.
+cancel_context range_id=1 name=a
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=high-pri done=true err=context canceled
+  name=b pri=low-pri done=false err=
+
+# Add high priority tokens to the second store. 'b' is elastic so it should not
+# complete despite having a quorum of streams with available tokens.
+adjust_tokens
+  store_id=2 pri=HighPri tokens=1
+----
+t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B
+t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B
+t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B
+
+check_state
+----
+range_id=1 tenant_id={1} local_replica_id=1
+  name=a pri=high-pri done=true err=context canceled
+  name=b pri=low-pri done=false err=
+
+# Add high priority tokens to the third store.
Now all stores have positive +# tokens and 'b' should complete. +adjust_tokens + store_id=3 pri=HighPri tokens=1 +---- +t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+1 B/+1 B ela=+1 B/+1 B + +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + +# Change the replica set: replace replica 3 with a new replica 4. +set_replicas +range_id=1 tenant_id=1 local_replica_id=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate + store_id=4 replica_id=4 type=VOTER_FULL state=StateReplicate +---- +r1: [(n1,s1):1*,(n2,s2):2,(n4,s4):4] + +adjust_tokens + store_id=1 pri=HighPri tokens=-1 +---- +t1/s1: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B + +# Start a new high priority evaluation 'c'. It should not complete due to lack +# of quorum. +wait_for_eval name=c range_id=1 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=false err= + +# Add high priority tokens back to the first store, restoring quorum. +adjust_tokens + store_id=1 pri=HighPri tokens=1 +---- +t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B + +# Check the state. The high priority evaluation 'c' should now complete. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + +# Test behavior with a non-voter replica. +set_replicas +range_id=1 tenant_id=1 local_replica_id=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate + store_id=4 replica_id=4 type=NON_VOTER state=StateReplicate +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3,(n4,s4):4NON_VOTER] + +# Start a new high priority evaluation 'd'. +wait_for_eval name=d range_id=1 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + +# Remove tokens from s3, s1 and s2 have tokens which is enough for quorum. +adjust_tokens + store_id=3 pri=HighPri tokens=-1 +---- +t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B + +# Check the state. The high priority evaluation 'd' should complete despite the +# non-voter replica lacking tokens. +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + +# Test behavior when changing leaseholder. +set_leaseholder range_id=1 replica_id=2 +---- +r1: [(n1,s1):1,(n2,s2):2*,(n3,s3):3,(n4,s4):4NON_VOTER] + +# Start a new high priority evaluation 'e'. This evaluation completes +# immediately because there are already sufficient tokens for the new +# leaseholder. 
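+# (The leader s1 and the new leaseholder s2 both already hold regular tokens,
+# and together they form a quorum, so no waiting is needed.)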
+wait_for_eval name=e range_id=1 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + +# Start another evaluation on a new range, which will intersect some of the +# stores of the first range. The evaluation on the first range will not +# complete until all streams have tokens, whereas the high priority evaluation +# on the second range will complete once a quorum has available tokens. +set_replicas +range_id=1 tenant_id=1 local_replica_id=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate + store_id=3 replica_id=5 type=VOTER_FULL state=StateReplicate +range_id=2 tenant_id=1 local_replica_id=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate + store_id=5 replica_id=5 type=VOTER_FULL state=StateReplicate +---- +r1: [(n1,s1):1,(n2,s2):2*,(n3,s3):5] +r2: [(n1,s1):1*,(n3,s3):3,(n5,s5):5] + +set_leaseholder range_id=1 replica_id=4 +---- +r1: [(n1,s1):1,(n2,s2):2,(n3,s3):5] +r2: [(n1,s1):1*,(n3,s3):3,(n5,s5):5] + +wait_for_eval name=f range_id=1 pri=LowPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=false err= +range_id=2 tenant_id={1} local_replica_id=1 + +wait_for_eval name=g range_id=2 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=false err= +range_id=2 tenant_id={1} local_replica_id=1 + name=g pri=high-pri done=false err= + +adjust_tokens + store_id=5 pri=HighPri tokens=1 +---- +t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s5: reg=+1 B/+1 B ela=+1 B/+1 B + +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=false err= +range_id=2 tenant_id={1} local_replica_id=1 + name=g pri=high-pri done=true err= + +# Adding elastic tokens to s3 should complete the low priority evaluation 'f', +# as all stores now have elastic tokens available. 
+adjust_tokens + store_id=3 pri=LowPri tokens=1 +---- +t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+0 B/+1 B ela=+1 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s5: reg=+1 B/+1 B ela=+1 B/+1 B + +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=true err= +range_id=2 tenant_id={1} local_replica_id=1 + name=g pri=high-pri done=true err= + +# Adjust the tokens so that r1 doesn't have tokens on s3 or s1, then transfer +# s3 the lease for r1. +adjust_tokens + store_id=3 pri=LowPri tokens=-1 + store_id=1 pri=HighPri tokens=-1 +---- +t1/s1: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s5: reg=+1 B/+1 B ela=+1 B/+1 B + +set_leaseholder range_id=1 replica_id=5 +---- +r1: [(n1,s1):1,(n2,s2):2,(n3,s3):5*] +r2: [(n1,s1):1*,(n3,s3):3,(n5,s5):5] + +# Start another evaluation 'h' on r1. It should not complete as the leaseholder +# (s3) doesn't have available tokens and the leader (s1) doesn't have tokens. +wait_for_eval name=h range_id=1 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=true err= + name=h pri=high-pri done=false err= +range_id=2 tenant_id={1} local_replica_id=1 + name=g pri=high-pri done=true err= + +# Add tokens to s3, this should not complete 'h' as the leader of r1 (s1) does +# not have tokens. +adjust_tokens + store_id=3 pri=HighPri tokens=1 +---- +t1/s1: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s5: reg=+1 B/+1 B ela=+1 B/+1 B + +# Start another evaluation 'i' on r1, it should also not complete until the +# leader (s1) has tokens, despite both the leaseholder (s3) and a quorum +# (s2,s3) having tokens available. Similar to above. +wait_for_eval name=i range_id=1 pri=HighPri +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=true err= + name=h pri=high-pri done=false err= + name=i pri=high-pri done=false err= +range_id=2 tenant_id={1} local_replica_id=1 + name=g pri=high-pri done=true err= + +# Finally, add tokens to s1 to complete both evaluations 'h' and 'i'. 
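+# (s1 is the leader of r1; restoring its token satisfies the required leader
+# wait, and a quorum already had tokens.)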
+adjust_tokens + store_id=1 pri=HighPri tokens=1 +---- +t1/s1: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s2: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s3: reg=+1 B/+1 B ela=+1 B/+1 B +t1/s4: reg=+0 B/+1 B ela=+0 B/+1 B +t1/s5: reg=+1 B/+1 B ela=+1 B/+1 B + +check_state +---- +range_id=1 tenant_id={1} local_replica_id=1 + name=a pri=high-pri done=true err=context canceled + name=b pri=low-pri done=true err= + name=c pri=high-pri done=true err= + name=d pri=high-pri done=true err= + name=e pri=high-pri done=true err= + name=f pri=low-pri done=true err= + name=h pri=high-pri done=true err= + name=i pri=high-pri done=true err= +range_id=2 tenant_id={1} local_replica_id=1 + name=g pri=high-pri done=true err= diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go index 687f00a24081..4e0fbb141575 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go @@ -47,6 +47,8 @@ type TokenCounter interface { Deduct(context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens) // Return returns flow tokens for the given work class. Return(context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens) + // String returns a string representation of the token counter. + String() string } // TokenWaitingHandle is the interface for waiting for positive tokens from a @@ -205,6 +207,22 @@ func newTokenCounter(settings *cluster.Settings) *tokenCounter { return t } +// String returns a string representation of the token counter. +func (b *tokenCounter) String() string { + return redact.StringWithoutMarkers(b) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (b *tokenCounter) SafeFormat(w redact.SafePrinter, _ rune) { + b.mu.RLock() + defer b.mu.RUnlock() + w.Printf("reg=%v/%v ela=%v/%v", + b.mu.counters[admissionpb.RegularWorkClass].tokens, + b.mu.counters[admissionpb.RegularWorkClass].limit, + b.mu.counters[admissionpb.ElasticWorkClass].tokens, + b.mu.counters[admissionpb.ElasticWorkClass].limit) +} + func (t *tokenCounter) tokens(wc admissionpb.WorkClass) kvflowcontrol.Tokens { t.mu.RLock() defer t.mu.RUnlock()