Skip to content

Commit

Permalink
Merge #129350
Browse files Browse the repository at this point in the history
129350: rac2: add token counter and stream metrics  r=sumeerbhola a=kvoli

This commit introduces metrics related to stream eval tokens and stream
send tokens. Hooking up these metrics to the registry will be in a
subsequent commit.

There are two separate metric structs used:

1. `tokenCounterMetrics`, which only contains counter and is shared
   among all `tokenCounter`s on the same node. Each `tokenCounter`
   updates the shared counters after `adjust` is called.
2. `tokenStreamMetrics`, which is updated periodically by calling
   `UpdateMetricGauges` via the `StreamTokenCounterProvider`, which is
   one per node.

Metrics related to `WaitForEval` (as well as blocked stream logging) are
also deferred to a subsequent commit.

Part of: #128031
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed Aug 30, 2024
2 parents 1aa4781 + 5d555ec commit 3373e5d
Show file tree
Hide file tree
Showing 8 changed files with 482 additions and 112 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "rac2",
srcs = [
"metrics.go",
"priority.go",
"range_controller.go",
"store_stream.go",
Expand All @@ -20,7 +21,9 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down Expand Up @@ -49,6 +52,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
181 changes: 181 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rac2

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/redact"
)

// Aliases to make the code below slightly easier to read.
const regular, elastic = admissionpb.RegularWorkClass, admissionpb.ElasticWorkClass

