kvstreamer: add some metrics
This commit adds the following metrics:
- `kv.streamer.operators.active` tracks how many streamer operators are
currently active
- `kv.streamer.batches.sent` tracks the total number of BatchRequests sent
by all streamer instances
- `kv.streamer.batches.in_progress` tracks how many BatchRequests are
currently in flight, across all streamer instances
- `kv.streamer.batches.throttled` tracks how many BatchRequests are
currently throttled (queued) due to reaching the streamer concurrency
limit, across all streamer instances.

It also adds a couple of recently added DistSender metrics to the
roachprod OpenTelemetry registry.

Release note: None
yuzefovich committed Dec 3, 2024
1 parent fd2fec8 commit 568266c
Showing 14 changed files with 106 additions and 3 deletions.
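As a rough illustration of how the new metrics fit together (a sketch, not code from this commit): a single `kvstreamer.Metrics` struct is created per SQL server, registered with the node's metric registry, and shared by every Streamer instance, which updates the gauges over its lifetime. The sketch below uses `metric.NewRegistry`, `Gauge.Value`, and `Counter.Count` from `pkg/util/metric` as stand-ins for the real wiring in `pkg/server/server_sql.go`; treat those exact calls as assumptions.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

func main() {
	// One Metrics struct per SQL server, registered once (mirrors the
	// newSQLServer change in pkg/server/server_sql.go, which adds the
	// struct to cfg.registry rather than a fresh registry).
	streamerMetrics := kvstreamer.MakeMetrics()
	registry := metric.NewRegistry() // stand-in for cfg.registry
	registry.AddMetricStruct(streamerMetrics)

	// Every Streamer receives a pointer to the shared struct via NewStreamer:
	// NewStreamer increments OperatorsCount and Close decrements it, so the
	// gauge reflects the number of live operators at any moment.
	streamerMetrics.OperatorsCount.Inc(1)       // done inside NewStreamer
	defer streamerMetrics.OperatorsCount.Dec(1) // done inside (*Streamer).Close

	// Each asynchronously issued BatchRequest bumps the sent counter and is
	// tracked as in progress for the duration of the request.
	streamerMetrics.BatchesSent.Inc(1)
	streamerMetrics.BatchesInProgress.Inc(1)
	defer streamerMetrics.BatchesInProgress.Dec(1)

	fmt.Println(streamerMetrics.OperatorsCount.Value()) // 1
	fmt.Println(streamerMetrics.BatchesSent.Count())    // 1
}
```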
4 changes: 4 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1530,6 +1530,10 @@
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.num_runs</td><td>number of successful reconciliation runs on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_processed</td><td>number of records processed without error during reconciliation on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_removed</td><td>number of records removed during reconciliation runs on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.streamer.batches.in_progress</td><td>Number of BatchRequests in progress across all KV Streamer operators</td><td>Batches</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>kv.streamer.batches.sent</td><td>Number of BatchRequests sent across all KV Streamer operators</td><td>Batches</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.streamer.batches.throttled</td><td>Number of BatchRequests currently being throttled due to reaching the concurrency limit, across all KV Streamer operators</td><td>Batches</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>kv.streamer.operators.active</td><td>Number of KV Streamer operators currently in use</td><td>Operators</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.batch_hist_nanos</td><td>Time spent flushing a batch</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.catchup_ranges</td><td>Source side ranges undergoing catch up scans (innacurate with multiple LDR jobs)</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.catchup_ranges_by_label</td><td>Source side ranges undergoing catch up scans</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"avg_response_estimator.go",
"budget.go",
"metrics.go",
"requests_provider.go",
"results_buffer.go",
"size.go",
@@ -30,6 +31,7 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/stop",
55 changes: 55 additions & 0 deletions pkg/kv/kvclient/kvstreamer/metrics.go
@@ -0,0 +1,55 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package kvstreamer

import "github.com/cockroachdb/cockroach/pkg/util/metric"

var (
metaStreamerCount = metric.Metadata{
Name: "kv.streamer.operators.active",
Help: "Number of KV Streamer operators currently in use",
Measurement: "Operators",
Unit: metric.Unit_COUNT,
}
metaBatchesSent = metric.Metadata{
Name: "kv.streamer.batches.sent",
Help: "Number of BatchRequests sent across all KV Streamer operators",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
metaBatchesInProgress = metric.Metadata{
Name: "kv.streamer.batches.in_progress",
Help: "Number of BatchRequests in progress across all KV Streamer operators",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
metaBatchesThrottled = metric.Metadata{
Name: "kv.streamer.batches.throttled",
Help: "Number of BatchRequests currently being throttled due to reaching the concurrency limit, across all KV Streamer operators",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
)

// Metrics contains the metrics collected across all KV Streamer operators
// on a node.
type Metrics struct {
OperatorsCount *metric.Gauge
BatchesSent *metric.Counter
BatchesInProgress *metric.Gauge
BatchesThrottled *metric.Gauge
}

var _ metric.Struct = Metrics{}

// MetricStruct implements the metric.Struct interface.
func (Metrics) MetricStruct() {}

// MakeMetrics returns a new Metrics struct.
func MakeMetrics() Metrics {
return Metrics{
OperatorsCount: metric.NewGauge(metaStreamerCount),
BatchesSent: metric.NewCounter(metaBatchesSent),
BatchesInProgress: metric.NewGauge(metaBatchesInProgress),
BatchesThrottled: metric.NewGauge(metaBatchesThrottled),
}
}
24 changes: 21 additions & 3 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -217,6 +217,7 @@ func (r Result) Release(ctx context.Context) {
// TODO(yuzefovich): support pipelining of Enqueue and GetResults calls.
type Streamer struct {
distSender *kvcoord.DistSender
metrics *Metrics
stopper *stop.Stopper
// sd can be nil in tests.
sd *sessiondata.SessionData
@@ -378,6 +379,7 @@ type sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, erro
// parameters of all received responses.
func NewStreamer(
distSender *kvcoord.DistSender,
metrics *Metrics,
stopper *stop.Stopper,
txn *kv.Txn,
sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, error),
@@ -408,13 +410,15 @@
}
s := &Streamer{
distSender: distSender,
metrics: metrics,
stopper: stopper,
sd: sd,
headOfLineOnlyFraction: headOfLineOnlyFraction,
budget: newBudget(acc, limitBytes),
lockStrength: lockStrength,
lockDurability: lockDurability,
}
s.metrics.OperatorsCount.Inc(1)

if kvPairsRead == nil {
kvPairsRead = new(int64)
@@ -827,6 +831,7 @@ func (s *Streamer) Close(ctx context.Context) {
// exited.
s.results.close(ctx)
}
s.metrics.OperatorsCount.Dec(1)
*s = Streamer{}
}

@@ -891,7 +896,6 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
return
}

w.s.requestsToServe.Lock()
// The coordinator goroutine is the only one that removes requests from
// w.s.requestsToServe, so we can keep the reference to next request
// without holding the lock.
@@ -900,8 +904,11 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
// issueRequestsForAsyncProcessing() another request with higher urgency
// is added; however, this is not a problem - we wait for available
// budget here on a best-effort basis.
nextReq := w.s.requestsToServe.nextLocked()
w.s.requestsToServe.Unlock()
nextReq := func() singleRangeBatch {
w.s.requestsToServe.Lock()
defer w.s.requestsToServe.Unlock()
return w.s.requestsToServe.nextLocked()
}()
// If we already have minTargetBytes set on the first request to be
// issued, then use that.
atLeastBytes := nextReq.minTargetBytes
@@ -1069,6 +1076,13 @@ func (w *workerCoordinator) getMaxNumRequestsToIssue(ctx context.Context) (_ int
}
// The whole quota is currently used up, so we blockingly acquire a quota of
// 1.
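// While we're blocked here, every request still sitting in
// requestsToServe counts as throttled, so bump the throttled gauge by
// the current queue length and undo it when the blocking acquisition
// returns.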
numBatches := func() int64 {
w.s.requestsToServe.Lock()
defer w.s.requestsToServe.Unlock()
return int64(w.s.requestsToServe.lengthLocked())
}()
w.s.metrics.BatchesThrottled.Inc(numBatches)
defer w.s.metrics.BatchesThrottled.Dec(numBatches)
alloc, err := w.asyncSem.Acquire(ctx, 1)
if err != nil {
w.s.results.setError(err)
@@ -1359,6 +1373,10 @@ func (w *workerCoordinator) performRequestAsync(
},
func(ctx context.Context) {
defer w.asyncRequestCleanup(false /* budgetMuAlreadyLocked */)
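// Account for this BatchRequest: bump the sent counter and track it as
// in progress until the async work completes.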
w.s.metrics.BatchesSent.Inc(1)
w.s.metrics.BatchesInProgress.Inc(1)
defer w.s.metrics.BatchesInProgress.Dec(1)

ba := &kvpb.BatchRequest{}
ba.Header.WaitPolicy = w.lockWaitPolicy
ba.Header.TargetBytes = targetBytes
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
@@ -80,8 +80,10 @@ func TestStreamerMemoryAccounting(t *testing.T) {
panic(err)
}
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
metrics := MakeMetrics()
s := NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
&metrics,
s.AppStopper(),
leafTxn,
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -50,8 +50,10 @@ func getStreamer(
panic(err)
}
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
metrics := kvstreamer.MakeMetrics()
return kvstreamer.NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
&metrics,
s.AppStopper(),
leafTxn,
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
@@ -113,9 +115,11 @@ func TestStreamerLimitations(t *testing.T) {
})

t.Run("unexpected RootTxn", func(t *testing.T) {
metrics := kvstreamer.MakeMetrics()
require.Panics(t, func() {
kvstreamer.NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
&metrics,
s.AppStopper(),
kv.NewTxn(ctx, s.DB(), s.DistSQLPlanningNodeID()),
nil, /* sendFn */
6 changes: 6 additions & 0 deletions pkg/roachprod/opentelemetry/cockroachdb_metrics.go
@@ -431,8 +431,10 @@ var cockroachdbMetrics = map[string]string{
"distsender_batch_responses_cross_zone_bytes": "distsender.batch_responses.cross_zone.bytes",
"distsender_batch_responses_replica_addressed_bytes": "distsender.batch_responses.replica_addressed.bytes",
"distsender_batches": "distsender.batches.total",
"distsender_batches_async_in_progress": "distsender.batches.async.in_progress",
"distsender_batches_async_sent": "distsender.batches.async.sent",
"distsender_batches_async_throttled": "distsender.batches.async.throttled",
"distsender_batches_async_throttled_cumulative_duration_nanos": "distsender.batches.async.throttled_cumulative_duration_nanos",
"distsender_batches_partial": "distsender.batches.partial",
"distsender_circuit_breaker_replicas_count": "distsender.circuit_breaker.replicas.count",
"distsender_circuit_breaker_replicas_probes_failure": "distsender.circuit_breaker.replicas.probes.failure",
@@ -1013,6 +1015,10 @@ var cockroachdbMetrics = map[string]string{
"kv_replica_write_batch_evaluate_latency_sum": "kv.replica_write_batch_evaluate.latency.sum",
"kv_split_estimated_stats": "kv.split.estimated_stats",
"kv_split_total_bytes_estimates": "kv.split.total_bytes_estimates",
"kv_streamer_batches_in_progress": "kv.streamer.batches.in_progress",
"kv_streamer_batches_sent": "kv.streamer.batches.sent",
"kv_streamer_batches_throttled": "kv.streamer.batches.throttled",
"kv_streamer_operators_active": "kv.streamer.operators.active",
"kv_tenant_rate_limit_current_blocked": "kv.tenant_rate_limit.current_blocked",
"kv_tenant_rate_limit_num_tenants": "kv.tenant_rate_limit.num_tenants",
"kv_tenant_rate_limit_read_batches_admitted": "kv.tenant_rate_limit.read_batches_admitted",
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -117,6 +117,7 @@ go_library(
"//pkg/kv/bulk",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvstreamer",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangestats",
4 changes: 4 additions & 0 deletions pkg/server/server_sql.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
@@ -745,6 +746,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.registry.AddMetricStruct(rowMetrics)
internalRowMetrics := sql.NewRowMetrics(true /* internal */)
cfg.registry.AddMetricStruct(internalRowMetrics)
kvStreamerMetrics := kvstreamer.MakeMetrics()
cfg.registry.AddMetricStruct(kvStreamerMetrics)

virtualSchemas, err := sql.NewVirtualSchemaHolder(ctx, cfg.Settings)
if err != nil {
@@ -839,6 +842,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
Metrics: &distSQLMetrics,
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,
KVStreamerMetrics: &kvStreamerMetrics,

SQLLivenessReader: cfg.sqlLivenessProvider.CachedReader(),
JobRegistry: jobRegistry,
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/index_join.go
@@ -544,6 +544,7 @@ func NewColIndexJoin(
}
kvFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
flowCtx.Cfg.KVStreamerMetrics,
flowCtx.Stopper(),
txn,
flowCtx.Cfg.Settings,
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
@@ -30,6 +30,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvstreamer",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/diskmap",
2 changes: 2 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -128,6 +129,7 @@ type ServerConfig struct {
Metrics *DistSQLMetrics
RowMetrics *rowinfra.Metrics
InternalRowMetrics *rowinfra.Metrics
KVStreamerMetrics *kvstreamer.Metrics

// SQLLivenessReader provides access to reading the liveness of sessions.
SQLLivenessReader sqlliveness.Reader
2 changes: 2 additions & 0 deletions pkg/sql/row/kv_fetcher.go
@@ -182,6 +182,7 @@ func NewKVFetcher(
// If maintainOrdering is true, then diskBuffer must be non-nil.
func NewStreamingKVFetcher(
distSender *kvcoord.DistSender,
metrics *kvstreamer.Metrics,
stopper *stop.Stopper,
txn *kv.Txn,
st *cluster.Settings,
@@ -204,6 +205,7 @@
sendFn := makeSendFunc(txn, ext, &batchRequestsIssued)
streamer := kvstreamer.NewStreamer(
distSender,
metrics,
stopper,
txn,
sendFn,
1 change: 1 addition & 0 deletions pkg/sql/rowexec/joinreader.go
@@ -555,6 +555,7 @@ func newJoinReader(
singleRowLookup := readerType == indexJoinReaderType || spec.LookupColumnsAreKey
streamingKVFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
flowCtx.Cfg.KVStreamerMetrics,
flowCtx.Stopper(),
jr.txn,
flowCtx.Cfg.Settings,
