split: add observability for when load based splitting cannot find a key
Previously, there were no metrics or logging in the load-based splitter.

This was inadequate because it left minimal observability into why the
load-based splitter could not find a split key.

To address this, this patch adds metrics and logging to the load
splitter: counter metrics for the number of times the splitter could
not find a split key and for the number of times a popular key (>= 25%
of samples) prevented a split, and logging that records the causes for
finding no split key (insufficient counters, imbalance, too many
contained).

Release note (ops change): Added observability (metrics and logging)
for when load-based splitting cannot find a key, indicating the reasons
why the load splitter could not find a split key and making it easier
to debug why a range is not splitting.
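
For illustration, a minimal sketch of how the new API fits together, based on the signatures added in this commit (split.Init now takes a *split.LoadSplitterMetrics, and Record / MaybeSplitKey now take a context.Context). The package name, threshold, retention, and helper functions below are placeholders, not part of the commit:

package loadsplitexample

import (
	"context"
	"math/rand"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// newDecider wires the new LoadSplitterMetrics counters into a Decider, the
// same way newStoreMetrics and the asim SplitDecider do in this commit.
// The threshold and retention values are illustrative only.
func newDecider() *split.Decider {
	d := &split.Decider{}
	split.Init(d, rand.Intn,
		func() float64 { return 2500 }, // QPS split threshold (placeholder)
		func() time.Duration { return 10 * time.Minute }, // max-QPS retention (placeholder)
		&split.LoadSplitterMetrics{
			PopularKeyCount: metric.NewCounter(metric.Metadata{}),
			NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
		})
	return d
}

// recordAndMaybeSplit shows the context now threaded through Record and
// MaybeSplitKey so the splitter can log (via log.KvDistribution) and increment
// the counters above when it cannot find a split key.
func recordAndMaybeSplit(ctx context.Context, d *split.Decider, span roachpb.Span) roachpb.Key {
	if d.Record(ctx, time.Now(), 1, func() roachpb.Span { return span }) {
		return d.MaybeSplitKey(ctx, time.Now())
	}
	return nil
}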
KaiSun314 authored and kvoli committed Jun 7, 2023
1 parent 1fb7942 commit d2db70a
Showing 16 changed files with 430 additions and 85 deletions.
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/asim/state/split_decider.go
@@ -11,12 +11,14 @@
package state

import (
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// LoadSplitter provides an abstraction for load based splitting. It records
@@ -66,7 +68,10 @@ func (s *SplitDecider) newDecider() *split.Decider {
}

decider := &split.Decider{}
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention)
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
return decider
}

@@ -81,7 +86,7 @@ func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadE
}

qps := LoadEventQPS(le)
shouldSplit := decider.Record(tick, int(qps), func() roachpb.Span {
shouldSplit := decider.Record(context.Background(), tick, int(qps), func() roachpb.Span {
return roachpb.Span{
Key: Key(le.Key).ToRKey().AsRawKey(),
}
@@ -102,7 +107,7 @@ func (s *SplitDecider) SplitKey(tick time.Time, rangeID RangeID) (Key, bool) {
return InvalidKey, false
}

key := decider.MaybeSplitKey(tick)
key := decider.MaybeSplitKey(context.Background(), tick)
if key == nil {
return InvalidKey, false
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_range_stats.go
@@ -44,8 +44,8 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS()
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(); ok {
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS(ctx)
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -88,15 +88,15 @@ type EvalContext interface {
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS() (float64, bool)
GetMaxSplitQPS(context.Context) (float64, bool)

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
//
// TODO(nvanbenschoten): remove this method in v22.1.
GetLastSplitQPS() float64
GetLastSplitQPS(context.Context) float64

GetGCThreshold() hlc.Timestamp
ExcludeDataFromBackup() bool
@@ -240,10 +240,10 @@ func (m *mockEvalCtxImpl) ContainsKey(key roachpb.Key) bool {
func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
return m.Stats
}
func (m *mockEvalCtxImpl) GetMaxSplitQPS() (float64, bool) {
func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetLastSplitQPS() float64 {
func (m *mockEvalCtxImpl) GetLastSplitQPS(context.Context) float64 {
return m.QPS
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
@@ -217,7 +217,7 @@ func (mq *mergeQueue) process(

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS()
lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS(ctx)
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
23 changes: 23 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -1644,6 +1645,20 @@ Note that the measurement does not include the duration for replicating the eval
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaPopularKeyCount = metric.Metadata{
Name: "kv.loadsplitter.popularkey",
Help: "Load-based splitter could not find a split key and the most popular sampled split key occurs in >= 25% of the samples.",
Measurement: "Occurrences",
Unit: metric.Unit_COUNT,
}

metaNoSplitKeyCount = metric.Metadata{
Name: "kv.loadsplitter.nosplitkey",
Help: "Load-based splitter could not find a split key.",
Measurement: "Occurrences",
Unit: metric.Unit_COUNT,
}
)

// StoreMetrics is the set of metrics for a given store.
@@ -1654,6 +1669,9 @@ type StoreMetrics struct {
// tenant basis.
*TenantsStorageMetrics

// LoadSplitterMetrics stores metrics for load-based splitter split key.
*split.LoadSplitterMetrics

// Replica metrics.
ReplicaCount *metric.Gauge // Does not include uninitialized or reserved replicas.
ReservedReplicaCount *metric.Gauge
@@ -2185,6 +2203,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
sm := &StoreMetrics{
registry: storeRegistry,
TenantsStorageMetrics: newTenantsStorageMetrics(),
LoadSplitterMetrics: &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metaPopularKeyCount),
NoSplitKeyCount: metric.NewCounter(metaNoSplitKeyCount),
},

// Replica metrics.
ReplicaCount: metric.NewGauge(metaReplicaCount),
@@ -2516,6 +2538,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
}

storeRegistry.AddMetricStruct(sm)
storeRegistry.AddMetricStruct(sm.LoadSplitterMetrics)
return sm
}

8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica.go
@@ -1150,8 +1150,8 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) {
// works when the load based splitting cluster setting is enabled.
//
// Use QueriesPerSecond() for current QPS stats for all other purposes.
func (r *Replica) GetMaxSplitQPS() (float64, bool) {
return r.loadBasedSplitter.MaxQPS(r.Clock().PhysicalTime())
func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
return r.loadBasedSplitter.MaxQPS(ctx, r.Clock().PhysicalTime())
}

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
@@ -1160,8 +1160,8 @@ func (r *Replica) GetMaxSplitQPS() (float64, bool) {
// works when the load based splitting cluster setting is enabled.
//
// Use QueriesPerSecond() for current QPS stats for all other purposes.
func (r *Replica) GetLastSplitQPS() float64 {
return r.loadBasedSplitter.LastQPS(r.Clock().PhysicalTime())
func (r *Replica) GetLastSplitQPS(ctx context.Context) float64 {
return r.loadBasedSplitter.LastQPS(ctx, r.Clock().PhysicalTime())
}

// ContainsKey returns whether this range contains the specified key.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_eval_context_span.go
@@ -131,14 +131,14 @@ func (rec SpanSetReplicaEvalContext) GetMVCCStats() enginepb.MVCCStats {

// GetMaxSplitQPS returns the Replica's maximum queries/s rate for splitting and
// merging purposes.
func (rec SpanSetReplicaEvalContext) GetMaxSplitQPS() (float64, bool) {
return rec.i.GetMaxSplitQPS()
func (rec SpanSetReplicaEvalContext) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
return rec.i.GetMaxSplitQPS(ctx)
}

// GetLastSplitQPS returns the Replica's most recent queries/s rate for
// splitting and merging purposes.
func (rec SpanSetReplicaEvalContext) GetLastSplitQPS() float64 {
return rec.i.GetLastSplitQPS()
func (rec SpanSetReplicaEvalContext) GetLastSplitQPS(ctx context.Context) float64 {
return rec.i.GetLastSplitQPS(ctx)
}

// CanCreateTxnRecord determines whether a transaction record can be created
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -98,7 +98,7 @@ func newUnloadedReplica(
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
})
}, store.metrics.LoadSplitterMetrics)
r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_split_load.go
@@ -56,7 +56,7 @@ func (r *Replica) recordBatchForLoadBasedSplitting(
if !r.SplitByLoadEnabled() {
return
}
shouldInitSplit := r.loadBasedSplitter.Record(timeutil.Now(), len(ba.Requests), func() roachpb.Span {
shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span {
return spans.BoundarySpan(spanset.SpanGlobal)
})
if shouldInitSplit {
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/split/BUILD.bazel
@@ -12,6 +12,8 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/roachpb",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
],
)
@@ -30,6 +32,7 @@ go_test(
"//pkg/roachpb",
"//pkg/util/encoding",
"//pkg/util/leaktest",
"//pkg/util/metric",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//assert",
72 changes: 57 additions & 15 deletions pkg/kv/kvserver/split/decider.go
@@ -13,14 +13,18 @@
package split

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

const minSplitSuggestionInterval = time.Minute
const minNoSplitKeyLoggingMetricsInterval = time.Minute
const minQueriesPerSecondSampleDuration = time.Second

// A Decider collects measurements about the activity (measured in qps) on a
@@ -48,10 +52,21 @@ const minQueriesPerSecondSampleDuration = time.Second
// prevent load-based splits from being merged away until the resulting ranges
// have consistently remained below a certain QPS threshold for a sufficiently
// long period of time.

// LoadSplitterMetrics consists of metrics for load-based splitter split key.
type LoadSplitterMetrics struct {
PopularKeyCount *metric.Counter
NoSplitKeyCount *metric.Counter
}

// Decider tracks the latest QPS and if certain conditions are met, records
// incoming requests to find potential split keys and checks if sampled
// candidate split keys satisfy certain requirements.
type Decider struct {
intn func(n int) int // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
intn func(n int) int // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
loadSplitterMetrics *LoadSplitterMetrics // supplied to Init

mu struct {
syncutil.Mutex
@@ -67,6 +82,9 @@ type Decider struct {
// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Fields tracking logging / metrics around load-based splitter split key.
lastNoSplitKeyLoggingMetrics time.Time
}
}

@@ -79,10 +97,12 @@ func Init(
intn func(n int) int,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
loadSplitterMetrics *LoadSplitterMetrics,
) {
lbs.intn = intn
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.loadSplitterMetrics = loadSplitterMetrics
}

// Record notifies the Decider that 'n' operations are being carried out which
@@ -93,14 +113,16 @@ func Init(
// If the returned boolean is true, a split key is available (though it may
// disappear as more keys are sampled) and should be initiated by the caller,
// which can call MaybeSplitKey to retrieve the suggested key.
func (d *Decider) Record(now time.Time, n int, span func() roachpb.Span) bool {
func (d *Decider) Record(ctx context.Context, now time.Time, n int, span func() roachpb.Span) bool {
d.mu.Lock()
defer d.mu.Unlock()

return d.recordLocked(now, n, span)
return d.recordLocked(ctx, now, n, span)
}

func (d *Decider) recordLocked(now time.Time, n int, span func() roachpb.Span) bool {
func (d *Decider) recordLocked(
ctx context.Context, now time.Time, n int, span func() roachpb.Span,
) bool {
d.mu.count += int64(n)

// First compute requests per second since the last check.
@@ -137,9 +159,28 @@ func (d *Decider) recordLocked(now time.Time, n int, span func() roachpb.Span) b
if s.Key != nil {
d.mu.splitFinder.Record(span(), d.intn)
}
if now.Sub(d.mu.lastSplitSuggestion) > minSplitSuggestionInterval && d.mu.splitFinder.Ready(now) && d.mu.splitFinder.Key() != nil {
d.mu.lastSplitSuggestion = now
return true
if d.mu.splitFinder.Ready(now) {
if d.mu.splitFinder.Key() != nil {
if now.Sub(d.mu.lastSplitSuggestion) > minSplitSuggestionInterval {
d.mu.lastSplitSuggestion = now
return true
}
} else {
if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval {
d.mu.lastNoSplitKeyLoggingMetrics = now
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause()
if insufficientCounters < splitKeySampleSize {
popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency()
if popularKeyFrequency >= splitKeyThreshold {
d.loadSplitterMetrics.PopularKeyCount.Inc(1)
}
d.loadSplitterMetrics.NoSplitKeyCount.Inc(1)
log.KvDistribution.Infof(ctx,
"No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples",
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100))
}
}
}
}
}
return false
@@ -156,22 +197,22 @@ func (d *Decider) RecordMax(now time.Time, qps float64) {
}

// LastQPS returns the most recent QPS measurement.
func (d *Decider) LastQPS(now time.Time) float64 {
func (d *Decider) LastQPS(ctx context.Context, now time.Time) float64 {
d.mu.Lock()
defer d.mu.Unlock()

d.recordLocked(now, 0, nil) // force QPS computation
d.recordLocked(ctx, now, 0, nil) // force QPS computation
return d.mu.lastQPS
}

// MaxQPS returns the maximum QPS measurement recorded over the retention
// period. If the Decider has not been recording for a full retention period,
// the method returns false.
func (d *Decider) MaxQPS(now time.Time) (float64, bool) {
func (d *Decider) MaxQPS(ctx context.Context, now time.Time) (float64, bool) {
d.mu.Lock()
defer d.mu.Unlock()

d.recordLocked(now, 0, nil) // force QPS computation
d.recordLocked(ctx, now, 0, nil) // force QPS computation
return d.mu.maxQPS.maxQPS(now, d.qpsRetention())
}

@@ -180,13 +221,13 @@ func (d *Decider) MaxQPS(now time.Time) (float64, bool) {
// or if it wasn't able to determine a suitable split key.
//
// It is legal to call MaybeSplitKey at any time.
func (d *Decider) MaybeSplitKey(now time.Time) roachpb.Key {
func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key {
var key roachpb.Key

d.mu.Lock()
defer d.mu.Unlock()

d.recordLocked(now, 0, nil)
d.recordLocked(ctx, now, 0, nil)
if d.mu.splitFinder != nil && d.mu.splitFinder.Ready(now) {
// We've found a key to split at. This key might be in the middle of a
// SQL row. If we fail to rectify that, we'll cause SQL crashes:
@@ -240,6 +281,7 @@ func (d *Decider) Reset(now time.Time) {
d.mu.maxQPS.reset(now, d.qpsRetention())
d.mu.splitFinder = nil
d.mu.lastSplitSuggestion = time.Time{}
d.mu.lastNoSplitKeyLoggingMetrics = time.Time{}
}

// maxQPSTracker collects a series of queries-per-second measurement samples and
