kvserver: use qps for hot ranges sorting
We introduced CPU balancing by default in #97424. This had the side
effect of changing the hot ranges API to return the hottest replicas by
CPU rather than by QPS.

This patch updates the replica rankings struct to track the top-k
replicas by both CPU and QPS simultaneously. The hot ranges API now
always collects the top-k by QPS, while the store rebalancer collects by
the dimension set in `kv.allocator.load_based_rebalancing.objective`,
which defaults to `cpu`.

Resolves: #99605

Release note (bug fix): Previously, the hot ranges UI page could show
hot ranges ranked by CPU rather than QPS, depending on the value of
`kv.allocator.load_based_rebalancing.objective` (default `cpu`). Now the
UI page always collects hot ranges based on QPS.
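
To make the new shape concrete, here is a minimal sketch of the dual-dimension API introduced below (illustrative only; `candidates` and `rankings` are assumed to be in scope):

```go
// One accumulator now maintains a bounded top-k ranking per load dimension.
acc := kvserver.NewReplicaAccumulator(load.CPU, load.Queries)
for _, cr := range candidates {
	acc.AddReplica(cr) // ranked independently under each dimension
}
rankings.Update(acc) // the last-updated accumulator wins
```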
kvoli committed Mar 30, 2023
1 parent 56ace76 commit 41e3497
Showing 6 changed files with 103 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go
```diff
@@ -29,5 +29,5 @@ func hottestRanges(
 		accumulator.AddReplica(candidateReplica)
 	}
 	replRankings.Update(accumulator)
-	return replRankings.TopLoad()
+	return replRankings.TopLoad(dim)
 }
```
41 changes: 28 additions & 13 deletions pkg/kv/kvserver/replica_rankings.go
```diff
@@ -102,10 +102,15 @@ func NewReplicaRankings() *ReplicaRankings {
 // TODO(kvoli): When adding another load dimension to be balanced upon, it will
 // be necessary to clarify the semantics of this API. This is especially true
 // since the UI is coupled to this function.
-func NewReplicaAccumulator(dimension load.Dimension) *RRAccumulator {
-	res := &RRAccumulator{}
-	res.dim.val = func(r CandidateReplica) float64 {
-		return r.RangeUsageInfo().Load().Dim(dimension)
+func NewReplicaAccumulator(dims ...load.Dimension) *RRAccumulator {
+	res := &RRAccumulator{
+		dims: map[load.Dimension]*rrPriorityQueue{},
+	}
+	for _, dim := range dims {
+		res.dims[dim] = &rrPriorityQueue{}
+		res.dims[dim].val = func(r CandidateReplica) float64 {
+			return r.RangeUsageInfo().Load().Dim(dim)
+		}
 	}
 	return res
 }
```
```diff
@@ -118,13 +123,13 @@ func (rr *ReplicaRankings) Update(acc *RRAccumulator) {
 }
 
 // TopLoad returns the highest load CandidateReplicas that are tracked.
-func (rr *ReplicaRankings) TopLoad() []CandidateReplica {
+func (rr *ReplicaRankings) TopLoad(dimension load.Dimension) []CandidateReplica {
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
 	// If we have a new set of data, consume it. Otherwise, just return the most
 	// recently consumed data.
-	if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dim.Len() > 0 {
-		rr.mu.byDim = consumeAccumulator(&rr.mu.dimAccumulator.dim)
+	if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dims[dimension].Len() > 0 {
+		rr.mu.byDim = consumeAccumulator(rr.mu.dimAccumulator.dims[dimension])
 	}
 	return rr.mu.byDim
 }
```
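
Note the consume-once semantics here: `TopLoad` drains the pending accumulator's queue for the requested dimension and caches the result, so repeated calls return the same ranking until the next `Update`. A sketch of what callers can rely on (illustrative, mirroring the assertion in the test file below):

```go
first := rr.TopLoad(load.Queries)  // consumes the pending queue
second := rr.TopLoad(load.Queries) // served from the cached result
// reflect.DeepEqual(first, second) holds until rr.Update(...) installs
// a fresh accumulator.
```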
```diff
@@ -138,23 +143,33 @@ func (rr *ReplicaRankings) TopLoad() []CandidateReplica {
 // prevents concurrent loaders of data from messing with each other -- the last
 // `update`d accumulator will win.
 type RRAccumulator struct {
-	dim rrPriorityQueue
+	dims map[load.Dimension]*rrPriorityQueue
 }
 
 // AddReplica adds a replica to the replica accumulator.
 func (a *RRAccumulator) AddReplica(repl CandidateReplica) {
+	for dim := range a.dims {
+		a.addReplicaForDimension(repl, dim)
+	}
+}
+
+func (a *RRAccumulator) addReplicaForDimension(repl CandidateReplica, dim load.Dimension) {
+	rr := a.dims[dim]
 	// If the heap isn't full, just push the new replica and return.
-	if a.dim.Len() < numTopReplicasToTrack {
-		heap.Push(&a.dim, repl)
+	if rr.Len() < numTopReplicasToTrack {
+		heap.Push(a.dims[dim], repl)
 		return
 	}
 
 	// Otherwise, conditionally push if the new replica is more deserving than
 	// the current tip of the heap.
-	if a.dim.val(repl) > a.dim.val(a.dim.entries[0]) {
-		heap.Pop(&a.dim)
-		heap.Push(&a.dim, repl)
+	if rr.val(repl) > rr.val(rr.entries[0]) {
+		heap.Pop(rr)
+		heap.Push(rr, repl)
 	}
 }
 
 func consumeAccumulator(pq *rrPriorityQueue) []CandidateReplica {
```
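
Each per-dimension queue is a fixed-capacity min-heap: the root holds the smallest tracked load, so once the heap is full a new replica only enters if it beats that minimum. A self-contained sketch of the same top-k pattern with `container/heap` (hypothetical types, not the CockroachDB implementation):

```go
package main

import (
	"container/heap"
	"fmt"
)

// minHeap tracks the k largest values seen; the root is the smallest kept.
type minHeap []float64

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(float64)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	const k = 3 // analogous to numTopReplicasToTrack
	h := &minHeap{}
	for _, v := range []float64{5, 1, 9, 3, 7, 2} {
		if h.Len() < k {
			heap.Push(h, v) // heap not full yet: always track.
		} else if v > (*h)[0] {
			heap.Pop(h)     // evict the smallest tracked value...
			heap.Push(h, v) // ...in favor of the hotter one.
		}
	}
	fmt.Println(*h) // the 3 largest values, in heap (not sorted) order
}
```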
85 changes: 49 additions & 36 deletions pkg/kv/kvserver/replica_rankings_test.go
```diff
@@ -37,52 +37,65 @@ func TestReplicaRankings(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	dimensions := []aload.Dimension{aload.Queries, aload.CPU}
 	rr := NewReplicaRankings()
 
 	testCases := []struct {
-		replicasByQPS []float64
+		replicasByLoad []float64
 	}{
-		{replicasByQPS: []float64{}},
-		{replicasByQPS: []float64{0}},
-		{replicasByQPS: []float64{1, 0}},
-		{replicasByQPS: []float64{3, 2, 1, 0}},
-		{replicasByQPS: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
-		{replicasByQPS: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
+		{replicasByLoad: []float64{}},
+		{replicasByLoad: []float64{0}},
+		{replicasByLoad: []float64{1, 0}},
+		{replicasByLoad: []float64{3, 2, 1, 0}},
+		{replicasByLoad: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
+		{replicasByLoad: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
 	}
 
 	for _, tc := range testCases {
-		acc := NewReplicaAccumulator(aload.Queries)
-
-		// Randomize the order of the inputs each time the test is run.
-		want := make([]float64, len(tc.replicasByQPS))
-		copy(want, tc.replicasByQPS)
-		rand.Shuffle(len(tc.replicasByQPS), func(i, j int) {
-			tc.replicasByQPS[i], tc.replicasByQPS[j] = tc.replicasByQPS[j], tc.replicasByQPS[i]
-		})
-
-		for i, replQPS := range tc.replicasByQPS {
-			acc.AddReplica(candidateReplica{
-				Replica: &Replica{RangeID: roachpb.RangeID(i)},
-				usage:   allocator.RangeUsageInfo{QueriesPerSecond: replQPS},
-			})
-		}
-		rr.Update(acc)
-
-		// Make sure we can read off all expected replicas in the correct order.
-		repls := rr.TopLoad()
-		if len(repls) != len(want) {
-			t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, tc.replicasByQPS)
-			continue
-		}
-		for i := range want {
-			if repls[i].RangeUsageInfo().QueriesPerSecond != want[i] {
-				t.Errorf("got %f for %d'th element; want %f (input: %v)", repls[i].RangeUsageInfo().QueriesPerSecond, i, want, tc.replicasByQPS)
-				break
-			}
-		}
-		replsCopy := rr.TopLoad()
-		if !reflect.DeepEqual(repls, replsCopy) {
-			t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
-		}
+		for _, dimension := range dimensions {
+			acc := NewReplicaAccumulator(dimensions...)
+
+			// Randomize the order of the inputs each time the test is run. Also make
+			// a copy so that we can test on the copy for each dimension without
+			// mutating the underlying test case slice.
+			rLoad := make([]float64, len(tc.replicasByLoad))
+			want := make([]float64, len(tc.replicasByLoad))
+			copy(want, tc.replicasByLoad)
+			copy(rLoad, tc.replicasByLoad)
+
+			rand.Shuffle(len(rLoad), func(i, j int) {
+				rLoad[i], rLoad[j] = rLoad[j], rLoad[i]
+			})
+
+			for i, replLoad := range rLoad {
+				acc.AddReplica(candidateReplica{
+					Replica: &Replica{RangeID: roachpb.RangeID(i)},
+					usage: allocator.RangeUsageInfo{
+						// We should get the same ordering for both QPS and CPU.
+						QueriesPerSecond:         replLoad,
+						RequestCPUNanosPerSecond: replLoad,
+					},
+				})
+			}
+			rr.Update(acc)
+
+			// Make sure we can read off all expected replicas in the correct order.
+			repls := rr.TopLoad(dimension)
+			if len(repls) != len(want) {
+				t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, rLoad)
+				continue
+			}
+			for i := range want {
+				if repls[i].RangeUsageInfo().Load().Dim(dimension) != want[i] {
+					t.Errorf("got %f for %d'th element; want %f (input: %v)",
+						repls[i].RangeUsageInfo().Load().Dim(dimension), i, want, rLoad)
+					break
+				}
+			}
+			replsCopy := rr.TopLoad(dimension)
+			if !reflect.DeepEqual(repls, replsCopy) {
+				t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
+			}
+		}
 	}
 }
```
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/store.go
```diff
@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
@@ -2729,7 +2730,10 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
 	replicaCount := s.metrics.ReplicaCount.Value()
 	bytesPerReplica := make([]float64, 0, replicaCount)
 	writesPerReplica := make([]float64, 0, replicaCount)
-	rankingsAccumulator := NewReplicaAccumulator(s.rebalanceObjManager.Objective().ToDimension())
+	// We wish to track both CPU and QPS, due to different usecases between UI
+	// and rebalancing. By default rebalancing uses CPU whilst the UI will use
+	// QPS.
+	rankingsAccumulator := NewReplicaAccumulator(load.CPU, load.Queries)
 	rankingsByTenantAccumulator := NewTenantReplicaAccumulator()
 
 	// Query the current L0 sublevels and record the updated maximum to metrics.
@@ -3300,7 +3304,7 @@ type HotReplicaInfo struct {
 // Note that this uses cached information, so it's cheap but may be slightly
 // out of date.
 func (s *Store) HottestReplicas() []HotReplicaInfo {
-	topLoad := s.replRankings.TopLoad()
+	topLoad := s.replRankings.TopLoad(load.Queries)
 	return mapToHotReplicasInfo(topLoad)
 }
```
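
The design choice above is to maintain both bounded heaps on every capacity pass so that neither consumer depends on the other's cluster setting. A sketch of the two read paths, reusing identifiers from the hunks in this commit (surrounding setup assumed):

```go
// UI / hot ranges endpoint: always QPS-ordered.
hot := s.replRankings.TopLoad(load.Queries)

// Store rebalancer: ordered by the configured objective's dimension,
// per kv.allocator.load_based_rebalancing.objective (load.CPU by default).
dim := sr.RebalanceObjective().ToDimension()
hottestRanges := sr.replicaRankings.TopLoad(dim)
```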
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_rebalancer.go
```diff
@@ -275,8 +275,8 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
 			if !sr.subscribedToSpanConfigs() {
 				continue
 			}
-			hottestRanges := sr.replicaRankings.TopLoad()
 			objective := sr.RebalanceObjective()
+			hottestRanges := sr.replicaRankings.TopLoad(objective.ToDimension())
 			options := sr.scorerOptions(ctx, objective.ToDimension())
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode)
 			sr.rebalanceStore(ctx, rctx)
```
36 changes: 18 additions & 18 deletions pkg/kv/kvserver/store_rebalancer_test.go
```diff
@@ -500,8 +500,10 @@ type testRange struct {
 	qps, reqCPU float64
 }
 
-func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) {
-	acc := NewReplicaAccumulator(loadDimension)
+func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange) {
+	// Track both CPU and QPS by default, the ordering the consumer uses will
+	// depend on the current rebalance objective.
+	acc := NewReplicaAccumulator(load.Queries, load.CPU)
 	for i, r := range ranges {
 		rangeID := roachpb.RangeID(i + 1)
 		repl := &Replica{store: s, RangeID: rangeID}
@@ -787,8 +789,8 @@ func TestChooseLeaseToTransfer(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run("", withQPSCPU(t, objectiveProvider, func(t *testing.T) {
 			lbRebalanceDimension := objectiveProvider.Objective().ToDimension()
-			loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension)
-			hottestRanges := sr.replicaRankings.TopLoad()
+			loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}})
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			options.LoadThreshold = allocatorimpl.WithAllDims(0.1)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
@@ -929,10 +931,10 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) {
 			loadRanges(
 				rr, s, []testRange{
 					{voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU},
-				}, lbRebalanceDimension,
+				},
 			)
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ReplicaEnforcementLevel: allocatorimpl.IOOverloadThresholdIgnore}
@@ -1270,10 +1272,10 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) {
 			loadRanges(
 				rr, s, []testRange{
 					{voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU},
-				}, lbRebalanceDimension,
+				},
 			)
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas)
 			rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1360,10 +1362,9 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) {
 				qps:    100,
 				reqCPU: 100 * float64(time.Millisecond)},
 		},
-		lbRebalanceDimension,
 	)
 
-	hottestRanges := sr.replicaRankings.TopLoad()
+	hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 	options := sr.scorerOptions(ctx, lbRebalanceDimension)
 	rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1528,10 +1529,9 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) {
 			s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters))
 			loadRanges(rr, s,
 				[]testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}},
-				lbRebalanceDimension,
 			)
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1621,9 +1621,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
 	// Load in a range with replicas on an overfull node, a slightly underfull
 	// node, and a very underfull node.
-	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
+	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})
 
-	hottestRanges := sr.replicaRankings.TopLoad()
+	hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 	options := sr.scorerOptions(ctx, lbRebalanceDimension)
 	rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	repl := rctx.hottestRanges[0]
@@ ... @@
 	// Then do the same, but for replica rebalancing. Make s5 an existing replica
 	// that's behind, and see how a new replica is preferred as the leaseholder
 	// over it.
-	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
+	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})
 
-	hottestRanges = sr.replicaRankings.TopLoad()
+	hottestRanges = sr.replicaRankings.TopLoad(lbRebalanceDimension)
 	options = sr.scorerOptions(ctx, lbRebalanceDimension)
 	rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1798,9 +1798,9 @@ func TestStoreRebalancerIOOverloadCheck(t *testing.T) {
 	// Load in a range with replicas on an overfull node, a slightly underfull
 	// node, and a very underfull node.
-	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
+	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})
 
-	hottestRanges := sr.replicaRankings.TopLoad()
+	hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 	options := sr.scorerOptions(ctx, lbRebalanceDimension)
 	rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	require.Greater(t, len(rctx.hottestRanges), 0)
```
