Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129911: rac2: add eval wait metrics r=sumeerbhola a=kvoli

This commit introduces `EvalWaitMetrics`, which aggregates request
information:

- \# requests currently waiting
- \# requests admitted
- \# requests bypassed
- \# requests errored
- requests wait duration (histogram)

These are provided to the `RangeController` via the
`RangeControllerOptions` and will be shared between all
`RangeController`s on the same node.

Hooking up these metrics to the registry is deferred to a subsequent
commit, similar to the token and stream metrics. As such, a release note
is also deferred.

Part of: cockroachdb#128031
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed Aug 31, 2024
2 parents 7310238 + fc35f41 commit a304b34
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
105 changes: 105 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ package rac2

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -58,6 +60,38 @@ var (
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
// WaitForEval metrics. Each Name and Help below is a format template whose
// single %s verb is filled in with the work class by
// annotateMetricTemplateWithWorkClass.

// requestsWaiting describes the gauge of requests currently waiting.
requestsWaiting = metric.Metadata{
	Name:        "kvflowcontrol.eval_wait.%s.requests.waiting",
	Help:        "Number of %s requests waiting for flow tokens",
	Measurement: "Requests",
	Unit:        metric.Unit_COUNT,
}
// requestsAdmitted describes the counter of requests that were admitted.
requestsAdmitted = metric.Metadata{
	Name:        "kvflowcontrol.eval_wait.%s.requests.admitted",
	Help:        "Number of %s requests admitted by the flow controller",
	Measurement: "Requests",
	Unit:        metric.Unit_COUNT,
}
// requestsErrored describes the counter of requests that returned an error
// while waiting.
requestsErrored = metric.Metadata{
	Name:        "kvflowcontrol.eval_wait.%s.requests.errored",
	Help:        "Number of %s requests that errored out while waiting for flow tokens",
	Measurement: "Requests",
	Unit:        metric.Unit_COUNT,
}
// requestsBypassed describes the counter of requests that skipped flow
// control instead of being admitted.
requestsBypassed = metric.Metadata{
	Name: "kvflowcontrol.eval_wait.%s.requests.bypassed",
	Help: "Number of waiting %s requests that bypassed the flow " +
		"controller due to the evaluating replica not being the leader",
	Measurement: "Requests",
	Unit:        metric.Unit_COUNT,
}
// waitDuration describes the histogram of time spent waiting, recorded in
// nanoseconds.
waitDuration = metric.Metadata{
	Name:        "kvflowcontrol.eval_wait.%s.duration",
	Help:        "Latency histogram for time %s requests spent waiting for flow tokens to evaluate",
	Measurement: "Nanoseconds",
	Unit:        metric.Unit_NANOSECONDS,
}
)

// annotateMetricTemplateWithWorkClassAndType uses the given metric template to build
Expand All @@ -71,6 +105,17 @@ func annotateMetricTemplateWithWorkClassAndType(
return rv
}

// annotateMetricTemplateWithWorkClass returns a copy of tmpl whose Name and
// Help format strings have been filled in with the given work class. All other
// fields are carried over from the template as-is.
func annotateMetricTemplateWithWorkClass(
	wc admissionpb.WorkClass, tmpl metric.Metadata,
) metric.Metadata {
	// tmpl is received by value, so overwriting its fields only affects the
	// copy that is returned, not the caller's template.
	name := fmt.Sprintf(tmpl.Name, wc)
	help := fmt.Sprintf(tmpl.Help, wc)
	tmpl.Name, tmpl.Help = name, help
	return tmpl
}

type flowControlMetricType int

const (
Expand Down Expand Up @@ -179,3 +224,63 @@ func newTokenStreamMetrics(t flowControlMetricType) *tokenStreamMetrics {
}
return m
}

// EvalWaitMetrics holds the metrics describing requests that wait for
// evaluation, broken down by work class. Every field is an array indexed by
// admissionpb.WorkClass. Use NewEvalWaitMetrics to construct one; the zero
// value has nil metric pointers.
type EvalWaitMetrics struct {
// waiting is the number of requests currently waiting. It is incremented by
// onWaiting and decremented by onAdmitted, onBypassed and onErrored.
waiting [admissionpb.NumWorkClasses]*metric.Gauge
// admitted counts requests recorded via onAdmitted.
admitted [admissionpb.NumWorkClasses]*metric.Counter
// errored counts requests recorded via onErrored.
errored [admissionpb.NumWorkClasses]*metric.Counter
// bypassed counts requests recorded via onBypassed.
bypassed [admissionpb.NumWorkClasses]*metric.Counter
// duration is a histogram of wait times, recorded in nanoseconds for every
// admitted, bypassed and errored request.
duration [admissionpb.NumWorkClasses]metric.IHistogram
}

