Skip to content

Commit

Permalink
Merge #99669 #99716
Browse files Browse the repository at this point in the history
99669: roachtest: bump jepsen version r=smg260 a=renatolabs

This commit bumps the jepsen version now to include a fix that allows
jepsen to set custom range sizes. See:

cockroachdb/jepsen#34

Epic: none

Release note: None

99716: kvserver: use qps for hot ranges sorting r=koorosh a=kvoli

We introduced CPU balancing by default in #97424. This had the side effect of changing the hot ranges api to return the hottest replicas by CPU, rather than QPS.

This patch updates the replica rankings struct to support tracking both by CPU and QPS simultaneously. The hot ranges API collects the top k by QPS and the store rebalancer collects depending on the setting of `kv.allocator.load_based_rebalancing.objective`, which is by default `cpu`.

Resolves: #99605


Co-authored-by: Renato Costa <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
3 people committed Mar 30, 2023
3 parents 8e11ab8 + 7601872 + 41e3497 commit 7074da6
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 74 deletions.
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/tests/jepsen.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ process for those artifacts and that would add some traceability.
If you want to make a change to jepsen (like upgrade the version to resolve
issues with env incompatibility or bump jdbc driver versions), you can create
a pull request for tc-nightly-master branch and after merging build a new
a pull request for tc-nightly-main branch and after merging build a new
artifact using:
# install build dependencies and build tools
Expand All @@ -59,7 +59,7 @@ chmod +x lein
# clone repository and checkout release branch
git clone https://github.com/cockroachdb/jepsen
cd jepsen/cockroachdb
git checkout tc-nightly-master
git checkout tc-nightly-main
# build executable jar
~/lein uberjar
Expand All @@ -81,7 +81,7 @@ const jepsenRepo = "https://github.com/cockroachdb/jepsen"
const repoBranch = "tc-nightly"

const gcpPath = "https://storage.googleapis.com/cockroach-jepsen"
const binaryVersion = "0.1.0-3d7c345d-standalone"
const binaryVersion = "0.1.0-21cbebe-standalone"

