Skip to content

Commit

Permalink
kvstreamer: add some metrics
Browse files Browse the repository at this point in the history
This commit adds the following metrics:
- `kv.streamer.operators_count` tracks how many active streamer operators
are out there currently
- `kv.streamer.requests_in_progress` tracks how many BatchRequests are
in flight currently, across all streamer instances
- `kv.streamer.requests_throttled` tracks how many BatchRequests are
being throttled (queued) due to reaching the streamer concurrency limit,
across all streamer instances.

Release note: None
  • Loading branch information
yuzefovich committed Nov 28, 2024
1 parent 0c48a8e commit 13c1da0
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"avg_response_estimator.go",
"budget.go",
"metrics.go",
"requests_provider.go",
"results_buffer.go",
"size.go",
Expand All @@ -30,6 +31,7 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/stop",
Expand Down
47 changes: 47 additions & 0 deletions pkg/kv/kvclient/kvstreamer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package kvstreamer

import "github.com/cockroachdb/cockroach/pkg/util/metric"

var (
metaStreamerCount = metric.Metadata{
Name: "kv.streamer.operators_count",
Help: "Number of KV Streamer operators currently in use",
Measurement: "Operators",
Unit: metric.Unit_COUNT,
}
metaStreamerRequestsInProgress = metric.Metadata{
Name: "kv.streamer.requests_in_progress",
Help: "Number of BatchRequests in progress across all KV Streamer operators",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
metaStreamerRequestsThrottled = metric.Metadata{
Name: "kv.streamer.requests_throttled",
Help: "Number of BatchRequests currently throttled due to reaching the concurrency limit, across all KV Streamer operators",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
)

type Metrics struct {
OperatorsCount *metric.Gauge
RequestsInProgress *metric.Gauge
RequestsThrottled *metric.Gauge
}

var _ metric.Struct = Metrics{}

func (Metrics) MetricStruct() {}

func MakeMetrics() Metrics {
return Metrics{
OperatorsCount: metric.NewGauge(metaStreamerCount),
RequestsInProgress: metric.NewGauge(metaStreamerRequestsInProgress),
RequestsThrottled: metric.NewGauge(metaStreamerRequestsThrottled),
}
}
23 changes: 20 additions & 3 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (r Result) Release(ctx context.Context) {
// TODO(yuzefovich): support pipelining of Enqueue and GetResults calls.
type Streamer struct {
distSender *kvcoord.DistSender
metrics *Metrics
stopper *stop.Stopper
// sd can be nil in tests.
sd *sessiondata.SessionData
Expand Down Expand Up @@ -378,6 +379,7 @@ type sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, erro
// parameters of all received responses.
func NewStreamer(
distSender *kvcoord.DistSender,
metrics *Metrics,
stopper *stop.Stopper,
txn *kv.Txn,
sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, error),
Expand Down Expand Up @@ -408,13 +410,15 @@ func NewStreamer(
}
s := &Streamer{
distSender: distSender,
metrics: metrics,
stopper: stopper,
sd: sd,
headOfLineOnlyFraction: headOfLineOnlyFraction,
budget: newBudget(acc, limitBytes),
lockStrength: lockStrength,
lockDurability: lockDurability,
}
s.metrics.OperatorsCount.Inc(1)

if kvPairsRead == nil {
kvPairsRead = new(int64)
Expand Down Expand Up @@ -827,6 +831,7 @@ func (s *Streamer) Close(ctx context.Context) {
// exited.
s.results.close(ctx)
}
s.metrics.OperatorsCount.Dec(1)
*s = Streamer{}
}

Expand Down Expand Up @@ -891,7 +896,6 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
return
}

w.s.requestsToServe.Lock()
// The coordinator goroutine is the only one that removes requests from
// w.s.requestsToServe, so we can keep the reference to next request
// without holding the lock.
Expand All @@ -900,8 +904,11 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
// issueRequestsForAsyncProcessing() another request with higher urgency
// is added; however, this is not a problem - we wait for available
// budget here on a best-effort basis.
nextReq := w.s.requestsToServe.nextLocked()
w.s.requestsToServe.Unlock()
nextReq := func() singleRangeBatch {
w.s.requestsToServe.Lock()
defer w.s.requestsToServe.Unlock()
return w.s.requestsToServe.nextLocked()
}()
// If we already have minTargetBytes set on the first request to be
// issued, then use that.
atLeastBytes := nextReq.minTargetBytes
Expand Down Expand Up @@ -1069,6 +1076,13 @@ func (w *workerCoordinator) getMaxNumRequestsToIssue(ctx context.Context) (_ int
}
// The whole quota is currently used up, so we blockingly acquire a quota of
// 1.
numBatches := func() int64 {
w.s.requestsToServe.Lock()
defer w.s.requestsToServe.Unlock()
return int64(w.s.requestsToServe.lengthLocked())
}()
w.s.metrics.RequestsThrottled.Inc(numBatches)
defer w.s.metrics.RequestsThrottled.Dec(numBatches)
alloc, err := w.asyncSem.Acquire(ctx, 1)
if err != nil {
w.s.results.setError(err)
Expand Down Expand Up @@ -1359,6 +1373,9 @@ func (w *workerCoordinator) performRequestAsync(
},
func(ctx context.Context) {
defer w.asyncRequestCleanup(false /* budgetMuAlreadyLocked */)
w.s.metrics.RequestsInProgress.Inc(1)
defer w.s.metrics.RequestsInProgress.Dec(1)

ba := &kvpb.BatchRequest{}
ba.Header.WaitPolicy = w.lockWaitPolicy
ba.Header.TargetBytes = targetBytes
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ func TestStreamerMemoryAccounting(t *testing.T) {
panic(err)
}
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
metrics := MakeMetrics()
s := NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
&metrics,
s.AppStopper(),
leafTxn,
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func getStreamer(
panic(err)
}
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
metrics := kvstreamer.MakeMetrics()
return kvstreamer.NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
&metrics,
s.AppStopper(),
leafTxn,
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
Expand Down Expand Up @@ -113,9 +115,11 @@ func TestStreamerLimitations(t *testing.T) {
})

t.Run("unexpected RootTxn", func(t *testing.T) {
metrics := kvstreamer.MakeMetrics()
require.Panics(t, func() {
kvstreamer.NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
&metrics,
s.AppStopper(),
kv.NewTxn(ctx, s.DB(), s.DistSQLPlanningNodeID()),
nil, /* sendFn */
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ go_library(
"//pkg/kv/bulk",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvstreamer",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangestats",
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
Expand Down Expand Up @@ -745,6 +746,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.registry.AddMetricStruct(rowMetrics)
internalRowMetrics := sql.NewRowMetrics(true /* internal */)
cfg.registry.AddMetricStruct(internalRowMetrics)
kvStreamerMetrics := kvstreamer.MakeMetrics()
cfg.registry.AddMetricStruct(kvStreamerMetrics)

virtualSchemas, err := sql.NewVirtualSchemaHolder(ctx, cfg.Settings)
if err != nil {
Expand Down Expand Up @@ -839,6 +842,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
Metrics: &distSQLMetrics,
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,
KVStreamerMetrics: &kvStreamerMetrics,

SQLLivenessReader: cfg.sqlLivenessProvider.CachedReader(),
JobRegistry: jobRegistry,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ func NewColIndexJoin(
}
kvFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
flowCtx.Cfg.KVStreamerMetrics,
flowCtx.Stopper(),
txn,
flowCtx.Cfg.Settings,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvstreamer",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/diskmap",
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
Expand Down Expand Up @@ -128,6 +129,7 @@ type ServerConfig struct {
Metrics *DistSQLMetrics
RowMetrics *rowinfra.Metrics
InternalRowMetrics *rowinfra.Metrics
KVStreamerMetrics *kvstreamer.Metrics

// SQLLivenessReader provides access to reading the liveness of sessions.
SQLLivenessReader sqlliveness.Reader
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func NewKVFetcher(
// If maintainOrdering is true, then diskBuffer must be non-nil.
func NewStreamingKVFetcher(
distSender *kvcoord.DistSender,
metrics *kvstreamer.Metrics,
stopper *stop.Stopper,
txn *kv.Txn,
st *cluster.Settings,
Expand All @@ -204,6 +205,7 @@ func NewStreamingKVFetcher(
sendFn := makeSendFunc(txn, ext, &batchRequestsIssued)
streamer := kvstreamer.NewStreamer(
distSender,
metrics,
stopper,
txn,
sendFn,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ func newJoinReader(
singleRowLookup := readerType == indexJoinReaderType || spec.LookupColumnsAreKey
streamingKVFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
flowCtx.Cfg.KVStreamerMetrics,
flowCtx.Stopper(),
jr.txn,
flowCtx.Cfg.Settings,
Expand Down

0 comments on commit 13c1da0

Please sign in to comment.