split: Redesign the load-based splitter to be consistent with new rebalancing signals.

Fixes: #90574

In the current load splitter, we find the split key that best balances
the QPS of the left and right sides. As a result, each request is
unweighted: one request contributes one to the QPS. In particular, the
load splitter does not differentiate between kinds of requests, how
heavy each request is, or which resources the requests consume, which
can result in scenarios where QPS is balanced but one side has far more
work due to a few heavy requests. Moreover, the current load splitter
treats requests whose spans contain a candidate split key as
“contained”. When optimizing for QPS, contained requests are bad, since
splitting at a point inside a contained request does not lower the QPS
of either side. However, when optimizing for other signals such as CPU,
splitting at a point inside a contained request is great, since each
side then picks up part of the work of processing that request. This
motivates a redesign of the load splitter: one that records weighted
requests and considers contained requests in the weight balancing used
for splitting.
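
As a toy illustration (hypothetical numbers, not measurements from this
PR) of how QPS can be balanced while the actual work is not:

```go
package main

import "fmt"

func main() {
	// Each side of a candidate split serves 100 requests/sec, so an
	// unweighted, QPS-based splitter considers the two sides balanced.
	leftQPS, rightQPS := 100.0, 100.0
	// But suppose the right side's requests are 20x heavier in CPU terms.
	leftCPUPerReq, rightCPUPerReq := 0.1, 2.0 // CPU-ms per request
	fmt.Println("QPS balanced:", leftQPS == rightQPS) // true
	fmt.Println("CPU-ms/sec, left vs right:",
		leftQPS*leftCPUPerReq, rightQPS*rightCPUPerReq) // 10 vs 200
}
```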

In this PR, we redesign the load-based splitter around the following
interface (a Go sketch follows this list):
1. Record a point key “start” or span “[start, end)” with a weight “w”
at a specific time “ts”, where “w” is some measure of load recorded for
the span, e.g. Record(ts, start, w) or Record(ts, [start, end), w).
2. Find a split key such that the load (i.e. total weight) on the
resulting split ranges would be as equal as possible according to the
loads recorded above, e.g. Key().
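
For concreteness, here is a minimal Go sketch of what such an interface
could look like; the interface name and exact signatures are
illustrative assumptions, not necessarily those used in this PR:

```go
package split

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// LoadBasedSplitter sketches the recording interface described above.
type LoadBasedSplitter interface {
	// Record records a point key (span.EndKey == nil) or a span
	// [span.Key, span.EndKey) observed at time ts with weight w, where w
	// is some measure of load, e.g. CPU time or request units.
	Record(ts time.Time, span roachpb.Span, w float64)

	// Key returns a split key such that the total recorded weight on the
	// resulting left and right ranges is as equal as possible, or nil if
	// no suitable split key has been found yet.
	Key() roachpb.Key
}
```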

To make the current load-based splitter (Finder) weighted, we make the
following modifications (a sketch of the weighted sampling follows this
list):
1. Instead of plain reservoir sampling, use weighted reservoir sampling
(a simplified version of Algorithm A-Chao).
2. Remove the contained counter.
3. Increment the left and right counters by the weight of the request
rather than by 1.
4. Treat a weighted range request ([start, end), w) as two weighted
point requests, (start, w/2) and (end, w/2).
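
For reference, here is a minimal, self-contained sketch of weighted
reservoir sampling in the simplified A-Chao style, plus the half-weight
span handling from item 4. This illustrates the technique only; it is
not the PR's actual WeightedFinder implementation, and the names
(weightedReservoir, record, recordSpan) are made up:

```go
package main

import (
	"fmt"
	"math/rand"
)

// weightedReservoir keeps a fixed-size sample of keys in which each
// recorded key is retained with probability proportional to its weight
// (a simplified form of Algorithm A-Chao).
type weightedReservoir struct {
	rng     *rand.Rand
	samples []string // sampled keys; len(samples) is the reservoir size
	filled  int      // number of slots filled so far
	totalW  float64  // running total of all observed weight
}

func newWeightedReservoir(size int, rng *rand.Rand) *weightedReservoir {
	return &weightedReservoir{rng: rng, samples: make([]string, size)}
}

// record offers a key with weight w to the reservoir.
func (r *weightedReservoir) record(key string, w float64) {
	r.totalW += w
	if r.filled < len(r.samples) {
		// Fill phase: the first len(samples) keys are always admitted.
		r.samples[r.filled] = key
		r.filled++
		return
	}
	// Steady state: admit the key with probability w/totalW and evict a
	// uniformly random existing sample.
	if r.rng.Float64() < w/r.totalW {
		r.samples[r.rng.Intn(len(r.samples))] = key
	}
}

// recordSpan splits a weighted range request into two half-weight point
// requests, as in modification 4 above.
func (r *weightedReservoir) recordSpan(start, end string, w float64) {
	r.record(start, w/2)
	r.record(end, w/2)
}

func main() {
	rng := rand.New(rand.NewSource(2022))
	res := newWeightedReservoir(5, rng)
	for i := 0; i < 1000; i++ {
		if i%10 == 0 {
			// A "hot" key carries 10x the weight of the others, so it should
			// end up over-represented among the sampled split-key candidates.
			res.record("hot", 10)
		} else {
			res.record(fmt.Sprintf("k%d", i), 1)
		}
	}
	res.recordSpan("a", "z", 4) // a span becomes two half-weight points
	fmt.Println(res.samples)
}
```

The property this preserves is that each observed key holds a reservoir
slot with probability roughly proportional to its weight, which is what
lets heavy requests dominate the sampled split-key candidates.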

For more details, see the design doc:
https://docs.google.com/document/d/1bdSxucz-xFzwnxL3fFXNZsRc9Vsht0oO0XuZrz5Iw84/edit#bookmark=id.xjc41tm3jx3x.

Release note (ops change): The load-based splitter has been redesigned
to be consistent with CPU-based rebalancing rather than QPS-based
rebalancing, in order to improve range splits.
KaiSun314 committed Dec 17, 2022
1 parent 7c8e701 commit e57c3ce
Showing 11 changed files with 745 additions and 65 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/asim/state/BUILD.bazel
@@ -36,6 +36,7 @@ go_library(
"@com_github_google_btree//:btree",
"@io_etcd_go_etcd_raft_v3//:raft",
"@io_etcd_go_etcd_raft_v3//tracker",
"@org_golang_x_exp//rand",
],
)

11 changes: 3 additions & 8 deletions pkg/kv/kvserver/asim/state/split_decider.go
@@ -12,13 +12,13 @@ package state

import (
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"golang.org/x/exp/rand"
)

// LoadSplitter provides an abstraction for load based splitting. It records
@@ -61,14 +61,9 @@ func NewSplitDecider(
}

func (s *SplitDecider) newDecider() *split.Decider {
rand := rand.New(rand.NewSource(s.seed))

intN := func(n int) int {
return rand.Intn(n)
}

rand := rand.New(rand.NewSource(uint64(s.seed)))
decider := &split.Decider{}
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
split.Init(decider, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_init.go
@@ -13,7 +13,6 @@ package kvserver
import (
"bytes"
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
@@ -95,7 +94,7 @@ func newUnloadedReplica(
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
split.Init(&r.loadBasedSplitter, nil, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/split/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
srcs = [
"decider.go",
"finder.go",
"weighted_finder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split",
visibility = ["//visibility:public"],
@@ -26,6 +27,7 @@ go_test(
"decider_test.go",
"finder_test.go",
"load_based_splitter_test.go",
"weighted_finder_test.go",
],
args = ["-test.timeout=55s"],
embed = [":split"],
28 changes: 16 additions & 12 deletions pkg/kv/kvserver/split/decider.go
@@ -14,13 +14,15 @@ package split

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"golang.org/x/exp/rand"
)

const minSplitSuggestionInterval = time.Minute
@@ -63,7 +65,7 @@ type LoadSplitterMetrics struct {
// incoming requests to find potential split keys and checks if sampled
// candidate split keys satisfy certain requirements.
type Decider struct {
intn func(n int) int // supplied to Init
randSource *rand.Rand
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
loadSplitterMetrics *LoadSplitterMetrics // supplied to Init
@@ -80,8 +82,8 @@ type Decider struct {
maxQPS maxQPSTracker

// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split
splitFinder *WeightedFinder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Fields tracking logging / metrics around load-based splitter split key.
lastNoSplitKeyLoggingMetrics time.Time
@@ -94,12 +96,15 @@
// may exist in the system at any given point in time.
func Init(
lbs *Decider,
intn func(n int) int,
randSource *rand.Rand,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
loadSplitterMetrics *LoadSplitterMetrics,
) {
lbs.intn = intn
if randSource == nil {
randSource = rand.New(rand.NewSource(2022))
}
lbs.randSource = randSource
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.loadSplitterMetrics = loadSplitterMetrics
@@ -147,7 +152,7 @@ func (d *Decider) recordLocked(
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
d.mu.splitFinder = NewWeightedFinder(now, d.randSource)
}
} else {
d.mu.splitFinder = nil
@@ -157,7 +162,7 @@
if d.mu.splitFinder != nil && n != 0 {
s := span()
if s.Key != nil {
d.mu.splitFinder.Record(span(), d.intn)
d.mu.splitFinder.Record(span(), 1)
}
if d.mu.splitFinder.Ready(now) {
if d.mu.splitFinder.Key() != nil {
@@ -168,16 +173,15 @@
} else {
if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval {
d.mu.lastNoSplitKeyLoggingMetrics = now
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause()
if insufficientCounters < splitKeySampleSize {
noSplitKeyCauseLogMsg := d.mu.splitFinder.NoSplitKeyCauseLogMsg()
if noSplitKeyCauseLogMsg != "" {
popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency()
noSplitKeyCauseLogMsg += fmt.Sprintf(", most popular key occurs in %d%% of samples", int(popularKeyFrequency*100))
log.KvDistribution.Infof(ctx, "%s", noSplitKeyCauseLogMsg)
if popularKeyFrequency >= splitKeyThreshold {
d.loadSplitterMetrics.PopularKeyCount.Inc(1)
}
d.loadSplitterMetrics.NoSplitKeyCount.Inc(1)
log.KvDistribution.Infof(ctx,
"No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples",
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100))
}
}
}
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/split/decider_test.go
@@ -13,7 +13,6 @@ package split
import (
"context"
"math"
"math/rand"
"testing"
"time"

@@ -24,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func ms(i int) time.Time {
@@ -37,10 +37,10 @@
func TestDecider(t *testing.T) {
defer leaktest.AfterTest(t)()

intn := rand.New(rand.NewSource(12)).Intn
rand := rand.New(rand.NewSource(12))

var d Decider
Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
Init(&d, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -96,7 +96,7 @@ func TestDecider(t *testing.T) {
assert.Equal(t, ms(1200), d.mu.lastQPSRollover)
assertMaxQPS(1099, 0, false)

var nilFinder *Finder
var nilFinder *WeightedFinder

assert.Equal(t, nilFinder, d.mu.splitFinder)

@@ -196,10 +196,10 @@ func TestDecider(t *testing.T) {

func TestDecider_MaxQPS(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
Init(&d, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -242,10 +242,10 @@ func TestDecider_MaxQPS(t *testing.T) {

func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -278,10 +278,10 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {

func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -409,11 +409,11 @@ func TestMaxQPSTracker(t *testing.T) {

func TestDeciderMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))
timeStart := 1000

var dPopular Decider
Init(&dPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dPopular, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -435,7 +435,7 @@

// No split key, not popular key
var dNotPopular Decider
Init(&dNotPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dNotPopular, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -455,7 +455,7 @@

// No split key, all insufficient counters
var dAllInsufficientCounters Decider
Init(&dAllInsufficientCounters, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dAllInsufficientCounters, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/split/finder.go
@@ -12,6 +12,7 @@ package split

import (
"bytes"
"fmt"
"math"
"sort"
"time"
@@ -156,7 +157,7 @@ func (f *Finder) Key() roachpb.Key {
// determines the number of samples that don't pass each split key requirement
// (e.g. insufficient counters, imbalance in left and right counters, too many
// contained counters, or a combination of the last two).
func (f *Finder) NoSplitKeyCause() (
func (f *Finder) noSplitKeyCause() (
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained int,
) {
for _, s := range f.samples {
@@ -179,6 +180,20 @@ func (f *Finder) NoSplitKeyCause() (
return
}

// NoSplitKeyCauseLogMsg returns a log message containing all of this
// information if not all samples are invalid due to insufficient counters,
// otherwise returns an empty string.
func (f *Finder) NoSplitKeyCauseLogMsg() string {
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := f.noSplitKeyCause()
if insufficientCounters == splitKeySampleSize {
return ""
}
noSplitKeyCauseLogMsg := fmt.Sprintf(
"No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d",
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained)
return noSplitKeyCauseLogMsg
}

// PopularKeyFrequency returns the percentage that the most popular key appears
// in f.samples.
func (f *Finder) PopularKeyFrequency() float64 {
@@ -217,7 +232,7 @@ func NewTestFinder(randSource *rand.Rand) *TestFinder {
}

// Record records the span, ignoring weight as this Finder is unweighted.
func (tf *TestFinder) Record(span roachpb.Span, weight float32) {
func (tf *TestFinder) Record(span roachpb.Span, weight float64) {
tf.f.Record(span, tf.randSource.Intn)
}

10 changes: 5 additions & 5 deletions pkg/kv/kvserver/split/finder_test.go
@@ -278,15 +278,15 @@ func TestFinderNoSplitKeyCause(t *testing.T) {
samples := [splitKeySampleSize]sample{}
for i, idx := range rand.Perm(splitKeySampleSize) {
if i < 5 {
// insufficient counters
// Insufficient counters.
samples[idx] = sample{
key: keys.SystemSQLCodec.TablePrefix(uint32(i)),
left: 0,
right: 0,
contained: splitKeyMinCounter - 1,
}
} else if i < 7 {
// imbalance
// Imbalance and too many contained counters.
deviationLeft := rand.Intn(5)
deviationRight := rand.Intn(5)
samples[idx] = sample{
Expand All @@ -296,7 +296,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) {
contained: int(max(float64(splitKeyMinCounter-40-deviationLeft+deviationRight), float64(40+deviationLeft-deviationRight))),
}
} else if i < 13 {
// imbalance
// Imbalance counters.
deviationLeft := rand.Intn(5)
deviationRight := rand.Intn(5)
samples[idx] = sample{
Expand All @@ -306,7 +306,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) {
contained: int(max(float64(splitKeyMinCounter-80-deviationLeft+deviationRight), 0)),
}
} else {
// too many contained
// Too many contained counters.
contained := int(splitKeyMinCounter*splitKeyContainedThreshold + 1)
left := (splitKeyMinCounter - contained) / 2
samples[idx] = sample{
Expand All @@ -320,7 +320,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) {

finder := NewFinder(timeutil.Now())
finder.samples = samples
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.NoSplitKeyCause()
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.noSplitKeyCause()
assert.Equal(t, 5, insufficientCounters, "unexpected insufficient counters")
assert.Equal(t, 6, imbalance, "unexpected imbalance counters")
assert.Equal(t, 7, tooManyContained, "unexpected too many contained counters")