From 97cb5c44789f11380859c38ae1cee16fc088762f Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 26 Aug 2024 15:46:27 -0400 Subject: [PATCH 1/2] rac2: de-interface token counter and use syncutil map Prior to this change, `TokenCounter` provided an interface implemented by `*tokenCounter`. As there is only one implementation, de-interface `TokenCounter`. Also, store the TokenCounter in a `syncutil.Map`, as opposed to a native mutex protected map. Epic: CRDB-37515 Release note: None --- .../kvflowcontrol/rac2/range_controller.go | 4 +- .../rac2/range_controller_test.go | 22 +++++------ .../kvflowcontrol/rac2/store_stream.go | 38 +++++-------------- .../kvflowcontrol/rac2/token_counter.go | 29 +------------- 4 files changed, 23 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index e62f65217c9b..1f707a1d087c 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -219,7 +219,7 @@ type voterStateForWaiters struct { isLeader bool isLeaseHolder bool isStateReplicate bool - evalTokenCounter TokenCounter + evalTokenCounter *tokenCounter } type voterSet []voterStateForWaiters @@ -456,7 +456,7 @@ type replicaState struct { // is the identity that is used to deduct tokens or wait for tokens to be // positive. stream kvflowcontrol.Stream - evalTokenCounter, sendTokenCounter TokenCounter + evalTokenCounter, sendTokenCounter *tokenCounter desc roachpb.ReplicaDescriptor sendStream *replicaSendStream diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 8258befa3ac8..11571fd43c17 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -435,18 +435,17 @@ func TestRangeController(t *testing.T) { tokenCountsString := func() string { var b strings.Builder - - streams := make([]kvflowcontrol.Stream, 0, len(ssTokenCounter.mu.evalCounters)) - for stream := range ssTokenCounter.mu.evalCounters { - streams = append(streams, stream) - } + var streams []kvflowcontrol.Stream + ssTokenCounter.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool { + streams = append(streams, k) + return true + }) sort.Slice(streams, func(i, j int) bool { return streams[i].StoreID < streams[j].StoreID }) for _, stream := range streams { fmt.Fprintf(&b, "%v: %v\n", stream, ssTokenCounter.Eval(stream)) } - return b.String() } @@ -507,15 +506,15 @@ func TestRangeController(t *testing.T) { if _, ok := setTokenCounters[stream]; !ok { setTokenCounters[stream] = struct{}{} if initialRegularTokens != -1 { - ssTokenCounter.Eval(stream).(*tokenCounter).testingSetTokens(ctx, + ssTokenCounter.Eval(stream).testingSetTokens(ctx, admissionpb.RegularWorkClass, kvflowcontrol.Tokens(initialRegularTokens)) - ssTokenCounter.Send(stream).(*tokenCounter).testingSetTokens(ctx, + ssTokenCounter.Send(stream).testingSetTokens(ctx, admissionpb.RegularWorkClass, kvflowcontrol.Tokens(initialRegularTokens)) } if initialElasticTokens != -1 { - ssTokenCounter.Eval(stream).(*tokenCounter).testingSetTokens(ctx, + ssTokenCounter.Eval(stream).testingSetTokens(ctx, admissionpb.ElasticWorkClass, kvflowcontrol.Tokens(initialElasticTokens)) - ssTokenCounter.Send(stream).(*tokenCounter).testingSetTokens(ctx, + ssTokenCounter.Send(stream).testingSetTokens(ctx, admissionpb.ElasticWorkClass, 
kvflowcontrol.Tokens(initialElasticTokens)) } } @@ -621,7 +620,7 @@ func TestRangeController(t *testing.T) { ssTokenCounter.Eval(kvflowcontrol.Stream{ StoreID: roachpb.StoreID(store), TenantID: roachpb.SystemTenantID, - }).(*tokenCounter).adjust(ctx, + }).adjust(ctx, admissionpb.WorkClassFromPri(pri), kvflowcontrol.Tokens(tokens)) } @@ -631,7 +630,6 @@ func TestRangeController(t *testing.T) { case "cancel_context": var rangeID int var name string - d.ScanArgs(t, "range_id", &rangeID) d.ScanArgs(t, "name", &name) testRC := ranges[roachpb.RangeID(rangeID)] diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go index 175397579f13..ff4c634979d9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go @@ -22,49 +22,31 @@ import ( // for a given stream. // // TODO(kvoli): Add stream deletion upon decommissioning a store. -// TODO(kvoli): Check mutex performance against syncutil.Map. type StreamTokenCounterProvider struct { - settings *cluster.Settings - - mu struct { - syncutil.Mutex - sendCounters, evalCounters map[kvflowcontrol.Stream]TokenCounter - } + settings *cluster.Settings + sendCounters, evalCounters syncutil.Map[kvflowcontrol.Stream, tokenCounter] } // NewStreamTokenCounterProvider creates a new StreamTokenCounterProvider. func NewStreamTokenCounterProvider(settings *cluster.Settings) *StreamTokenCounterProvider { - p := StreamTokenCounterProvider{settings: settings} - p.mu.evalCounters = make(map[kvflowcontrol.Stream]TokenCounter) - p.mu.sendCounters = make(map[kvflowcontrol.Stream]TokenCounter) - return &p + return &StreamTokenCounterProvider{settings: settings} } // Eval returns the evaluation token counter for the given stream. -func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) TokenCounter { - p.mu.Lock() - defer p.mu.Unlock() - - if t, ok := p.mu.evalCounters[stream]; ok { +func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) *tokenCounter { + if t, ok := p.evalCounters.Load(stream); ok { return t } - - t := newTokenCounter(p.settings) - p.mu.evalCounters[stream] = t + t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter(p.settings)) return t } // Send returns the send token counter for the given stream. -func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) TokenCounter { - p.mu.Lock() - defer p.mu.Unlock() - - if t, ok := p.mu.sendCounters[stream]; ok { +func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) *tokenCounter { + if t, ok := p.sendCounters.Load(stream); ok { return t } - - t := newTokenCounter(p.settings) - p.mu.sendCounters[stream] = t + t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter(p.settings)) return t } @@ -86,7 +68,7 @@ type SendTokenWatcher interface { // call CancelHandle when tokens are no longer needed, or when the caller is // done. 
NotifyWhenAvailable( - TokenCounter, + *tokenCounter, TokenGrantNotification, ) SendTokenWatcherHandleID // CancelHandle cancels the given handle, stopping it from being notified diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go index 3a83f6dcd20d..c73c34f60178 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go @@ -25,32 +25,6 @@ import ( "github.com/cockroachdb/redact" ) -// TokenCounter is the interface for a token counter that can be used to deduct -// and return flow control tokens. Additionally, it can be used to wait for -// tokens to become available, and to check if tokens are available without -// blocking. -// -// TODO(kvoli): Consider de-interfacing if not necessary for testing. -type TokenCounter interface { - // TokensAvailable returns true if tokens are available. If false, it returns - // a handle that may be used for waiting for tokens to become available. - TokensAvailable(admissionpb.WorkClass) (available bool, tokenWaitingHandle TokenWaitingHandle) - // TryDeduct attempts to deduct flow tokens for the given work class. If - // there are no tokens available, 0 tokens are returned. When less than the - // requested token count is available, partial tokens are returned - // corresponding to this partial amount. - TryDeduct( - context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens) kvflowcontrol.Tokens - // Deduct deducts (without blocking) flow tokens for the given work class. If - // there are not enough available tokens, the token counter will go into debt - // (negative available count) and still issue the requested number of tokens. - Deduct(context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens) - // Return returns flow tokens for the given work class. - Return(context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens) - // String returns a string representation of the token counter. - String() string -} - // TokenWaitingHandle is the interface for waiting for positive tokens from a // token counter. type TokenWaitingHandle interface { @@ -177,8 +151,7 @@ type tokenCounter struct { } } -var _ TokenCounter = &tokenCounter{} - +// newTokenCounter creates a new TokenCounter. func newTokenCounter(settings *cluster.Settings) *tokenCounter { t := &tokenCounter{ settings: settings, From 5d555ec877965140ed9c7e47e4d6c9db3c79bb29 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 26 Aug 2024 16:39:54 -0400 Subject: [PATCH 2/2] rac2: add token counter and stream metrics This commit introduces metrics related to stream eval tokens and stream send tokens. Hooking up these metrics to the registry will be in a subsequent commit. There are two separate metric structs used: 1. `tokenCounterMetrics`, which only contains counters and is shared among all `tokenCounter`s on the same node. Each `tokenCounter` updates the shared counters after `adjust` is called. 2. `tokenStreamMetrics`, which is updated periodically by calling `UpdateMetricGauges` via the `StreamTokenCounterProvider`, which is one per node. Metrics related to `WaitForEval` (as well as blocked stream logging) are also deferred to a subsequent commit.
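To illustrate the split described above, here is a simplified, standalone Go sketch of the pattern (eagerly updated shared counters, plus gauges recomputed periodically by ranging over all streams). The type and field names below are illustrative stand-ins, not the actual `rac2` types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedCounterMetrics plays the role of tokenCounterMetrics: one instance per
// node, updated by every counter whenever tokens are deducted or returned.
type sharedCounterMetrics struct {
	deducted atomic.Int64
	returned atomic.Int64
}

// streamGaugeMetrics plays the role of tokenStreamMetrics: gauges that are only
// refreshed periodically, via updateGauges below.
type streamGaugeMetrics struct {
	streamCount     atomic.Int64
	tokensAvailable atomic.Int64
}

// counter is a stand-in for tokenCounter: per-stream token state plus a
// reference to the node-wide shared counter metrics.
type counter struct {
	mu      sync.Mutex
	tokens  int64
	metrics *sharedCounterMetrics
}

func (c *counter) adjust(delta int64) {
	c.mu.Lock()
	c.tokens += delta
	c.mu.Unlock()
	// Counters are updated eagerly, on every adjustment.
	if delta < 0 {
		c.metrics.deducted.Add(-delta)
	} else {
		c.metrics.returned.Add(delta)
	}
}

// provider is a stand-in for StreamTokenCounterProvider: it owns the metrics
// and the per-stream counters, and refreshes the gauges on demand.
type provider struct {
	counters sync.Map // stream id -> *counter
	shared   sharedCounterMetrics
	gauges   streamGaugeMetrics
}

func (p *provider) get(stream int) *counter {
	if c, ok := p.counters.Load(stream); ok {
		return c.(*counter)
	}
	c, _ := p.counters.LoadOrStore(stream, &counter{tokens: 16 << 20, metrics: &p.shared})
	return c.(*counter)
}

// updateGauges plays the role of UpdateMetricGauges: aggregate across all
// streams, then publish the aggregate to the gauges.
func (p *provider) updateGauges() {
	var count, available int64
	p.counters.Range(func(_, v any) bool {
		c := v.(*counter)
		c.mu.Lock()
		count++
		available += c.tokens
		c.mu.Unlock()
		return true
	})
	p.gauges.streamCount.Store(count)
	p.gauges.tokensAvailable.Store(available)
}

func main() {
	p := &provider{}
	p.get(1).adjust(-1 << 20) // deduct 1MiB from stream 1
	p.get(2).adjust(-2 << 20) // deduct 2MiB from stream 2
	p.updateGauges()
	fmt.Println("streams:", p.gauges.streamCount.Load(),
		"available:", p.gauges.tokensAvailable.Load(),
		"deducted:", p.shared.deducted.Load())
}
```

The sketch omits limits, work classes, and blocked-duration tracking; it only shows why counter updates can happen on the hot path while gauge updates are deferred to a periodic aggregation pass.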
Part of: #128031 Release note: None --- .../kvserver/kvflowcontrol/rac2/BUILD.bazel | 4 + pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go | 181 ++++++++++++++++++ .../rac2/range_controller_test.go | 4 +- .../kvflowcontrol/rac2/store_stream.go | 68 ++++++- .../rac2/testdata/token_adjustment | 76 +++++++- .../kvflowcontrol/rac2/token_counter.go | 129 ++++++++++--- .../kvflowcontrol/rac2/token_counter_test.go | 45 ++++- 7 files changed, 462 insertions(+), 45 deletions(-) create mode 100644 pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel index df2a993e1017..9004593ee653 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "rac2", srcs = [ + "metrics.go", "priority.go", "range_controller.go", "store_stream.go", @@ -20,7 +21,9 @@ go_library( "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", + "//pkg/util/hlc", "//pkg/util/log", + "//pkg/util/metric", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", @@ -49,6 +52,7 @@ go_test( "//pkg/settings/cluster", "//pkg/testutils/datapathutils", "//pkg/util/admission/admissionpb", + "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go b/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go new file mode 100644 index 000000000000..be1a09033297 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go @@ -0,0 +1,181 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rac2 + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/redact" +) + +// Aliases to make the code below slightly easier to read. 
+const regular, elastic = admissionpb.RegularWorkClass, admissionpb.ElasticWorkClass + +var ( + flowTokensAvailable = metric.Metadata{ + Name: "kvflowcontrol.tokens.%s.%s.available", + Help: "Flow %s tokens available for %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + flowTokensDeducted = metric.Metadata{ + Name: "kvflowcontrol.tokens.%s.%s.deducted", + Help: "Flow %s tokens deducted by %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + flowTokensReturned = metric.Metadata{ + Name: "kvflowcontrol.tokens.%s.%s.returned", + Help: "Flow %s tokens returned by %s requests, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + flowTokensUnaccounted = metric.Metadata{ + Name: "kvflowcontrol.tokens.%s.%s.unaccounted", + Help: "Flow %s tokens returned by %s requests that were unaccounted for, across all replication streams", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + totalStreamCount = metric.Metadata{ + Name: "kvflowcontrol.streams.%s.%s.total_count", + Help: "Total number of %s replication streams for %s requests", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } + blockedStreamCount = metric.Metadata{ + Name: "kvflowcontrol.streams.%s.%s.blocked_count", + Help: "Number of %s replication streams with no flow tokens available for %s requests", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } +) + +// annotateMetricTemplateWithWorkClass uses the given metric template to build +// one suitable for the specific token type and work class. +func annotateMetricTemplateWithWorkClassAndType( + wc admissionpb.WorkClass, tmpl metric.Metadata, t flowControlMetricType, +) metric.Metadata { + rv := tmpl + rv.Name = fmt.Sprintf(tmpl.Name, t, wc) + rv.Help = fmt.Sprintf(tmpl.Help, t, wc) + return rv +} + +type flowControlMetricType int + +const ( + flowControlEvalMetricType flowControlMetricType = iota + flowControlSendMetricType + numFlowControlMetricTypes +) + +func (f flowControlMetricType) String() string { + return redact.StringWithoutMarkers(f) +} + +// SafeFormat implements the redact.SafeFormatter interface. 
+func (f flowControlMetricType) SafeFormat(p redact.SafePrinter, _ rune) { + switch f { + case flowControlEvalMetricType: + p.SafeString("eval") + case flowControlSendMetricType: + p.SafeString("send") + default: + panic("unknown flowControlMetricType") + } +} + +type tokenMetrics struct { + counterMetrics [numFlowControlMetricTypes]*tokenCounterMetrics + streamMetrics [numFlowControlMetricTypes]*tokenStreamMetrics +} + +func newTokenMetrics() *tokenMetrics { + m := &tokenMetrics{} + for _, typ := range []flowControlMetricType{ + flowControlEvalMetricType, + flowControlSendMetricType, + } { + m.counterMetrics[typ] = newTokenCounterMetrics(typ) + m.streamMetrics[typ] = newTokenStreamMetrics(typ) + } + return m +} + +type tokenCounterMetrics struct { + deducted [admissionpb.NumWorkClasses]*metric.Counter + returned [admissionpb.NumWorkClasses]*metric.Counter + unaccounted [admissionpb.NumWorkClasses]*metric.Counter +} + +func newTokenCounterMetrics(t flowControlMetricType) *tokenCounterMetrics { + m := &tokenCounterMetrics{} + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + m.deducted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClassAndType(wc, flowTokensDeducted, t), + ) + m.returned[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClassAndType(wc, flowTokensReturned, t), + ) + m.unaccounted[wc] = metric.NewCounter( + annotateMetricTemplateWithWorkClassAndType(wc, flowTokensUnaccounted, t), + ) + } + return m +} + +func (m *tokenCounterMetrics) onTokenAdjustment(adjustment tokensPerWorkClass) { + if adjustment.regular < 0 { + m.deducted[regular].Inc(-int64(adjustment.regular)) + } else if adjustment.regular > 0 { + m.returned[regular].Inc(int64(adjustment.regular)) + } + if adjustment.elastic < 0 { + m.deducted[elastic].Inc(-int64(adjustment.elastic)) + } else if adjustment.elastic > 0 { + m.returned[elastic].Inc(int64(adjustment.elastic)) + } +} + +func (m *tokenCounterMetrics) onUnaccounted(unaccounted tokensPerWorkClass) { + m.unaccounted[regular].Inc(int64(unaccounted.regular)) + m.unaccounted[elastic].Inc(int64(unaccounted.elastic)) +} + +type tokenStreamMetrics struct { + count [admissionpb.NumWorkClasses]*metric.Gauge + blockedCount [admissionpb.NumWorkClasses]*metric.Gauge + tokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge +} + +func newTokenStreamMetrics(t flowControlMetricType) *tokenStreamMetrics { + m := &tokenStreamMetrics{} + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + m.count[wc] = metric.NewGauge( + annotateMetricTemplateWithWorkClassAndType(wc, totalStreamCount, t), + ) + m.blockedCount[wc] = metric.NewGauge( + annotateMetricTemplateWithWorkClassAndType(wc, blockedStreamCount, t), + ) + m.tokensAvailable[wc] = metric.NewGauge( + annotateMetricTemplateWithWorkClassAndType(wc, flowTokensAvailable, t), + ) + } + return m +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 11571fd43c17..c17ab41f2de0 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" 
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -370,7 +371,8 @@ func TestRangeController(t *testing.T) { datadriven.Walk(t, datapathutils.TestDataPath(t, "range_controller"), func(t *testing.T, path string) { settings := cluster.MakeTestingClusterSettings() ranges := make(map[roachpb.RangeID]*testingRCRange) - ssTokenCounter := NewStreamTokenCounterProvider(settings) + ssTokenCounter := NewStreamTokenCounterProvider(settings, hlc.NewClockForTesting(nil)) + // setTokenCounters is used to ensure that we only set the initial token // counts once per counter. setTokenCounters := make(map[kvflowcontrol.Stream]struct{}) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go index ff4c634979d9..6ec2e12173ff 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go @@ -15,6 +15,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -24,12 +26,20 @@ import ( // TODO(kvoli): Add stream deletion upon decommissioning a store. type StreamTokenCounterProvider struct { settings *cluster.Settings + clock *hlc.Clock + tokenMetrics *tokenMetrics sendCounters, evalCounters syncutil.Map[kvflowcontrol.Stream, tokenCounter] } // NewStreamTokenCounterProvider creates a new StreamTokenCounterProvider. -func NewStreamTokenCounterProvider(settings *cluster.Settings) *StreamTokenCounterProvider { - return &StreamTokenCounterProvider{settings: settings} +func NewStreamTokenCounterProvider( + settings *cluster.Settings, clock *hlc.Clock, +) *StreamTokenCounterProvider { + return &StreamTokenCounterProvider{ + settings: settings, + clock: clock, + tokenMetrics: newTokenMetrics(), + } } // Eval returns the evaluation token counter for the given stream. @@ -37,7 +47,8 @@ func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) *tokenCou if t, ok := p.evalCounters.Load(stream); ok { return t } - t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter(p.settings)) + t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter( + p.settings, p.clock, p.tokenMetrics.counterMetrics[flowControlEvalMetricType])) return t } @@ -46,10 +57,59 @@ func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) *tokenCou if t, ok := p.sendCounters.Load(stream); ok { return t } - t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter(p.settings)) + t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter( + p.settings, p.clock, p.tokenMetrics.counterMetrics[flowControlSendMetricType])) return t } +// UpdateMetricGauges updates the gauge token metrics. +func (p *StreamTokenCounterProvider) UpdateMetricGauges() { + var ( + count [numFlowControlMetricTypes][admissionpb.NumWorkClasses]int64 + blockedCount [numFlowControlMetricTypes][admissionpb.NumWorkClasses]int64 + tokensAvailable [numFlowControlMetricTypes][admissionpb.NumWorkClasses]int64 + ) + now := p.clock.PhysicalTime() + + // First aggregate the metrics across all streams, by (eval|send) types and + // (regular|elastic) work classes, then using the aggregate update the + // gauges. 
+ gaugeUpdateFn := func(metricType flowControlMetricType) func( + kvflowcontrol.Stream, *tokenCounter) bool { + return func(stream kvflowcontrol.Stream, t *tokenCounter) bool { + count[metricType][regular]++ + count[metricType][elastic]++ + tokensAvailable[metricType][regular] += int64(t.tokens(regular)) + tokensAvailable[metricType][elastic] += int64(t.tokens(elastic)) + + regularStats, elasticStats := t.GetAndResetStats(now) + if regularStats.noTokenDuration > 0 { + blockedCount[metricType][regular]++ + } + if elasticStats.noTokenDuration > 0 { + blockedCount[metricType][elastic]++ + } + return true + } + } + + p.evalCounters.Range(gaugeUpdateFn(flowControlEvalMetricType)) + p.sendCounters.Range(gaugeUpdateFn(flowControlSendMetricType)) + for _, typ := range []flowControlMetricType{ + flowControlEvalMetricType, + flowControlSendMetricType, + } { + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + p.tokenMetrics.streamMetrics[typ].count[wc].Update(count[typ][wc]) + p.tokenMetrics.streamMetrics[typ].blockedCount[wc].Update(blockedCount[typ][wc]) + p.tokenMetrics.streamMetrics[typ].tokensAvailable[wc].Update(tokensAvailable[typ][wc]) + } + } +} + // SendTokenWatcherHandleID is a unique identifier for a handle that is // watching for available elastic send tokens on a stream. type SendTokenWatcherHandleID int64 diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/token_adjustment b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/token_adjustment index 5cb66ae0d78d..e9f78657597b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/token_adjustment +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/token_adjustment @@ -1,6 +1,21 @@ -init +init stream=1 ---- +metrics +---- +kvflowcontrol.streams.eval.regular.total_count : 1 +kvflowcontrol.streams.eval.regular.blocked_count: 0 +kvflowcontrol.tokens.eval.regular.available : 16777216 +kvflowcontrol.tokens.eval.regular.deducted : 0 +kvflowcontrol.tokens.eval.regular.returned : 0 +kvflowcontrol.tokens.eval.regular.unaccounted : 0 +kvflowcontrol.streams.eval.elastic.total_count : 1 +kvflowcontrol.streams.eval.elastic.blocked_count: 0 +kvflowcontrol.tokens.eval.elastic.available : 8388608 +kvflowcontrol.tokens.eval.elastic.deducted : 0 +kvflowcontrol.tokens.eval.elastic.returned : 0 +kvflowcontrol.tokens.eval.elastic.unaccounted : 0 + adjust class=regular delta=-1MiB class=regular delta=-7MiB @@ -36,8 +51,43 @@ history +2.0MiB elastic +10MiB | +2.0MiB +6.0MiB regular +16MiB | +8.0MiB -init +# Despite the elastic stream being unblocked by the time metrics is called, the +# stream was blocked for a non-zero duration between metric calls so we expect +# elastic.blocked_count=1. +metrics ---- +kvflowcontrol.streams.eval.regular.total_count : 1 +kvflowcontrol.streams.eval.regular.blocked_count: 0 +kvflowcontrol.tokens.eval.regular.available : 16777216 +kvflowcontrol.tokens.eval.regular.deducted : 16777216 +kvflowcontrol.tokens.eval.regular.returned : 16777216 +kvflowcontrol.tokens.eval.regular.unaccounted : 0 +kvflowcontrol.streams.eval.elastic.total_count : 1 +kvflowcontrol.streams.eval.elastic.blocked_count: 1 +kvflowcontrol.tokens.eval.elastic.available : 8388608 +kvflowcontrol.tokens.eval.elastic.deducted : 18874368 +kvflowcontrol.tokens.eval.elastic.returned : 18874368 +kvflowcontrol.tokens.eval.elastic.unaccounted : 0 + +init stream=2 +---- + +# There should now be two streams in the metrics, double the available tokens +# for each work class. 
+metrics +---- +kvflowcontrol.streams.eval.regular.total_count : 2 +kvflowcontrol.streams.eval.regular.blocked_count: 0 +kvflowcontrol.tokens.eval.regular.available : 33554432 +kvflowcontrol.tokens.eval.regular.deducted : 16777216 +kvflowcontrol.tokens.eval.regular.returned : 16777216 +kvflowcontrol.tokens.eval.regular.unaccounted : 0 +kvflowcontrol.streams.eval.elastic.total_count : 2 +kvflowcontrol.streams.eval.elastic.blocked_count: 0 +kvflowcontrol.tokens.eval.elastic.available : 16777216 +kvflowcontrol.tokens.eval.elastic.deducted : 18874368 +kvflowcontrol.tokens.eval.elastic.returned : 18874368 +kvflowcontrol.tokens.eval.elastic.unaccounted : 0 adjust class=elastic delta=-7MiB @@ -47,9 +97,11 @@ class=regular delta=-1MiB class=regular delta=-6MiB class=regular delta=+6MiB class=regular delta=-9MiB -class=regular delta=+16MiB +class=regular delta=+17MiB +class=elastic delta=+1MiB ---- + history ---- regular | elastic @@ -62,4 +114,20 @@ history -6.0MiB regular +2.0MiB | -7.0MiB (elastic blocked) +6.0MiB regular +8.0MiB | -1.0MiB (elastic blocked) -9.0MiB regular -1.0MiB | -10MiB (regular and elastic blocked) - +16MiB regular +15MiB | +6.0MiB + +17MiB regular +16MiB | +7.0MiB + +1.0MiB elastic +16MiB | +8.0MiB + +metrics +---- +kvflowcontrol.streams.eval.regular.total_count : 2 +kvflowcontrol.streams.eval.regular.blocked_count: 1 +kvflowcontrol.tokens.eval.regular.available : 33554432 +kvflowcontrol.tokens.eval.regular.deducted : 40894464 +kvflowcontrol.tokens.eval.regular.returned : 40894464 +kvflowcontrol.tokens.eval.regular.unaccounted : 0 +kvflowcontrol.streams.eval.elastic.total_count : 2 +kvflowcontrol.streams.eval.elastic.blocked_count: 1 +kvflowcontrol.tokens.eval.elastic.available : 16777216 +kvflowcontrol.tokens.eval.elastic.deducted : 50331648 +kvflowcontrol.tokens.eval.elastic.returned : 50331648 +kvflowcontrol.tokens.eval.elastic.unaccounted : 0 diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go index c73c34f60178..47d0598ae8ec 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go @@ -14,11 +14,13 @@ import ( "context" "fmt" "reflect" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -80,50 +82,67 @@ type tokenCounterPerWorkClass struct { // needs to schedule the goroutine that got the entry for it to unblock // another. signalCh chan struct{} + stats struct { + deltaStats + noTokenStartTime time.Time + } +} + +type deltaStats struct { + noTokenDuration time.Duration + tokensDeducted, tokensReturned kvflowcontrol.Tokens } func makeTokenCounterPerWorkClass( - wc admissionpb.WorkClass, limit kvflowcontrol.Tokens, + wc admissionpb.WorkClass, limit kvflowcontrol.Tokens, now time.Time, ) tokenCounterPerWorkClass { - return tokenCounterPerWorkClass{ + twc := tokenCounterPerWorkClass{ wc: wc, tokens: limit, limit: limit, signalCh: make(chan struct{}, 1), } + twc.stats.noTokenStartTime = now + return twc } // adjustTokensLocked adjusts the tokens for the given work class by delta. 
func (twc *tokenCounterPerWorkClass) adjustTokensLocked( - ctx context.Context, delta kvflowcontrol.Tokens, -) { - var unaccounted kvflowcontrol.Tokens + ctx context.Context, delta kvflowcontrol.Tokens, now time.Time, +) (adjustment, unaccounted kvflowcontrol.Tokens) { before := twc.tokens twc.tokens += delta - - if delta <= 0 { - // Nothing left to do, since we know tokens didn't increase. - return - } - if twc.tokens > twc.limit { - unaccounted = twc.tokens - twc.limit - twc.tokens = twc.limit - } - if before <= 0 && twc.tokens > 0 { - twc.signal() + if delta > 0 { + twc.stats.tokensReturned += delta + if twc.tokens > twc.limit { + unaccounted = twc.tokens - twc.limit + twc.tokens = twc.limit + } + if before <= 0 && twc.tokens > 0 { + twc.signal() + twc.stats.noTokenDuration += now.Sub(twc.stats.noTokenStartTime) + } + } else { + twc.stats.tokensDeducted -= delta + if before > 0 && twc.tokens <= 0 { + twc.stats.noTokenStartTime = now + } } if buildutil.CrdbTestBuild && unaccounted != 0 { log.Fatalf(ctx, "unaccounted[%s]=%d delta=%d limit=%d", twc.wc, unaccounted, delta, twc.limit) } + + adjustment = twc.tokens - before + return adjustment, unaccounted } func (twc *tokenCounterPerWorkClass) setLimitLocked( - ctx context.Context, limit kvflowcontrol.Tokens, + ctx context.Context, limit kvflowcontrol.Tokens, now time.Time, ) { before := twc.limit twc.limit = limit - twc.adjustTokensLocked(ctx, twc.limit-before) + twc.adjustTokensLocked(ctx, twc.limit-before, now) } func (twc *tokenCounterPerWorkClass) signal() { @@ -134,6 +153,18 @@ func (twc *tokenCounterPerWorkClass) signal() { } } +func (twc *tokenCounterPerWorkClass) getAndResetStats(now time.Time) deltaStats { + stats := twc.stats.deltaStats + if twc.tokens <= 0 { + stats.noTokenDuration += now.Sub(twc.stats.noTokenStartTime) + } + twc.stats.deltaStats = deltaStats{} + // Doesn't matter if bwc.tokens is actually > 0 since in that case we won't + // use this value. + twc.stats.noTokenStartTime = now + return stats +} + type tokensPerWorkClass struct { regular, elastic kvflowcontrol.Tokens } @@ -143,6 +174,8 @@ type tokensPerWorkClass struct { // returning and waiting for flow tokens. type tokenCounter struct { settings *cluster.Settings + clock *hlc.Clock + metrics *tokenCounterMetrics mu struct { syncutil.RWMutex @@ -152,27 +185,34 @@ type tokenCounter struct { } // newTokenCounter creates a new TokenCounter. 
-func newTokenCounter(settings *cluster.Settings) *tokenCounter { +func newTokenCounter( + settings *cluster.Settings, clock *hlc.Clock, metrics *tokenCounterMetrics, +) *tokenCounter { t := &tokenCounter{ settings: settings, + clock: clock, + metrics: metrics, } limit := tokensPerWorkClass{ regular: kvflowcontrol.Tokens(kvflowcontrol.RegularTokensPerStream.Get(&settings.SV)), elastic: kvflowcontrol.Tokens(kvflowcontrol.ElasticTokensPerStream.Get(&settings.SV)), } + now := clock.PhysicalTime() + t.mu.counters[admissionpb.RegularWorkClass] = makeTokenCounterPerWorkClass( - admissionpb.RegularWorkClass, limit.regular) + admissionpb.RegularWorkClass, limit.regular, now) t.mu.counters[admissionpb.ElasticWorkClass] = makeTokenCounterPerWorkClass( - admissionpb.ElasticWorkClass, limit.elastic) + admissionpb.ElasticWorkClass, limit.elastic, now) onChangeFunc := func(ctx context.Context) { + now := t.clock.PhysicalTime() t.mu.Lock() defer t.mu.Unlock() t.mu.counters[admissionpb.RegularWorkClass].setLimitLocked( - ctx, kvflowcontrol.Tokens(kvflowcontrol.RegularTokensPerStream.Get(&settings.SV))) + ctx, kvflowcontrol.Tokens(kvflowcontrol.RegularTokensPerStream.Get(&settings.SV)), now) t.mu.counters[admissionpb.ElasticWorkClass].setLimitLocked( - ctx, kvflowcontrol.Tokens(kvflowcontrol.ElasticTokensPerStream.Get(&settings.SV))) + ctx, kvflowcontrol.Tokens(kvflowcontrol.ElasticTokensPerStream.Get(&settings.SV)), now) } kvflowcontrol.RegularTokensPerStream.SetOnChange(&settings.SV, onChangeFunc) @@ -224,6 +264,7 @@ func (t *tokenCounter) TokensAvailable( func (t *tokenCounter) TryDeduct( ctx context.Context, wc admissionpb.WorkClass, tokens kvflowcontrol.Tokens, ) kvflowcontrol.Tokens { + now := t.clock.PhysicalTime() t.mu.Lock() defer t.mu.Unlock() @@ -233,7 +274,7 @@ func (t *tokenCounter) TryDeduct( } adjust := min(tokensAvailable, tokens) - t.adjustLocked(ctx, wc, -adjust) + t.adjustLocked(ctx, wc, -adjust, now) return adjust } @@ -414,23 +455,38 @@ func WaitForEval( func (t *tokenCounter) adjust( ctx context.Context, class admissionpb.WorkClass, delta kvflowcontrol.Tokens, ) { + now := t.clock.PhysicalTime() t.mu.Lock() defer t.mu.Unlock() - t.adjustLocked(ctx, class, delta) + t.adjustLocked(ctx, class, delta, now) } func (t *tokenCounter) adjustLocked( - ctx context.Context, class admissionpb.WorkClass, delta kvflowcontrol.Tokens, + ctx context.Context, class admissionpb.WorkClass, delta kvflowcontrol.Tokens, now time.Time, ) { + var adjustment, unaccounted tokensPerWorkClass switch class { case admissionpb.RegularWorkClass: - t.mu.counters[admissionpb.RegularWorkClass].adjustTokensLocked(ctx, delta) - // Regular {deductions,returns} also affect elastic flow tokens. - t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta) + adjustment.regular, unaccounted.regular = + t.mu.counters[admissionpb.RegularWorkClass].adjustTokensLocked(ctx, delta, now) + // Regular {deductions,returns} also affect elastic flow tokens. + adjustment.elastic, unaccounted.elastic = + t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta, now) + case admissionpb.ElasticWorkClass: // Elastic {deductions,returns} only affect elastic flow tokens. - t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta) + adjustment.elastic, unaccounted.elastic = + t.mu.counters[admissionpb.ElasticWorkClass].adjustTokensLocked(ctx, delta, now) + } + + // Adjust metrics if any tokens were actually adjusted or unaccounted for + // tokens were detected. 
+ if adjustment.regular != 0 || adjustment.elastic != 0 { + t.metrics.onTokenAdjustment(adjustment) + } + if unaccounted.regular != 0 || unaccounted.elastic != 0 { + t.metrics.onUnaccounted(unaccounted) } } @@ -441,5 +497,16 @@ func (t *tokenCounter) testingSetTokens( ) { t.mu.Lock() defer t.mu.Unlock() - t.mu.counters[wc].adjustTokensLocked(ctx, tokens-t.mu.counters[wc].tokens) + + t.mu.counters[wc].adjustTokensLocked(ctx, + tokens-t.mu.counters[wc].tokens, t.clock.PhysicalTime()) +} + +func (t *tokenCounter) GetAndResetStats(now time.Time) (regularStats, elasticStats deltaStats) { + t.mu.Lock() + defer t.mu.Unlock() + + regularStats = t.mu.counters[admissionpb.RegularWorkClass].getAndResetStats(now) + elasticStats = t.mu.counters[admissionpb.ElasticWorkClass].getAndResetStats(now) + return regularStats, elasticStats } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go index ee440cd6b8ae..01996b86d659 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_counter_test.go @@ -20,8 +20,10 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -44,6 +46,10 @@ func TestTokenAdjustment(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + provider := NewStreamTokenCounterProvider( + cluster.MakeTestingClusterSettings(), + hlc.NewClockForTesting(nil), + ) var ( ctx = context.Background() counter *tokenCounter @@ -54,7 +60,9 @@ func TestTokenAdjustment(t *testing.T) { func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - counter = newTokenCounter(cluster.MakeTestingClusterSettings()) + var stream int + d.ScanArgs(t, "stream", &stream) + counter = provider.Eval(kvflowcontrol.Stream{StoreID: roachpb.StoreID(stream)}) adjustments = nil return "" @@ -120,6 +128,25 @@ func TestTokenAdjustment(t *testing.T) { } return buf.String() + case "metrics": + provider.UpdateMetricGauges() + var buf strings.Builder + // We are only using the eval token counter in this test. 
+ counterMetrics := provider.tokenMetrics.counterMetrics[flowControlEvalMetricType] + streamMetrics := provider.tokenMetrics.streamMetrics[flowControlEvalMetricType] + for _, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + fmt.Fprintf(&buf, "%-48v: %v\n", streamMetrics.count[wc].GetName(), streamMetrics.count[wc].Value()) + fmt.Fprintf(&buf, "%-48v: %v\n", streamMetrics.blockedCount[wc].GetName(), streamMetrics.blockedCount[wc].Value()) + fmt.Fprintf(&buf, "%-48v: %v\n", streamMetrics.tokensAvailable[wc].GetName(), streamMetrics.tokensAvailable[wc].Value()) + fmt.Fprintf(&buf, "%-48v: %v\n", counterMetrics.deducted[wc].GetName(), counterMetrics.deducted[wc].Count()) + fmt.Fprintf(&buf, "%-48v: %v\n", counterMetrics.returned[wc].GetName(), counterMetrics.returned[wc].Count()) + fmt.Fprintf(&buf, "%-48v: %v\n", counterMetrics.unaccounted[wc].GetName(), counterMetrics.unaccounted[wc].Count()) + } + return buf.String() + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } @@ -172,7 +199,11 @@ func TestTokenCounter(t *testing.T) { settings := cluster.MakeTestingClusterSettings() kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, int64(limits.elastic)) kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, int64(limits.regular)) - counter := newTokenCounter(settings) + counter := newTokenCounter( + settings, + hlc.NewClockForTesting(nil), + newTokenCounterMetrics(flowControlEvalMetricType), + ) assertStateReset := func(t *testing.T) { available, handle := counter.TokensAvailable(admissionpb.ElasticWorkClass) @@ -342,9 +373,13 @@ func (ts *evalTestState) getOrCreateTC(stream string) *namedTokenCounter { tc, exists := ts.mu.counters[stream] if !exists { tc = &namedTokenCounter{ - parent: ts, - tokenCounter: newTokenCounter(ts.settings), - stream: stream, + parent: ts, + tokenCounter: newTokenCounter( + ts.settings, + hlc.NewClockForTesting(nil), + newTokenCounterMetrics(flowControlEvalMetricType), + ), + stream: stream, } // Ensure the token counter starts with no tokens initially. tc.adjust(context.Background(),