// NewEvalWaitMetrics constructs an EvalWaitMetrics with a gauge, three
// counters and a duration histogram for each of the regular and elastic work
// classes. The metrics are only created here; they are not added to any
// metric registry by this function.
func NewEvalWaitMetrics() *EvalWaitMetrics {
	workClasses := [...]admissionpb.WorkClass{
		admissionpb.RegularWorkClass,
		admissionpb.ElasticWorkClass,
	}
	metrics := new(EvalWaitMetrics)
	for i := range workClasses {
		class := workClasses[i]

		waitingMeta := annotateMetricTemplateWithWorkClass(class, requestsWaiting)
		metrics.waiting[class] = metric.NewGauge(waitingMeta)

		admittedMeta := annotateMetricTemplateWithWorkClass(class, requestsAdmitted)
		metrics.admitted[class] = metric.NewCounter(admittedMeta)

		erroredMeta := annotateMetricTemplateWithWorkClass(class, requestsErrored)
		metrics.errored[class] = metric.NewCounter(erroredMeta)

		bypassedMeta := annotateMetricTemplateWithWorkClass(class, requestsBypassed)
		metrics.bypassed[class] = metric.NewCounter(bypassedMeta)

		// The wait duration is tracked as a Prometheus-mode histogram using
		// the IO latency bucket configuration.
		histogramOpts := metric.HistogramOptions{
			Metadata:     annotateMetricTemplateWithWorkClass(class, waitDuration),
			Duration:     base.DefaultHistogramWindowInterval(),
			BucketConfig: metric.IOLatencyBuckets,
			Mode:         metric.HistogramModePrometheus,
		}
		metrics.duration[class] = metric.NewHistogram(histogramOpts)
	}
	return metrics
}

// onWaiting records that a request of work class wc has started waiting by
// bumping the waiting gauge for that class.
func (e *EvalWaitMetrics) onWaiting(wc admissionpb.WorkClass) {
	gauge := e.waiting[wc]
	gauge.Inc(1)
}

// onAdmitted records that a waiting request of work class wc was admitted
// after waiting for dur: the admitted counter goes up, the waiting gauge goes
// down, and dur is added to the duration histogram in nanoseconds.
func (e *EvalWaitMetrics) onAdmitted(wc admissionpb.WorkClass, dur time.Duration) {
	waitedNanos := dur.Nanoseconds()
	admitted, waiting, hist := e.admitted[wc], e.waiting[wc], e.duration[wc]
	admitted.Inc(1)
	waiting.Dec(1)
	hist.RecordValue(waitedNanos)
}

// onBypassed records that a waiting request of work class wc bypassed flow
// control after waiting for dur: the bypassed counter goes up, the waiting
// gauge goes down, and dur is added to the duration histogram in nanoseconds.
func (e *EvalWaitMetrics) onBypassed(wc admissionpb.WorkClass, dur time.Duration) {
	waitedNanos := dur.Nanoseconds()
	bypassed, waiting, hist := e.bypassed[wc], e.waiting[wc], e.duration[wc]
	bypassed.Inc(1)
	waiting.Dec(1)
	hist.RecordValue(waitedNanos)
}

// onErrored records that a waiting request of work class wc stopped waiting
// with an error after dur: the errored counter goes up, the waiting gauge goes
// down, and dur is added to the duration histogram in nanoseconds.
func (e *EvalWaitMetrics) onErrored(wc admissionpb.WorkClass, dur time.Duration) {
	waitedNanos := dur.Nanoseconds()
	errored, waiting, hist := e.errored[wc], e.waiting[wc], e.duration[wc]
	errored.Inc(1)
	waiting.Dec(1)
	hist.RecordValue(waitedNanos)
}
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -179,8 +180,10 @@ type RangeControllerOptions struct {
LocalReplicaID roachpb.ReplicaID
// SSTokenCounter provides access to all the TokenCounters that will be
// needed (keyed by (tenantID, storeID)).
SSTokenCounter *StreamTokenCounterProvider
RaftInterface RaftInterface
SSTokenCounter *StreamTokenCounterProvider
RaftInterface RaftInterface
Clock *hlc.Clock
EvalWaitMetrics *EvalWaitMetrics
}

