From 41e3497400eac758aeb6a7333238e4e3a1aeac35 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 27 Mar 2023 19:01:09 +0000 Subject: [PATCH] kvserver: use qps for hot ranges sorting We introduced CPU balancing by default in #97424. This had the side effect of changing the hot ranges api to return the hottest replicas by CPU, rather than QPS. This patch updates the replica rankings struct to support tracking both by CPU and QPS simultaneously. The hot ranges API collects the top k by QPS and the store rebalancer collects depending on the setting of `kv.allocator.load_based_rebalancing.objective`, which is by default `cpu`. Resolves: #99605 Release note (bug fix): The hot ranges UI page would show hot ranges by CPU and not QPS, depending on the value of `kv.allocator.load_based_rebalancing.objective` (default `cpu`). Now the UI page will always collect based on QPS. --- .../asim/storerebalancer/replica_rankings.go | 2 +- pkg/kv/kvserver/replica_rankings.go | 41 ++++++--- pkg/kv/kvserver/replica_rankings_test.go | 85 +++++++++++-------- pkg/kv/kvserver/store.go | 8 +- pkg/kv/kvserver/store_rebalancer.go | 2 +- pkg/kv/kvserver/store_rebalancer_test.go | 36 ++++---- 6 files changed, 103 insertions(+), 71 deletions(-) diff --git a/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go b/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go index 0c40833f0798..6606c6247cde 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go +++ b/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go @@ -29,5 +29,5 @@ func hottestRanges( accumulator.AddReplica(candidateReplica) } replRankings.Update(accumulator) - return replRankings.TopLoad() + return replRankings.TopLoad(dim) } diff --git a/pkg/kv/kvserver/replica_rankings.go b/pkg/kv/kvserver/replica_rankings.go index f0b68e4016a6..147fd5062ac0 100644 --- a/pkg/kv/kvserver/replica_rankings.go +++ b/pkg/kv/kvserver/replica_rankings.go @@ -102,10 +102,15 @@ func NewReplicaRankings() *ReplicaRankings { // TODO(kvoli): When adding another load dimension to be balanced upon, it will // be necessary to clarify the semantics of this API. This is especially true // since the UI is coupled to this function. -func NewReplicaAccumulator(dimension load.Dimension) *RRAccumulator { - res := &RRAccumulator{} - res.dim.val = func(r CandidateReplica) float64 { - return r.RangeUsageInfo().Load().Dim(dimension) +func NewReplicaAccumulator(dims ...load.Dimension) *RRAccumulator { + res := &RRAccumulator{ + dims: map[load.Dimension]*rrPriorityQueue{}, + } + for _, dim := range dims { + res.dims[dim] = &rrPriorityQueue{} + res.dims[dim].val = func(r CandidateReplica) float64 { + return r.RangeUsageInfo().Load().Dim(dim) + } } return res } @@ -118,13 +123,13 @@ func (rr *ReplicaRankings) Update(acc *RRAccumulator) { } // TopLoad returns the highest load CandidateReplicas that are tracked. -func (rr *ReplicaRankings) TopLoad() []CandidateReplica { +func (rr *ReplicaRankings) TopLoad(dimension load.Dimension) []CandidateReplica { rr.mu.Lock() defer rr.mu.Unlock() // If we have a new set of data, consume it. Otherwise, just return the most // recently consumed data. - if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dim.Len() > 0 { - rr.mu.byDim = consumeAccumulator(&rr.mu.dimAccumulator.dim) + if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dims[dimension].Len() > 0 { + rr.mu.byDim = consumeAccumulator(rr.mu.dimAccumulator.dims[dimension]) } return rr.mu.byDim } @@ -138,23 +143,33 @@ func (rr *ReplicaRankings) TopLoad() []CandidateReplica { // prevents concurrent loaders of data from messing with each other -- the last // `update`d accumulator will win. type RRAccumulator struct { - dim rrPriorityQueue + dims map[load.Dimension]*rrPriorityQueue } // AddReplica adds a replica to the replica accumulator. func (a *RRAccumulator) AddReplica(repl CandidateReplica) { + for dim := range a.dims { + a.addReplicaForDimension(repl, dim) + + } +} + +func (a *RRAccumulator) addReplicaForDimension(repl CandidateReplica, dim load.Dimension) { + rr := a.dims[dim] // If the heap isn't full, just push the new replica and return. - if a.dim.Len() < numTopReplicasToTrack { - heap.Push(&a.dim, repl) + if rr.Len() < numTopReplicasToTrack { + + heap.Push(a.dims[dim], repl) return } // Otherwise, conditionally push if the new replica is more deserving than // the current tip of the heap. - if a.dim.val(repl) > a.dim.val(a.dim.entries[0]) { - heap.Pop(&a.dim) - heap.Push(&a.dim, repl) + if rr.val(repl) > rr.val(rr.entries[0]) { + heap.Pop(rr) + heap.Push(rr, repl) } + } func consumeAccumulator(pq *rrPriorityQueue) []CandidateReplica { diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 0f8770d8459e..0e02c07861d7 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -37,52 +37,65 @@ func TestReplicaRankings(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + dimensions := []aload.Dimension{aload.Queries, aload.CPU} rr := NewReplicaRankings() testCases := []struct { - replicasByQPS []float64 + replicasByLoad []float64 }{ - {replicasByQPS: []float64{}}, - {replicasByQPS: []float64{0}}, - {replicasByQPS: []float64{1, 0}}, - {replicasByQPS: []float64{3, 2, 1, 0}}, - {replicasByQPS: []float64{3, 3, 2, 2, 1, 1, 0, 0}}, - {replicasByQPS: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}}, + {replicasByLoad: []float64{}}, + {replicasByLoad: []float64{0}}, + {replicasByLoad: []float64{1, 0}}, + {replicasByLoad: []float64{3, 2, 1, 0}}, + {replicasByLoad: []float64{3, 3, 2, 2, 1, 1, 0, 0}}, + {replicasByLoad: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}}, } for _, tc := range testCases { - acc := NewReplicaAccumulator(aload.Queries) - - // Randomize the order of the inputs each time the test is run. - want := make([]float64, len(tc.replicasByQPS)) - copy(want, tc.replicasByQPS) - rand.Shuffle(len(tc.replicasByQPS), func(i, j int) { - tc.replicasByQPS[i], tc.replicasByQPS[j] = tc.replicasByQPS[j], tc.replicasByQPS[i] - }) - - for i, replQPS := range tc.replicasByQPS { - acc.AddReplica(candidateReplica{ - Replica: &Replica{RangeID: roachpb.RangeID(i)}, - usage: allocator.RangeUsageInfo{QueriesPerSecond: replQPS}, + for _, dimension := range dimensions { + acc := NewReplicaAccumulator(dimensions...) + + // Randomize the order of the inputs each time the test is run. Also make + // a copy so that we can test on the copy for each dimension without + // mutating the underlying test case slice. + rLoad := make([]float64, len(tc.replicasByLoad)) + want := make([]float64, len(tc.replicasByLoad)) + copy(want, tc.replicasByLoad) + copy(rLoad, tc.replicasByLoad) + + rand.Shuffle(len(rLoad), func(i, j int) { + rLoad[i], rLoad[j] = rLoad[j], rLoad[i] }) - } - rr.Update(acc) - // Make sure we can read off all expected replicas in the correct order. - repls := rr.TopLoad() - if len(repls) != len(want) { - t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, tc.replicasByQPS) - continue - } - for i := range want { - if repls[i].RangeUsageInfo().QueriesPerSecond != want[i] { - t.Errorf("got %f for %d'th element; want %f (input: %v)", repls[i].RangeUsageInfo().QueriesPerSecond, i, want, tc.replicasByQPS) - break + for i, replLoad := range rLoad { + acc.AddReplica(candidateReplica{ + Replica: &Replica{RangeID: roachpb.RangeID(i)}, + usage: allocator.RangeUsageInfo{ + // We should get the same ordering for both QPS and CPU. + QueriesPerSecond: replLoad, + RequestCPUNanosPerSecond: replLoad, + }, + }) + } + rr.Update(acc) + + // Make sure we can read off all expected replicas in the correct order. + repls := rr.TopLoad(dimension) + if len(repls) != len(want) { + t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, rLoad) + continue + } + for i := range want { + if repls[i].RangeUsageInfo().Load().Dim(dimension) != want[i] { + t.Errorf("got %f for %d'th element; want %f (input: %v)", + repls[i].RangeUsageInfo().Load().Dim(dimension), i, want, rLoad) + break + } + } + replsCopy := rr.TopLoad(dimension) + if !reflect.DeepEqual(repls, replsCopy) { + t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy) } - } - replsCopy := rr.TopLoad() - if !reflect.DeepEqual(repls, replsCopy) { - t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy) } } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index ba2c4ae54851..231a3d6cdec1 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" @@ -2729,7 +2730,10 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa replicaCount := s.metrics.ReplicaCount.Value() bytesPerReplica := make([]float64, 0, replicaCount) writesPerReplica := make([]float64, 0, replicaCount) - rankingsAccumulator := NewReplicaAccumulator(s.rebalanceObjManager.Objective().ToDimension()) + // We wish to track both CPU and QPS, due to different usecases between UI + // and rebalancing. By default rebalancing uses CPU whilst the UI will use + // QPS. + rankingsAccumulator := NewReplicaAccumulator(load.CPU, load.Queries) rankingsByTenantAccumulator := NewTenantReplicaAccumulator() // Query the current L0 sublevels and record the updated maximum to metrics. @@ -3300,7 +3304,7 @@ type HotReplicaInfo struct { // Note that this uses cached information, so it's cheap but may be slightly // out of date. func (s *Store) HottestReplicas() []HotReplicaInfo { - topLoad := s.replRankings.TopLoad() + topLoad := s.replRankings.TopLoad(load.Queries) return mapToHotReplicasInfo(topLoad) } diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 621963c944cd..2f67ad2c0ddf 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -275,8 +275,8 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { if !sr.subscribedToSpanConfigs() { continue } - hottestRanges := sr.replicaRankings.TopLoad() objective := sr.RebalanceObjective() + hottestRanges := sr.replicaRankings.TopLoad(objective.ToDimension()) options := sr.scorerOptions(ctx, objective.ToDimension()) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode) sr.rebalanceStore(ctx, rctx) diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index d3e68d0c14d4..21b9225e0a34 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -500,8 +500,10 @@ type testRange struct { qps, reqCPU float64 } -func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) { - acc := NewReplicaAccumulator(loadDimension) +func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange) { + // Track both CPU and QPS by default, the ordering the consumer uses will + // depend on the current rebalance objective. + acc := NewReplicaAccumulator(load.Queries, load.CPU) for i, r := range ranges { rangeID := roachpb.RangeID(i + 1) repl := &Replica{store: s, RangeID: rangeID} @@ -787,8 +789,8 @@ func TestChooseLeaseToTransfer(t *testing.T) { for _, tc := range testCases { t.Run("", withQPSCPU(t, objectiveProvider, func(t *testing.T) { lbRebalanceDimension := objectiveProvider.Objective().ToDimension() - loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension) - hottestRanges := sr.replicaRankings.TopLoad() + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}) + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) options.LoadThreshold = allocatorimpl.WithAllDims(0.1) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) @@ -929,10 +931,10 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { loadRanges( rr, s, []testRange{ {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU}, - }, lbRebalanceDimension, + }, ) - hottestRanges := sr.replicaRankings.TopLoad() + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ReplicaEnforcementLevel: allocatorimpl.IOOverloadThresholdIgnore} @@ -1270,10 +1272,10 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { loadRanges( rr, s, []testRange{ {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU}, - }, lbRebalanceDimension, + }, ) - hottestRanges := sr.replicaRankings.TopLoad() + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas) rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ @@ -1360,10 +1362,9 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { qps: 100, reqCPU: 100 * float64(time.Millisecond)}, }, - lbRebalanceDimension, ) - hottestRanges := sr.replicaRankings.TopLoad() + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ @@ -1528,10 +1529,9 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters)) loadRanges(rr, s, []testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}}, - lbRebalanceDimension, ) - hottestRanges := sr.replicaRankings.TopLoad() + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ @@ -1621,9 +1621,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { // Load in a range with replicas on an overfull node, a slightly underfull // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}) - hottestRanges := sr.replicaRankings.TopLoad() + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) repl := rctx.hottestRanges[0] @@ -1638,9 +1638,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { // Then do the same, but for replica rebalancing. Make s5 an existing replica // that's behind, and see how a new replica is preferred as the leaseholder // over it. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}) - hottestRanges = sr.replicaRankings.TopLoad() + hottestRanges = sr.replicaRankings.TopLoad(lbRebalanceDimension) options = sr.scorerOptions(ctx, lbRebalanceDimension) rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ @@ -1798,9 +1798,9 @@ func TestStoreRebalancerIOOverloadCheck(t *testing.T) { // Load in a range with replicas on an overfull node, a slightly underfull // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}) - hottestRanges := sr.replicaRankings.TopLoad() + hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension) options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) require.Greater(t, len(rctx.hottestRanges), 0)