var jepsenNemeses = []struct {
name, config string
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ func hottestRanges(
accumulator.AddReplica(candidateReplica)
}
replRankings.Update(accumulator)
return replRankings.TopLoad()
return replRankings.TopLoad(dim)
}
41 changes: 28 additions & 13 deletions pkg/kv/kvserver/replica_rankings.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,15 @@ func NewReplicaRankings() *ReplicaRankings {
// TODO(kvoli): When adding another load dimension to be balanced upon, it will
// be necessary to clarify the semantics of this API. This is especially true
// since the UI is coupled to this function.
func NewReplicaAccumulator(dimension load.Dimension) *RRAccumulator {
res := &RRAccumulator{}
res.dim.val = func(r CandidateReplica) float64 {
return r.RangeUsageInfo().Load().Dim(dimension)
func NewReplicaAccumulator(dims ...load.Dimension) *RRAccumulator {
res := &RRAccumulator{
dims: map[load.Dimension]*rrPriorityQueue{},
}
for _, dim := range dims {
res.dims[dim] = &rrPriorityQueue{}
res.dims[dim].val = func(r CandidateReplica) float64 {
return r.RangeUsageInfo().Load().Dim(dim)
}
}
return res
}
Expand All @@ -118,13 +123,13 @@ func (rr *ReplicaRankings) Update(acc *RRAccumulator) {
}

// TopLoad returns the highest load CandidateReplicas that are tracked.
func (rr *ReplicaRankings) TopLoad() []CandidateReplica {
func (rr *ReplicaRankings) TopLoad(dimension load.Dimension) []CandidateReplica {
rr.mu.Lock()
defer rr.mu.Unlock()
// If we have a new set of data, consume it. Otherwise, just return the most
// recently consumed data.
if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dim.Len() > 0 {
rr.mu.byDim = consumeAccumulator(&rr.mu.dimAccumulator.dim)
if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dims[dimension].Len() > 0 {
rr.mu.byDim = consumeAccumulator(rr.mu.dimAccumulator.dims[dimension])
}
return rr.mu.byDim
}
Expand All @@ -138,23 +143,33 @@ func (rr *ReplicaRankings) TopLoad() []CandidateReplica {
// prevents concurrent loaders of data from messing with each other -- the last
// `update`d accumulator will win.
type RRAccumulator struct {
dim rrPriorityQueue
dims map[load.Dimension]*rrPriorityQueue
}

// AddReplica adds a replica to the replica accumulator.
func (a *RRAccumulator) AddReplica(repl CandidateReplica) {
for dim := range a.dims {
a.addReplicaForDimension(repl, dim)

}
}

func (a *RRAccumulator) addReplicaForDimension(repl CandidateReplica, dim load.Dimension) {
rr := a.dims[dim]
// If the heap isn't full, just push the new replica and return.
if a.dim.Len() < numTopReplicasToTrack {
heap.Push(&a.dim, repl)
if rr.Len() < numTopReplicasToTrack {

heap.Push(a.dims[dim], repl)
return
}

// Otherwise, conditionally push if the new replica is more deserving than
// the current tip of the heap.
if a.dim.val(repl) > a.dim.val(a.dim.entries[0]) {
heap.Pop(&a.dim)
heap.Push(&a.dim, repl)
if rr.val(repl) > rr.val(rr.entries[0]) {
heap.Pop(rr)
heap.Push(rr, repl)
}

}

func consumeAccumulator(pq *rrPriorityQueue) []CandidateReplica {
Expand Down
85 changes: 49 additions & 36 deletions pkg/kv/kvserver/replica_rankings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,52 +37,65 @@ func TestReplicaRankings(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dimensions := []aload.Dimension{aload.Queries, aload.CPU}
rr := NewReplicaRankings()

testCases := []struct {
replicasByQPS []float64
replicasByLoad []float64
}{
{replicasByQPS: []float64{}},
{replicasByQPS: []float64{0}},
{replicasByQPS: []float64{1, 0}},
{replicasByQPS: []float64{3, 2, 1, 0}},
{replicasByQPS: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
{replicasByQPS: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
{replicasByLoad: []float64{}},
{replicasByLoad: []float64{0}},
{replicasByLoad: []float64{1, 0}},
{replicasByLoad: []float64{3, 2, 1, 0}},
{replicasByLoad: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
{replicasByLoad: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
}

for _, tc := range testCases {
acc := NewReplicaAccumulator(aload.Queries)

// Randomize the order of the inputs each time the test is run.
want := make([]float64, len(tc.replicasByQPS))
copy(want, tc.replicasByQPS)
rand.Shuffle(len(tc.replicasByQPS), func(i, j int) {
tc.replicasByQPS[i], tc.replicasByQPS[j] = tc.replicasByQPS[j], tc.replicasByQPS[i]
})

for i, replQPS := range tc.replicasByQPS {
acc.AddReplica(candidateReplica{
Replica: &Replica{RangeID: roachpb.RangeID(i)},
usage: allocator.RangeUsageInfo{QueriesPerSecond: replQPS},
for _, dimension := range dimensions {
acc := NewReplicaAccumulator(dimensions...)

// Randomize the order of the inputs each time the test is run. Also make
// a copy so that we can test on the copy for each dimension without
// mutating the underlying test case slice.
rLoad := make([]float64, len(tc.replicasByLoad))
want := make([]float64, len(tc.replicasByLoad))
copy(want, tc.replicasByLoad)
copy(rLoad, tc.replicasByLoad)

rand.Shuffle(len(rLoad), func(i, j int) {
rLoad[i], rLoad[j] = rLoad[j], rLoad[i]
})
}
rr.Update(acc)

// Make sure we can read off all expected replicas in the correct order.
repls := rr.TopLoad()
if len(repls) != len(want) {
t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, tc.replicasByQPS)
continue
}
for i := range want {
if repls[i].RangeUsageInfo().QueriesPerSecond != want[i] {
t.Errorf("got %f for %d'th element; want %f (input: %v)", repls[i].RangeUsageInfo().QueriesPerSecond, i, want, tc.replicasByQPS)
break
for i, replLoad := range rLoad {
acc.AddReplica(candidateReplica{
Replica: &Replica{RangeID: roachpb.RangeID(i)},
usage: allocator.RangeUsageInfo{
// We should get the same ordering for both QPS and CPU.
QueriesPerSecond: replLoad,
RequestCPUNanosPerSecond: replLoad,
},
})
}
rr.Update(acc)

// Make sure we can read off all expected replicas in the correct order.
repls := rr.TopLoad(dimension)
if len(repls) != len(want) {
t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, rLoad)
continue
}
for i := range want {
if repls[i].RangeUsageInfo().Load().Dim(dimension) != want[i] {
t.Errorf("got %f for %d'th element; want %f (input: %v)",
repls[i].RangeUsageInfo().Load().Dim(dimension), i, want, rLoad)
break
}
}
replsCopy := rr.TopLoad(dimension)
if !reflect.DeepEqual(repls, replsCopy) {
t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
}
}
replsCopy := rr.TopLoad()
if !reflect.DeepEqual(repls, replsCopy) {
t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
Expand Down Expand Up @@ -2729,7 +2730,10 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
replicaCount := s.metrics.ReplicaCount.Value()
bytesPerReplica := make([]float64, 0, replicaCount)
writesPerReplica := make([]float64, 0, replicaCount)
rankingsAccumulator := NewReplicaAccumulator(s.rebalanceObjManager.Objective().ToDimension())
// We wish to track both CPU and QPS, due to different usecases between UI
// and rebalancing. By default rebalancing uses CPU whilst the UI will use
// QPS.
rankingsAccumulator := NewReplicaAccumulator(load.CPU, load.Queries)
rankingsByTenantAccumulator := NewTenantReplicaAccumulator()

// Query the current L0 sublevels and record the updated maximum to metrics.
Expand Down Expand Up @@ -3300,7 +3304,7 @@ type HotReplicaInfo struct {
// Note that this uses cached information, so it's cheap but may be slightly
// out of date.
func (s *Store) HottestReplicas() []HotReplicaInfo {
topLoad := s.replRankings.TopLoad()
topLoad := s.replRankings.TopLoad(load.Queries)
return mapToHotReplicasInfo(topLoad)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
if !sr.subscribedToSpanConfigs() {
continue
}
hottestRanges := sr.replicaRankings.TopLoad()
objective := sr.RebalanceObjective()
hottestRanges := sr.replicaRankings.TopLoad(objective.ToDimension())
options := sr.scorerOptions(ctx, objective.ToDimension())
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode)
sr.rebalanceStore(ctx, rctx)
Expand Down
36 changes: 18 additions & 18 deletions pkg/kv/kvserver/store_rebalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,10 @@ type testRange struct {
qps, reqCPU float64
}

func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) {
acc := NewReplicaAccumulator(loadDimension)
func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange) {
// Track both CPU and QPS by default, the ordering the consumer uses will
// depend on the current rebalance objective.
acc := NewReplicaAccumulator(load.Queries, load.CPU)
for i, r := range ranges {
rangeID := roachpb.RangeID(i + 1)
repl := &Replica{store: s, RangeID: rangeID}
Expand Down Expand Up @@ -787,8 +789,8 @@ func TestChooseLeaseToTransfer(t *testing.T) {
for _, tc := range testCases {
t.Run("", withQPSCPU(t, objectiveProvider, func(t *testing.T) {
lbRebalanceDimension := objectiveProvider.Objective().ToDimension()
loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension)
hottestRanges := sr.replicaRankings.TopLoad()
loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}})
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
options.LoadThreshold = allocatorimpl.WithAllDims(0.1)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
Expand Down Expand Up @@ -929,10 +931,10 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) {
loadRanges(
rr, s, []testRange{
{voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU},
}, lbRebalanceDimension,
},
)

hottestRanges := sr.replicaRankings.TopLoad()
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ReplicaEnforcementLevel: allocatorimpl.IOOverloadThresholdIgnore}
Expand Down Expand Up @@ -1270,10 +1272,10 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) {
loadRanges(
rr, s, []testRange{
{voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU},
}, lbRebalanceDimension,
},
)

hottestRanges := sr.replicaRankings.TopLoad()
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas)
rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
Expand Down Expand Up @@ -1360,10 +1362,9 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) {
qps: 100,
reqCPU: 100 * float64(time.Millisecond)},
},
lbRebalanceDimension,
)

hottestRanges := sr.replicaRankings.TopLoad()
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
Expand Down Expand Up @@ -1528,10 +1529,9 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) {
s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters))
loadRanges(rr, s,
[]testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}},
lbRebalanceDimension,
)

hottestRanges := sr.replicaRankings.TopLoad()
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
Expand Down Expand Up @@ -1621,9 +1621,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {

// Load in a range with replicas on an overfull node, a slightly underfull
// node, and a very underfull node.
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})

hottestRanges := sr.replicaRankings.TopLoad()
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
repl := rctx.hottestRanges[0]
Expand All @@ -1638,9 +1638,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
// Then do the same, but for replica rebalancing. Make s5 an existing replica
// that's behind, and see how a new replica is preferred as the leaseholder
// over it.
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})

hottestRanges = sr.replicaRankings.TopLoad()
hottestRanges = sr.replicaRankings.TopLoad(lbRebalanceDimension)
options = sr.scorerOptions(ctx, lbRebalanceDimension)
rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
Expand Down Expand Up @@ -1798,9 +1798,9 @@ func TestStoreRebalancerIOOverloadCheck(t *testing.T) {

// Load in a range with replicas on an overfull node, a slightly underfull
// node, and a very underfull node.
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})

hottestRanges := sr.replicaRankings.TopLoad()
hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
options := sr.scorerOptions(ctx, lbRebalanceDimension)
rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
require.Greater(t, len(rctx.hottestRanges), 0)
Expand Down

0 comments on commit 7074da6

Please sign in to comment.