// RangeControllerInitState is the initial state at the time of creation.
Expand Down Expand Up @@ -257,6 +260,8 @@ func (rc *rangeController) WaitForEval(
var handles []tokenWaitingHandleInfo
var scratch []reflect.SelectCase

rc.opts.EvalWaitMetrics.onWaiting(wc)
start := rc.opts.Clock.PhysicalTime()
retry:
// Snapshot the voterSets and voterSetRefreshCh.
rc.mu.Lock()
Expand All @@ -266,6 +271,10 @@ retry:

if vssRefreshCh == nil {
// RangeControllerImpl is closed.
// TODO(kvoli): We also need to do this in the replica_rac2.Processor,
// which will allow requests to bypass when a replica is not the leader and
// therefore the controller is closed.
rc.opts.EvalWaitMetrics.onBypassed(wc, rc.opts.Clock.PhysicalTime().Sub(start))
return false, nil
}
for _, vs := range vss {
Expand Down Expand Up @@ -302,12 +311,14 @@ retry:
case WaitSuccess:
continue
case ContextCanceled:
rc.opts.EvalWaitMetrics.onErrored(wc, rc.opts.Clock.PhysicalTime().Sub(start))
return false, ctx.Err()
case RefreshWaitSignaled:
goto retry
}
}
}
rc.opts.EvalWaitMetrics.onAdmitted(wc, rc.opts.Clock.PhysicalTime().Sub(start))
return true, nil
}

