pkg/util/metric: option to use legacy hdrhistogram model, increase bucket fidelity #96029

Merged 2 commits on Feb 3, 2023
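
This PR replaces the old three-argument constructor, metric.NewHistogram(metadata, window, buckets), with a single metric.HistogramOptions struct, and switches histogram-typed fields from *metric.Histogram to the metric.IHistogram interface. The sketch below shows the call shape as it appears at the call sites in this diff; the field names and mode constants are copied from those call sites, while the package name, metadata variable, and 30-second cap are illustrative placeholders rather than part of the PR.

package example // illustrative only; not part of this PR

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// metaExampleLatency is a placeholder metadata definition.
var metaExampleLatency = metric.Metadata{
	Name:        "example.latency",
	Help:        "Example latency histogram.",
	Measurement: "Latency",
	Unit:        metric.Unit_NANOSECONDS,
}

// newExampleLatencyHistogram mirrors the constructor pattern used throughout this diff.
func newExampleLatencyHistogram() metric.IHistogram {
	return metric.NewHistogram(metric.HistogramOptions{
		Metadata: metaExampleLatency,
		Duration: base.DefaultHistogramWindowInterval(),
		// Buckets back the Prometheus-style histogram model.
		Buckets: metric.IOLatencyBuckets,
		// MaxVal and SigFigs bound the legacy hdr histogram.
		MaxVal:  (30 * time.Second).Nanoseconds(),
		SigFigs: 1,
		// HistogramModePrometheus always uses the Prometheus model;
		// HistogramModePreferHdrLatency falls back to the legacy hdr model
		// when it is enabled.
		Mode: metric.HistogramModePreferHdrLatency,
	})
}

In the diff, histograms that were never backed by the hdr model (for example ParallelConsumerFlushNanos) set Mode: metric.HistogramModePrometheus and omit MaxVal/SigFigs, while latency histograms that should keep the legacy model opt into HistogramModePreferHdrLatency and supply an explicit cap and precision.
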
96 changes: 76 additions & 20 deletions pkg/ccl/changefeedccl/metrics.go
@@ -28,6 +28,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const (
changefeedCheckpointHistMaxLatency = 30 * time.Second
changefeedBatchHistMaxLatency = 30 * time.Second
changefeedFlushHistMaxLatency = 1 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
)

// max length for the scope name.
const maxSLIScopeNameLen = 128

@@ -488,16 +496,46 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
FilteredMessages: b.Counter(metaChangefeedFilteredMessages),
MessageSize: b.Histogram(metaMessageSize, histogramWindow, metric.DataSize16MBBuckets),
MessageSize: b.Histogram(metric.HistogramOptions{
Metadata: metaMessageSize,
Duration: histogramWindow,
MaxVal: 10 << 20, /* 10MB max message size */
SigFigs: 1,
Buckets: metric.DataSize16MBBuckets,
}),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
SizeBasedFlushes: b.Counter(metaSizeBasedFlushes),

BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
CommitLatency: b.Histogram(metaCommitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
BatchHistNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedBatchHistNanos,
Duration: histogramWindow,
MaxVal: changefeedBatchHistMaxLatency.Nanoseconds(),
SigFigs: 1,
Buckets: metric.BatchProcessLatencyBuckets,
}),
FlushHistNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedFlushHistNanos,
Duration: histogramWindow,
MaxVal: changefeedFlushHistMaxLatency.Nanoseconds(),
SigFigs: 2,
Buckets: metric.BatchProcessLatencyBuckets,
}),
CommitLatency: b.Histogram(metric.HistogramOptions{
Metadata: metaCommitLatency,
Duration: histogramWindow,
MaxVal: commitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
Buckets: metric.BatchProcessLatencyBuckets,
}),
AdmitLatency: b.Histogram(metric.HistogramOptions{
Metadata: metaAdmitLatency,
Duration: histogramWindow,
MaxVal: admitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
Buckets: metric.BatchProcessLatencyBuckets,
}),
BackfillCount: b.Gauge(metaChangefeedBackfillCount),
BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges),
RunningCount: b.Gauge(metaChangefeedRunning),
@@ -572,12 +610,12 @@ type Metrics struct {
Failures *metric.Counter
ResolvedMessages *metric.Counter
QueueTimeNanos *metric.Counter
CheckpointHistNanos *metric.Histogram
CheckpointHistNanos metric.IHistogram
FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics
ReplanCount *metric.Counter
ParallelConsumerFlushNanos *metric.Histogram
ParallelConsumerConsumeNanos *metric.Histogram
ParallelConsumerFlushNanos metric.IHistogram
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

mu struct {
@@ -599,18 +637,36 @@ func (m *Metrics) getSLIMetrics(scope string) (*sliMetrics, error) {
// MakeMetrics makes the metrics for changefeed monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
m := &Metrics{
AggMetrics: newAggregateMetrics(histogramWindow),
KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
ParallelConsumerFlushNanos: metric.NewHistogram(metaChangefeedEventConsumerFlushNanos, histogramWindow, metric.IOLatencyBuckets),
ParallelConsumerConsumeNanos: metric.NewHistogram(metaChangefeedEventConsumerConsumeNanos, histogramWindow, metric.IOLatencyBuckets),
AggMetrics: newAggregateMetrics(histogramWindow),
KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaChangefeedCheckpointHistNanos,
Duration: histogramWindow,
MaxVal: changefeedCheckpointHistMaxLatency.Nanoseconds(),
SigFigs: 2,
Buckets: metric.IOLatencyBuckets,
}),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
// Below two metrics were never implemented using the hdr histogram, so they
// are constructed directly in the Prometheus mode.
ParallelConsumerFlushNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaChangefeedEventConsumerFlushNanos,
Duration: histogramWindow,
Buckets: metric.IOLatencyBuckets,
Mode: metric.HistogramModePrometheus,
}),
ParallelConsumerConsumeNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaChangefeedEventConsumerConsumeNanos,
Duration: histogramWindow,
Buckets: metric.IOLatencyBuckets,
Mode: metric.HistogramModePrometheus,
}),
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/connector.go
@@ -77,7 +77,7 @@ type connector struct {

// DialTenantLatency tracks how long it takes to retrieve the address for
// a tenant and set up a tcp connection to the address.
DialTenantLatency *metric.Histogram
DialTenantLatency metric.IHistogram

// DialTenantRetries counts how often dialing a tenant is retried.
DialTenantRetries *metric.Counter
27 changes: 18 additions & 9 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -380,9 +380,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {

c := &connector{
TenantID: roachpb.MustMakeTenantID(42),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaDialTenantLatency,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
dc := &testTenantDirectoryCache{}
@@ -460,9 +463,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {
defer cancel()

c := &connector{
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaDialTenantLatency,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
c.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
@@ -491,9 +497,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {
var reportFailureFnCount int
c := &connector{
TenantID: roachpb.MustMakeTenantID(42),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaDialTenantLatency,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
c.DirectoryCache = &testTenantDirectoryCache{
61 changes: 38 additions & 23 deletions pkg/ccl/sqlproxyccl/metrics.go
@@ -23,19 +23,19 @@ type metrics struct {
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
SuccessfulConnCount *metric.Counter
ConnectionLatency *metric.Histogram
ConnectionLatency metric.IHistogram
AuthFailedCount *metric.Counter
ExpiredClientConnCount *metric.Counter

DialTenantLatency *metric.Histogram
DialTenantLatency metric.IHistogram
DialTenantRetries *metric.Counter

ConnMigrationSuccessCount *metric.Counter
ConnMigrationErrorFatalCount *metric.Counter
ConnMigrationErrorRecoverableCount *metric.Counter
ConnMigrationAttemptedCount *metric.Counter
ConnMigrationAttemptedLatency *metric.Histogram
ConnMigrationTransferResponseMessageSize *metric.Histogram
ConnMigrationAttemptedLatency metric.IHistogram
ConnMigrationTransferResponseMessageSize metric.IHistogram

QueryCancelReceivedPGWire *metric.Counter
QueryCancelReceivedHTTP *metric.Counter
@@ -49,6 +49,16 @@ func (metrics) MetricStruct() {}

var _ metric.Struct = metrics{}

const (
// maxExpectedTransferResponseMessageSize corresponds to maximum expected
// response message size for the SHOW TRANSFER STATE query. We choose 16MB
// here to match the defaultMaxReadBufferSize used for ingesting SQL
// statements in the SQL server (see pkg/sql/pgwire/pgwirebase/encoding.go).
//
// This will be used to tune sql.session_transfer.max_session_size.
maxExpectedTransferResponseMessageSize = 1 << 24 // 16MB
)

var (
metaCurConnCount = metric.Metadata{
Name: "proxy.sql.conns",
@@ -213,35 +223,40 @@ func makeProxyMetrics() metrics {
RoutingErrCount: metric.NewCounter(metaRoutingErrCount),
RefusedConnCount: metric.NewCounter(metaRefusedConnCount),
SuccessfulConnCount: metric.NewCounter(metaSuccessfulConnCount),
ConnectionLatency: metric.NewHistogram(
metaConnMigrationAttemptedCount,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
ConnectionLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaConnMigrationAttemptedCount,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.NetworkLatencyBuckets,
}),
AuthFailedCount: metric.NewCounter(metaAuthFailedCount),
ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
// Connector metrics.
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaDialTenantLatency,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.NetworkLatencyBuckets},
),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
// Connection migration metrics.
ConnMigrationSuccessCount: metric.NewCounter(metaConnMigrationSuccessCount),
ConnMigrationErrorFatalCount: metric.NewCounter(metaConnMigrationErrorFatalCount),
ConnMigrationErrorRecoverableCount: metric.NewCounter(metaConnMigrationErrorRecoverableCount),
ConnMigrationAttemptedCount: metric.NewCounter(metaConnMigrationAttemptedCount),
ConnMigrationAttemptedLatency: metric.NewHistogram(
metaConnMigrationAttemptedLatency,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
ConnMigrationTransferResponseMessageSize: metric.NewHistogram(
metaConnMigrationTransferResponseMessageSize,
base.DefaultHistogramWindowInterval(),
metric.DataSize16MBBuckets,
),
ConnMigrationAttemptedLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaConnMigrationAttemptedLatency,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.NetworkLatencyBuckets,
}),
ConnMigrationTransferResponseMessageSize: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaConnMigrationTransferResponseMessageSize,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.DataSize16MBBuckets,
MaxVal: maxExpectedTransferResponseMessageSize,
SigFigs: 1,
}),
QueryCancelReceivedPGWire: metric.NewCounter(metaQueryCancelReceivedPGWire),
QueryCancelReceivedHTTP: metric.NewCounter(metaQueryCancelReceivedHTTP),
QueryCancelIgnored: metric.NewCounter(metaQueryCancelIgnored),
42 changes: 33 additions & 9 deletions pkg/ccl/streamingccl/streamingest/metrics.go
@@ -15,6 +15,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

const (
streamingFlushHistMaxLatency = 1 * time.Minute
streamingAdmitLatencyMaxValue = 3 * time.Minute
streamingCommitLatencyMaxValue = 10 * time.Minute
)

var (
metaReplicationEventsIngested = metric.Metadata{
Name: "replication.events_ingested",
@@ -120,9 +126,9 @@ type Metrics struct {
Flushes *metric.Counter
JobProgressUpdates *metric.Counter
ResolvedEvents *metric.Counter
FlushHistNanos *metric.Histogram
CommitLatency *metric.Histogram
AdmitLatency *metric.Histogram
FlushHistNanos metric.IHistogram
CommitLatency metric.IHistogram
AdmitLatency metric.IHistogram
RunningCount *metric.Gauge
EarliestDataCheckpointSpan *metric.Gauge
LatestDataCheckpointSpan *metric.Gauge
@@ -143,12 +149,30 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
Flushes: metric.NewCounter(metaReplicationFlushes),
ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
FlushHistNanos: metric.NewHistogram(metaReplicationFlushHistNanos,
histogramWindow, metric.BatchProcessLatencyBuckets),
CommitLatency: metric.NewHistogram(metaReplicationCommitLatency,
histogramWindow, metric.BatchProcessLatencyBuckets),
AdmitLatency: metric.NewHistogram(metaReplicationAdmitLatency,
histogramWindow, metric.BatchProcessLatencyBuckets),
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaReplicationFlushHistNanos,
Duration: histogramWindow,
Buckets: metric.BatchProcessLatencyBuckets,
MaxVal: streamingFlushHistMaxLatency.Nanoseconds(),
SigFigs: 1,
Mode: metric.HistogramModePreferHdrLatency,
}),
CommitLatency: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaReplicationCommitLatency,
Duration: histogramWindow,
Buckets: metric.BatchProcessLatencyBuckets,
MaxVal: streamingCommitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
Mode: metric.HistogramModePreferHdrLatency,
}),
AdmitLatency: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaReplicationAdmitLatency,
Duration: histogramWindow,
Buckets: metric.BatchProcessLatencyBuckets,
MaxVal: streamingAdmitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
Mode: metric.HistogramModePreferHdrLatency,
}),
RunningCount: metric.NewGauge(metaStreamsRunning),
EarliestDataCheckpointSpan: metric.NewGauge(metaEarliestDataCheckpointSpan),
LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan),
14 changes: 12 additions & 2 deletions pkg/kv/bulk/bulk_metrics.go
@@ -20,7 +20,7 @@ import (
// Metrics contains pointers to the metrics for
// monitoring bulk operations.
type Metrics struct {
MaxBytesHist *metric.Histogram
MaxBytesHist metric.IHistogram
CurBytesCount *metric.Gauge
}

@@ -44,10 +44,20 @@ var (
}
)

// See pkg/sql/mem_metrics.go
// log10int64times1000 = log10(math.MaxInt64) * 1000, rounded up somewhat
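// (For reference: math.MaxInt64 = 9223372036854775807, whose log10 is ≈ 18.96,
// so log10(math.MaxInt64)*1000 ≈ 18965; 19*1000 = 19000 rounds that up.)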
const log10int64times1000 = 19 * 1000

// MakeBulkMetrics instantiates the metrics holder for bulk operation monitoring.
func MakeBulkMetrics(histogramWindow time.Duration) Metrics {
return Metrics{
MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, metric.MemoryUsage64MBBuckets),
MaxBytesHist: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaMemMaxBytes,
Duration: histogramWindow,
MaxVal: log10int64times1000,
SigFigs: 3,
Buckets: metric.MemoryUsage64MBBuckets,
}),
CurBytesCount: metric.NewGauge(metaMemCurBytes),
}
}