rac2: add token counter and stream metrics
This commit introduces metrics for stream eval tokens and stream send
tokens. Hooking these metrics up to the registry will be done in a
subsequent commit.

Two separate metric structs are used:

1. `tokenCounterMetrics`, which contains only counters and is shared
   among all `tokenCounter`s on the same node. Each `tokenCounter`
   updates the shared counters after `adjust` is called (see the
   sketch below).
2. `tokenStreamMetrics`, which is updated periodically by calling
   `UpdateMetricGauges` via the `StreamTokenCounterProvider`, of which
   there is one per node.
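
A minimal, self-contained sketch of this shared-counter pattern; all
names here are illustrative stand-ins, not the actual rac2 API:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // sharedMetrics stands in for tokenCounterMetrics: one instance,
    // shared by every counter on the node.
    type sharedMetrics struct {
        deducted, returned atomic.Int64
    }

    // counter stands in for tokenCounter: one per replication stream.
    type counter struct {
        tokens  int64
        metrics *sharedMetrics
    }

    // adjust applies delta locally, then records it in the shared
    // node-wide counters, mirroring onTokenAdjustment in metrics.go.
    func (c *counter) adjust(delta int64) {
        c.tokens += delta
        if delta < 0 {
            c.metrics.deducted.Add(-delta) // deduction
        } else if delta > 0 {
            c.metrics.returned.Add(delta) // return
        }
    }

    func main() {
        m := &sharedMetrics{}
        a, b := &counter{metrics: m}, &counter{metrics: m}
        a.adjust(-100)
        b.adjust(40)
        fmt.Println(m.deducted.Load(), m.returned.Load()) // 100 40
    }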

Metrics related to `WaitForEval` (as well as blocked stream logging) are
also deferred to a subsequent commit.

Part of: #128031
Release note: None
kvoli committed Aug 29, 2024
1 parent fba5ab3 commit 447c3fc
Showing 7 changed files with 457 additions and 44 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
    name = "rac2",
    srcs = [
        "metrics.go",
        "priority.go",
        "range_controller.go",
        "store_stream.go",
@@ -19,7 +20,9 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
@@ -43,6 +46,7 @@ go_test(
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
179 changes: 179 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go
@@ -0,0 +1,179 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rac2

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
	"github.com/cockroachdb/redact"
)

// Aliases to make the code below slightly easier to read.
const regular, elastic = admissionpb.RegularWorkClass, admissionpb.ElasticWorkClass

var (
	flowTokensAvailable = metric.Metadata{
		Name:        "kvflowcontrol.tokens.%s.%s.available",
		Help:        "Flow %s tokens available for %s requests, across all replication streams",
		Measurement: "Bytes",
		Unit:        metric.Unit_BYTES,
	}
	flowTokensDeducted = metric.Metadata{
		Name:        "kvflowcontrol.tokens.%s.%s.deducted",
		Help:        "Flow %s tokens deducted by %s requests, across all replication streams",
		Measurement: "Bytes",
		Unit:        metric.Unit_BYTES,
	}
	flowTokensReturned = metric.Metadata{
		Name:        "kvflowcontrol.tokens.%s.%s.returned",
		Help:        "Flow %s tokens returned by %s requests, across all replication streams",
		Measurement: "Bytes",
		Unit:        metric.Unit_BYTES,
	}
	flowTokensUnaccounted = metric.Metadata{
		Name:        "kvflowcontrol.tokens.%s.%s.unaccounted",
		Help:        "Flow %s tokens returned by %s requests that were unaccounted for, across all replication streams",
		Measurement: "Bytes",
		Unit:        metric.Unit_BYTES,
	}
	totalStreamCount = metric.Metadata{
		Name:        "kvflowcontrol.streams.%s.%s.total_count",
		Help:        "Total number of %s replication streams for %s requests",
		Measurement: "Count",
		Unit:        metric.Unit_COUNT,
	}
	blockedStreamCount = metric.Metadata{
		Name:        "kvflowcontrol.streams.%s.%s.blocked_count",
		Help:        "Number of %s replication streams with no flow tokens available for %s requests",
		Measurement: "Count",
		Unit:        metric.Unit_COUNT,
	}
)

// annotateMetricTemplateWithWorkClassAndType uses the given metric
// template to build one suitable for the specific token type and work
// class.
func annotateMetricTemplateWithWorkClassAndType(
	wc admissionpb.WorkClass, tmpl metric.Metadata, t flowControlMetricType,
) metric.Metadata {
	rv := tmpl
	rv.Name = fmt.Sprintf(tmpl.Name, t, wc)
	rv.Help = fmt.Sprintf(tmpl.Help, t, wc)
	return rv
}
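
// For example, with t == flowControlEvalMetricType and
// wc == admissionpb.RegularWorkClass, the flowTokensAvailable template
// above expands to the metric named
// "kvflowcontrol.tokens.eval.regular.available" (assuming WorkClass
// formats as "regular"/"elastic").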

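// flowControlMetricType distinguishes which token counter type a
// metric reports on: eval tokens or send tokens.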
type flowControlMetricType int

const (
	flowControlEvalMetricType flowControlMetricType = iota
	flowControlSendMetricType
	numFlowControlMetricTypes
)

func (f flowControlMetricType) String() string {
	return redact.StringWithoutMarkers(f)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (f flowControlMetricType) SafeFormat(p redact.SafePrinter, _ rune) {
switch f {
case flowControlEvalMetricType:
p.SafeString("eval")
case flowControlSendMetricType:
p.SafeString("send")
default:
panic("unknown flowControlMetricType")
}
}

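// tokenMetrics bundles the node-wide token metrics: shared counter
// metrics and periodically updated stream gauge metrics, one set per
// flow control metric type (eval|send).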
type tokenMetrics struct {
	counterMetrics [numFlowControlMetricTypes]*tokenCounterMetrics
	streamMetrics  [numFlowControlMetricTypes]*tokenStreamMetrics
}

func newTokenMetrics() *tokenMetrics {
	m := &tokenMetrics{}
	for _, typ := range []flowControlMetricType{
		flowControlEvalMetricType,
		flowControlSendMetricType,
	} {
		m.counterMetrics[typ] = newTokenCounterMetrics(typ)
		m.streamMetrics[typ] = newTokenStreamMetrics(typ)
	}
	return m
}

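// tokenCounterMetrics contains the counter metrics; it is shared among
// all tokenCounters of the same metric type on a node and updated on
// every token adjustment.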
type tokenCounterMetrics struct {
	deducted    [admissionpb.NumWorkClasses]*metric.Counter
	returned    [admissionpb.NumWorkClasses]*metric.Counter
	unaccounted [admissionpb.NumWorkClasses]*metric.Counter
}

func newTokenCounterMetrics(t flowControlMetricType) *tokenCounterMetrics {
	m := &tokenCounterMetrics{}
	for _, wc := range []admissionpb.WorkClass{
		admissionpb.RegularWorkClass,
		admissionpb.ElasticWorkClass,
	} {
		m.deducted[wc] = metric.NewCounter(
			annotateMetricTemplateWithWorkClassAndType(wc, flowTokensDeducted, t),
		)
		m.returned[wc] = metric.NewCounter(
			annotateMetricTemplateWithWorkClassAndType(wc, flowTokensReturned, t),
		)
		m.unaccounted[wc] = metric.NewCounter(
			annotateMetricTemplateWithWorkClassAndType(wc, flowTokensUnaccounted, t),
		)
	}
	return m
}

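// onTokenAdjustment records a token adjustment in the shared counters:
// a negative adjustment counts as a deduction and a positive one as a
// return, e.g. adjustment.regular == -100 increments deducted[regular]
// by 100.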
func (m *tokenCounterMetrics) onTokenAdjustment(adjustment tokensPerWorkClass) {
	if adjustment.regular < 0 {
		m.deducted[regular].Inc(-int64(adjustment.regular))
	} else if adjustment.regular > 0 {
		m.returned[regular].Inc(int64(adjustment.regular))
	}
	if adjustment.elastic < 0 {
		m.deducted[elastic].Inc(-int64(adjustment.elastic))
	} else if adjustment.elastic > 0 {
		m.returned[elastic].Inc(int64(adjustment.elastic))
	}
}

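// onUnaccounted records tokens that were returned but could not be
// accounted for.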
func (m *tokenCounterMetrics) onUnaccounted(unaccounted tokensPerWorkClass) {
	m.unaccounted[regular].Inc(int64(unaccounted.regular))
	m.unaccounted[elastic].Inc(int64(unaccounted.elastic))
}

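// tokenStreamMetrics contains the gauge metrics, aggregated across all
// streams on a node; it is updated periodically via UpdateMetricGauges
// on the StreamTokenCounterProvider.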
type tokenStreamMetrics struct {
	count           [admissionpb.NumWorkClasses]*metric.Gauge
	blockedCount    [admissionpb.NumWorkClasses]*metric.Gauge
	tokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge
}

func newTokenStreamMetrics(t flowControlMetricType) *tokenStreamMetrics {
	m := &tokenStreamMetrics{}
	for _, wc := range []admissionpb.WorkClass{
		admissionpb.RegularWorkClass,
		admissionpb.ElasticWorkClass,
	} {
		m.count[wc] = metric.NewGauge(
			annotateMetricTemplateWithWorkClassAndType(wc, totalStreamCount, t),
		)
		m.blockedCount[wc] = metric.NewGauge(
			annotateMetricTemplateWithWorkClassAndType(wc, blockedStreamCount, t),
		)
		m.tokensAvailable[wc] = metric.NewGauge(
			annotateMetricTemplateWithWorkClassAndType(wc, flowTokensAvailable, t),
		)
	}
	return m
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/datadriven"
@@ -268,7 +269,8 @@ func TestRangeControllerWaitForEval(t *testing.T) {
	ctx := context.Background()
	settings := cluster.MakeTestingClusterSettings()
	ranges := make(map[roachpb.RangeID]*testingRCRange)
	ssTokenCounter := NewStreamTokenCounterProvider(settings)
	clock := hlc.NewClockForTesting(nil)
	ssTokenCounter := NewStreamTokenCounterProvider(settings, clock)

	// Eval will only wait on a positive token amount; set the limit to 1
	// to simplify testing.
68 changes: 64 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go
@@ -15,6 +15,8 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

@@ -24,20 +26,29 @@ import (
// TODO(kvoli): Add stream deletion upon decommissioning a store.
type StreamTokenCounterProvider struct {
	settings                   *cluster.Settings
	clock                      *hlc.Clock
	tokenMetrics               *tokenMetrics
	sendCounters, evalCounters syncutil.Map[kvflowcontrol.Stream, tokenCounter]
}

// NewStreamTokenCounterProvider creates a new StreamTokenCounterProvider.
func NewStreamTokenCounterProvider(settings *cluster.Settings) *StreamTokenCounterProvider {
	return &StreamTokenCounterProvider{settings: settings}
func NewStreamTokenCounterProvider(
	settings *cluster.Settings, clock *hlc.Clock,
) *StreamTokenCounterProvider {
	return &StreamTokenCounterProvider{
		settings:     settings,
		clock:        clock,
		tokenMetrics: newTokenMetrics(),
	}
}

// Eval returns the evaluation token counter for the given stream.
func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) *tokenCounter {
	if t, ok := p.evalCounters.Load(stream); ok {
		return t
	}
	t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter(p.settings))
	t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter(
		p.settings, p.clock, p.tokenMetrics.counterMetrics[flowControlEvalMetricType]))
	return t
}

@@ -46,10 +57,59 @@ func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) *tokenCou
	if t, ok := p.sendCounters.Load(stream); ok {
		return t
	}
	t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter(p.settings))
	t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter(
		p.settings, p.clock, p.tokenMetrics.counterMetrics[flowControlSendMetricType]))
	return t
}

// UpdateMetricGauges updates the gauge token metrics. It is intended
// to be called periodically on each node, via the node's
// StreamTokenCounterProvider.
func (p *StreamTokenCounterProvider) UpdateMetricGauges() {
	var (
		count           [numFlowControlMetricTypes][admissionpb.NumWorkClasses]int64
		blockedCount    [numFlowControlMetricTypes][admissionpb.NumWorkClasses]int64
		tokensAvailable [numFlowControlMetricTypes][admissionpb.NumWorkClasses]int64
	)
	now := p.clock.PhysicalTime()

	// First aggregate the metrics across all streams, by (eval|send) type
	// and (regular|elastic) work class, then update the gauges from the
	// aggregates.
	gaugeUpdateFn := func(metricType flowControlMetricType) func(
		kvflowcontrol.Stream, *tokenCounter) bool {
		return func(stream kvflowcontrol.Stream, t *tokenCounter) bool {
			count[metricType][regular]++
			count[metricType][elastic]++
			tokensAvailable[metricType][regular] += int64(t.tokens(regular))
			tokensAvailable[metricType][elastic] += int64(t.tokens(elastic))

			regularStats, elasticStats := t.GetAndResetStats(now)
			if regularStats.noTokenDuration > 0 {
				blockedCount[metricType][regular]++
			}
			if elasticStats.noTokenDuration > 0 {
				blockedCount[metricType][elastic]++
			}
			return true
		}
	}

	p.evalCounters.Range(gaugeUpdateFn(flowControlEvalMetricType))
	p.sendCounters.Range(gaugeUpdateFn(flowControlSendMetricType))
	for _, typ := range []flowControlMetricType{
		flowControlEvalMetricType,
		flowControlSendMetricType,
	} {
		for _, wc := range []admissionpb.WorkClass{
			admissionpb.RegularWorkClass,
			admissionpb.ElasticWorkClass,
		} {
			p.tokenMetrics.streamMetrics[typ].count[wc].Update(count[typ][wc])
			p.tokenMetrics.streamMetrics[typ].blockedCount[wc].Update(blockedCount[typ][wc])
			p.tokenMetrics.streamMetrics[typ].tokensAvailable[wc].Update(tokensAvailable[typ][wc])
		}
	}
}
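
// Usage sketch (assumed, since registry hookup is deferred to a
// subsequent commit): a per-node ticker would call
// p.UpdateMetricGauges() periodically so these gauges reflect current
// stream counts, blocked streams, and available tokens.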

// SendTokenWatcherHandleID is a unique identifier for a handle that is
// watching for available elastic send tokens on a stream.
type SendTokenWatcherHandleID int64