Expand Down
45 changes: 39 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,19 @@ func testingCreateEntry(t *testing.T, info entryInfo) raftpb.Entry {
// - stream_state: Prints the state of the stream(s) for the given range's
// replicas.
// range_id=<range_id>
//
// - metrics: Prints the current state of the eval metrics.
func TestRangeController(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

datadriven.Walk(t, datapathutils.TestDataPath(t, "range_controller"), func(t *testing.T, path string) {
clock := hlc.NewClockForTesting(nil)
settings := cluster.MakeTestingClusterSettings()
ranges := make(map[roachpb.RangeID]*testingRCRange)
ssTokenCounter := NewStreamTokenCounterProvider(settings, hlc.NewClockForTesting(nil))
ssTokenCounter := NewStreamTokenCounterProvider(settings, clock)
evalMetrics := NewEvalWaitMetrics()

// setTokenCounters is used to ensure that we only set the initial token
// counts once per counter.
Expand Down Expand Up @@ -530,11 +534,13 @@ func TestRangeController(t *testing.T) {
testRC.mu.r = r
testRC.mu.evals = make(map[string]*testingRCEval)
options := RangeControllerOptions{
RangeID: r.rangeID,
TenantID: r.tenantID,
LocalReplicaID: r.localReplicaID,
SSTokenCounter: ssTokenCounter,
RaftInterface: testRC,
RangeID: r.rangeID,
TenantID: r.tenantID,
LocalReplicaID: r.localReplicaID,
SSTokenCounter: ssTokenCounter,
RaftInterface: testRC,
Clock: clock,
EvalWaitMetrics: evalMetrics,
}

init := RangeControllerInitState{
Expand Down Expand Up @@ -804,6 +810,26 @@ func TestRangeController(t *testing.T) {
d.ScanArgs(t, "range_id", &rangeID)
return sendStreamString(roachpb.RangeID(rangeID))

case "metrics":
var buf strings.Builder

for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
fmt.Fprintf(&buf, "%-50v: %v\n", evalMetrics.waiting[wc].GetName(), evalMetrics.waiting[wc].Value())
fmt.Fprintf(&buf, "%-50v: %v\n", evalMetrics.admitted[wc].GetName(), evalMetrics.admitted[wc].Count())
fmt.Fprintf(&buf, "%-50v: %v\n", evalMetrics.errored[wc].GetName(), evalMetrics.errored[wc].Count())
fmt.Fprintf(&buf, "%-50v: %v\n", evalMetrics.bypassed[wc].GetName(), evalMetrics.bypassed[wc].Count())
// We only print the number of recorded durations, instead of any
// percentiles or cumulative wait times as these are
// non-deterministic in the test.
fmt.Fprintf(&buf, "%-50v: %v\n",
fmt.Sprintf("%v.count", evalMetrics.duration[wc].GetName()),
testingFirst(evalMetrics.duration[wc].CumulativeSnapshot().Total()))
}
return buf.String()

default:
panic(fmt.Sprintf("unknown command: %s", d.Cmd))
}
Expand Down Expand Up @@ -916,3 +942,10 @@ func TestGetEntryFCState(t *testing.T) {
})
}
}

// testingFirst returns the first of its arguments, or nil when it is called
// with none. Passing a multi-value call expression as the sole argument picks
// out that call's first result.
func testingFirst(args ...interface{}) interface{} {
	if len(args) == 0 {
		return nil
	}
	head := args[0]
	return head
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ t1/s1: reg=+0 B/+1 B ela=+0 B/+1 B
t1/s2: reg=+0 B/+1 B ela=+0 B/+1 B
t1/s3: reg=+0 B/+1 B ela=+0 B/+1 B

metrics
----
kvflowcontrol.eval_wait.regular.requests.waiting : 0
kvflowcontrol.eval_wait.regular.requests.admitted : 0
kvflowcontrol.eval_wait.regular.requests.errored : 0
kvflowcontrol.eval_wait.regular.requests.bypassed : 0
kvflowcontrol.eval_wait.regular.duration.count : 0
kvflowcontrol.eval_wait.elastic.requests.waiting : 0
kvflowcontrol.eval_wait.elastic.requests.admitted : 0
kvflowcontrol.eval_wait.elastic.requests.errored : 0
kvflowcontrol.eval_wait.elastic.requests.bypassed : 0
kvflowcontrol.eval_wait.elastic.duration.count : 0

# Start a high priority evaluation. It should not complete due to lack of
# tokens.
wait_for_eval name=a range_id=1 pri=HighPri
Expand Down Expand Up @@ -259,6 +272,19 @@ range_id=1 tenant_id={1} local_replica_id=1
range_id=2 tenant_id={1} local_replica_id=1
name=g pri=high-pri done=true waited=true err=<nil>

metrics
----
kvflowcontrol.eval_wait.regular.requests.waiting : 0
kvflowcontrol.eval_wait.regular.requests.admitted : 4
kvflowcontrol.eval_wait.regular.requests.errored : 1
kvflowcontrol.eval_wait.regular.requests.bypassed : 0
kvflowcontrol.eval_wait.regular.duration.count : 5
kvflowcontrol.eval_wait.elastic.requests.waiting : 0
kvflowcontrol.eval_wait.elastic.requests.admitted : 2
kvflowcontrol.eval_wait.elastic.requests.errored : 0
kvflowcontrol.eval_wait.elastic.requests.bypassed : 0
kvflowcontrol.eval_wait.elastic.duration.count : 2

# Adjust the tokens so that r1 doesn't have tokens on s3 or s1, then transfer
# s3 the lease for r1.
adjust_tokens
Expand Down Expand Up @@ -386,3 +412,16 @@ range_id=1 tenant_id={1} local_replica_id=1
name=j pri=high-pri done=true waited=false err=<nil>
range_id=2 tenant_id={1} local_replica_id=1
name=g pri=high-pri done=true waited=true err=<nil>

metrics
----
kvflowcontrol.eval_wait.regular.requests.waiting : 0
kvflowcontrol.eval_wait.regular.requests.admitted : 6
kvflowcontrol.eval_wait.regular.requests.errored : 1
kvflowcontrol.eval_wait.regular.requests.bypassed : 1
kvflowcontrol.eval_wait.regular.duration.count : 8
kvflowcontrol.eval_wait.elastic.requests.waiting : 0
kvflowcontrol.eval_wait.elastic.requests.admitted : 2
kvflowcontrol.eval_wait.elastic.requests.errored : 0
kvflowcontrol.eval_wait.elastic.requests.bypassed : 0
kvflowcontrol.eval_wait.elastic.duration.count : 2

0 comments on commit a304b34

Please sign in to comment.