var (
flowTokensAvailable = metric.Metadata{
Name: "kvflowcontrol.tokens.%s.%s.available",
Help: "Flow %s tokens available for %s requests, across all replication streams",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
flowTokensDeducted = metric.Metadata{
Name: "kvflowcontrol.tokens.%s.%s.deducted",
Help: "Flow %s tokens deducted by %s requests, across all replication streams",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
flowTokensReturned = metric.Metadata{
Name: "kvflowcontrol.tokens.%s.%s.returned",
Help: "Flow %s tokens returned by %s requests, across all replication streams",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
flowTokensUnaccounted = metric.Metadata{
Name: "kvflowcontrol.tokens.%s.%s.unaccounted",
Help: "Flow %s tokens returned by %s requests that were unaccounted for, across all replication streams",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
totalStreamCount = metric.Metadata{
Name: "kvflowcontrol.streams.%s.%s.total_count",
Help: "Total number of %s replication streams for %s requests",
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
blockedStreamCount = metric.Metadata{
Name: "kvflowcontrol.streams.%s.%s.blocked_count",
Help: "Number of %s replication streams with no flow tokens available for %s requests",
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
)

// annotateMetricTemplateWithWorkClass uses the given metric template to build
// one suitable for the specific token type and work class.
func annotateMetricTemplateWithWorkClassAndType(
wc admissionpb.WorkClass, tmpl metric.Metadata, t flowControlMetricType,
) metric.Metadata {
rv := tmpl
rv.Name = fmt.Sprintf(tmpl.Name, t, wc)
rv.Help = fmt.Sprintf(tmpl.Help, t, wc)
return rv
}

type flowControlMetricType int

const (
flowControlEvalMetricType flowControlMetricType = iota
flowControlSendMetricType
numFlowControlMetricTypes
)

func (f flowControlMetricType) String() string {
return redact.StringWithoutMarkers(f)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (f flowControlMetricType) SafeFormat(p redact.SafePrinter, _ rune) {
switch f {
case flowControlEvalMetricType:
p.SafeString("eval")
case flowControlSendMetricType:
p.SafeString("send")
default:
panic("unknown flowControlMetricType")
}
}

type tokenMetrics struct {
counterMetrics [numFlowControlMetricTypes]*tokenCounterMetrics
streamMetrics [numFlowControlMetricTypes]*tokenStreamMetrics
}

func newTokenMetrics() *tokenMetrics {
m := &tokenMetrics{}
for _, typ := range []flowControlMetricType{
flowControlEvalMetricType,
flowControlSendMetricType,
} {
m.counterMetrics[typ] = newTokenCounterMetrics(typ)
m.streamMetrics[typ] = newTokenStreamMetrics(typ)
}
return m
}

type tokenCounterMetrics struct {
deducted [admissionpb.NumWorkClasses]*metric.Counter
returned [admissionpb.NumWorkClasses]*metric.Counter
unaccounted [admissionpb.NumWorkClasses]*metric.Counter
}

func newTokenCounterMetrics(t flowControlMetricType) *tokenCounterMetrics {
m := &tokenCounterMetrics{}
for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
m.deducted[wc] = metric.NewCounter(
annotateMetricTemplateWithWorkClassAndType(wc, flowTokensDeducted, t),
)
m.returned[wc] = metric.NewCounter(
annotateMetricTemplateWithWorkClassAndType(wc, flowTokensReturned, t),
)
m.unaccounted[wc] = metric.NewCounter(
annotateMetricTemplateWithWorkClassAndType(wc, flowTokensUnaccounted, t),
)
}
return m
}

func (m *tokenCounterMetrics) onTokenAdjustment(adjustment tokensPerWorkClass) {
if adjustment.regular < 0 {
m.deducted[regular].Inc(-int64(adjustment.regular))
} else if adjustment.regular > 0 {
m.returned[regular].Inc(int64(adjustment.regular))
}
if adjustment.elastic < 0 {
m.deducted[elastic].Inc(-int64(adjustment.elastic))
} else if adjustment.elastic > 0 {
m.returned[elastic].Inc(int64(adjustment.elastic))
}
}

func (m *tokenCounterMetrics) onUnaccounted(unaccounted tokensPerWorkClass) {
m.unaccounted[regular].Inc(int64(unaccounted.regular))
m.unaccounted[elastic].Inc(int64(unaccounted.elastic))
}

type tokenStreamMetrics struct {
count [admissionpb.NumWorkClasses]*metric.Gauge
blockedCount [admissionpb.NumWorkClasses]*metric.Gauge
tokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge
}

func newTokenStreamMetrics(t flowControlMetricType) *tokenStreamMetrics {
m := &tokenStreamMetrics{}
for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
m.count[wc] = metric.NewGauge(
annotateMetricTemplateWithWorkClassAndType(wc, totalStreamCount, t),
)
m.blockedCount[wc] = metric.NewGauge(
annotateMetricTemplateWithWorkClassAndType(wc, blockedStreamCount, t),
)
m.tokensAvailable[wc] = metric.NewGauge(
annotateMetricTemplateWithWorkClassAndType(wc, flowTokensAvailable, t),
)
}
return m
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ type voterStateForWaiters struct {
isLeader bool
isLeaseHolder bool
isStateReplicate bool
evalTokenCounter TokenCounter
evalTokenCounter *tokenCounter
}

type voterSet []voterStateForWaiters
Expand Down Expand Up @@ -456,7 +456,7 @@ type replicaState struct {
// is the identity that is used to deduct tokens or wait for tokens to be
// positive.
stream kvflowcontrol.Stream
evalTokenCounter, sendTokenCounter TokenCounter
evalTokenCounter, sendTokenCounter *tokenCounter
desc roachpb.ReplicaDescriptor

sendStream *replicaSendStream
Expand Down
26 changes: 13 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -370,7 +371,8 @@ func TestRangeController(t *testing.T) {
datadriven.Walk(t, datapathutils.TestDataPath(t, "range_controller"), func(t *testing.T, path string) {
settings := cluster.MakeTestingClusterSettings()
ranges := make(map[roachpb.RangeID]*testingRCRange)
ssTokenCounter := NewStreamTokenCounterProvider(settings)
ssTokenCounter := NewStreamTokenCounterProvider(settings, hlc.NewClockForTesting(nil))

// setTokenCounters is used to ensure that we only set the initial token
// counts once per counter.
setTokenCounters := make(map[kvflowcontrol.Stream]struct{})
Expand Down Expand Up @@ -435,18 +437,17 @@ func TestRangeController(t *testing.T) {

tokenCountsString := func() string {
var b strings.Builder

streams := make([]kvflowcontrol.Stream, 0, len(ssTokenCounter.mu.evalCounters))
for stream := range ssTokenCounter.mu.evalCounters {
streams = append(streams, stream)
}
var streams []kvflowcontrol.Stream
ssTokenCounter.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
streams = append(streams, k)
return true
})
sort.Slice(streams, func(i, j int) bool {
return streams[i].StoreID < streams[j].StoreID
})
for _, stream := range streams {
fmt.Fprintf(&b, "%v: %v\n", stream, ssTokenCounter.Eval(stream))
}

return b.String()
}

Expand Down Expand Up @@ -507,15 +508,15 @@ func TestRangeController(t *testing.T) {
if _, ok := setTokenCounters[stream]; !ok {
setTokenCounters[stream] = struct{}{}
if initialRegularTokens != -1 {
ssTokenCounter.Eval(stream).(*tokenCounter).testingSetTokens(ctx,
ssTokenCounter.Eval(stream).testingSetTokens(ctx,
admissionpb.RegularWorkClass, kvflowcontrol.Tokens(initialRegularTokens))
ssTokenCounter.Send(stream).(*tokenCounter).testingSetTokens(ctx,
ssTokenCounter.Send(stream).testingSetTokens(ctx,
admissionpb.RegularWorkClass, kvflowcontrol.Tokens(initialRegularTokens))
}
if initialElasticTokens != -1 {
ssTokenCounter.Eval(stream).(*tokenCounter).testingSetTokens(ctx,
ssTokenCounter.Eval(stream).testingSetTokens(ctx,
admissionpb.ElasticWorkClass, kvflowcontrol.Tokens(initialElasticTokens))
ssTokenCounter.Send(stream).(*tokenCounter).testingSetTokens(ctx,
ssTokenCounter.Send(stream).testingSetTokens(ctx,
admissionpb.ElasticWorkClass, kvflowcontrol.Tokens(initialElasticTokens))
}
}
Expand Down Expand Up @@ -621,7 +622,7 @@ func TestRangeController(t *testing.T) {
ssTokenCounter.Eval(kvflowcontrol.Stream{
StoreID: roachpb.StoreID(store),
TenantID: roachpb.SystemTenantID,
}).(*tokenCounter).adjust(ctx,
}).adjust(ctx,
admissionpb.WorkClassFromPri(pri),
kvflowcontrol.Tokens(tokens))
}
Expand All @@ -631,7 +632,6 @@ func TestRangeController(t *testing.T) {
case "cancel_context":
var rangeID int
var name string

d.ScanArgs(t, "range_id", &rangeID)
d.ScanArgs(t, "name", &name)
testRC := ranges[roachpb.RangeID(rangeID)]
Expand Down
Loading

0 comments on commit 3373e5d

Please sign in to comment.