changefeedccl: add scoping support for max_behind_nanos
Currently, changefeed.max_behind_nanos measures the lag of the
furthest-behind changefeed. We'd like to be able to support
scoping/grouping changefeeds, so this change applies scoping to that
metric, similar to the existing scoping on changefeed.aggregator_progress.

Fixes: #132281

Release note (ops change): the changefeed.max_behind_nanos metric
now supports scoping with metric labels.
aerfrei committed Jan 16, 2025
1 parent ab3695a commit f0224c3
Showing 4 changed files with 56 additions and 35 deletions.
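
As a reading aid for the diff below, here is a minimal, self-contained Go sketch of the aggregation being added. It does not use the real aggmetric API, and the scope names and timestamps are invented: each metrics scope reports how far its least-advanced resolved timestamp lags the current time, and the parent changefeed.max_behind_nanos gauge is the maximum of those per-scope values, mirroring maxBehindNanosGetter and functionalGaugeMaxFn in pkg/ccl/changefeedccl/metrics.go.

```go
// Minimal sketch of per-scope max_behind_nanos aggregation; scope names and
// timestamps are made up for illustration and this does not use aggmetric.
package main

import (
	"fmt"
	"slices"
	"time"
)

// scope holds the resolved timestamps (UnixNanos) of the changefeeds running
// under one metrics label; 0 means "no progress recorded yet".
type scope struct {
	resolved []int64
}

// maxBehindNanos reports how far the scope's oldest resolved timestamp is
// behind now, analogous to maxBehindNanosGetter in the diff below.
func (s *scope) maxBehindNanos(now time.Time) int64 {
	var minTs int64
	for _, ts := range s.resolved {
		if ts != 0 && (minTs == 0 || ts < minTs) {
			minTs = ts
		}
	}
	if minTs == 0 {
		return 0
	}
	return now.UnixNano() - minTs
}

func main() {
	now := time.Now()
	scopes := map[string]*scope{
		"label_a": {resolved: []int64{now.Add(-3 * time.Second).UnixNano()}},
		"label_b": {resolved: []int64{now.Add(-500 * time.Millisecond).UnixNano()}},
	}

	// One child value per scope, mirroring AddFunctionalChild in the diff.
	children := make([]int64, 0, len(scopes))
	for label, s := range scopes {
		v := s.maxBehindNanos(now)
		children = append(children, v)
		fmt.Printf("changefeed.max_behind_nanos{scope=%q} = %d\n", label, v)
	}

	// Parent aggregation, mirroring functionalGaugeMaxFn in the diff: the
	// aggregate gauge is the max over all child (per-scope) values.
	var aggregate int64
	if len(children) > 0 {
		aggregate = slices.Max(children)
	}
	fmt.Printf("changefeed.max_behind_nanos (aggregate) = %d\n", aggregate)
}
```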
2 changes: 1 addition & 1 deletion docs/generated/metrics/metrics.html
@@ -830,7 +830,7 @@
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.messages.messages_pushback_nanos</td><td>Total time spent throttled for messages quota</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.network.bytes_in</td><td>The number of bytes received from the network by changefeeds</td><td>Bytes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -430,6 +430,20 @@ func TestChangefeedProgressMetrics(t *testing.T) {
})
}

// Verify that max_behind_nanos has recurring updates
var lastValue int64 = 0
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
value := sliA.MaxBehindNanos.Value()
if value != lastValue {
lastValue = value
return nil
}
return errors.Newf("waiting for max_behind_nanos to update %d",
lastValue)
})
}

sliB, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("label_b")
require.Equal(t, int64(0), sliB.AggregatorProgress.Value())
fooB := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='label_b', resolved='100ms'`)
@@ -448,7 +462,8 @@ func TestChangefeedProgressMetrics(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
aggregatorProgress := sliA.AggregatorProgress.Value()
checkpointProgress := sliA.CheckpointProgress.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 {
maxBehindNanos := sliA.MaxBehindNanos.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 && maxBehindNanos == 0 {
return nil
}
return errors.Newf("waiting for progress metrics to be 0 (ap=%d, cp=%d)",
70 changes: 38 additions & 32 deletions pkg/ccl/changefeedccl/metrics.go
@@ -7,6 +7,7 @@ package changefeedccl

import (
"context"
"slices"
"strings"
"sync/atomic"
"time"
@@ -83,6 +84,7 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
SinkErrors *aggmetric.AggCounter
MaxBehindNanos *aggmetric.AggGauge

Timers *timers.Timers

@@ -164,6 +166,7 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
SinkErrors *aggmetric.Counter
MaxBehindNanos *aggmetric.Gauge

Timers *timers.ScopedTimers

@@ -720,17 +723,6 @@
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos = metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
@@ -985,6 +977,16 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos := metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -996,6 +998,13 @@
return min
}

functionalGaugeMaxFn := func(childValues []int64) int64 {
if len(childValues) == 0 {
return 0
}
return slices.Max(childValues)
}

// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
@@ -1085,9 +1094,10 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.ChangefeedBatchLatencyBuckets,
}),
SinkErrors: b.Counter(metaSinkErrors),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
SinkErrors: b.Counter(metaSinkErrors),
MaxBehindNanos: b.FunctionalGauge(metaChangefeedMaxBehindNanos, functionalGaugeMaxFn),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -1184,8 +1194,20 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return minTs
}
}

maxBehindNanosGetter := func(m map[int64]hlc.Timestamp) func() int64 {
return func() int64 {
minTs := minTimestampGetter(m)()
if minTs == 0 {
return 0
}
return timeutil.Now().UnixNano() - minTs
}
}

sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)

a.mu.sliMetrics[scope] = sm
return sm, nil
@@ -1242,14 +1264,10 @@ type Metrics struct {
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

// This map and the MaxBehindNanos metric are deprecated in favor of
// CheckpointProgress which is stored in the sliMetrics.
mu struct {
syncutil.Mutex
id int
resolved map[int]hlc.Timestamp
id int
}
MaxBehindNanos *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
@@ -1296,20 +1314,8 @@ func MakeMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) metric.Stru
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
now := timeutil.Now()
var maxBehind time.Duration
m.mu.Lock()
defer m.mu.Unlock()
for _, resolved := range m.mu.resolved {
if behind := now.Sub(resolved.GoTime()); behind > maxBehind {
maxBehind = behind
}
}
return maxBehind.Nanoseconds()
})

return m
}

2 changes: 1 addition & 1 deletion pkg/util/metric/prometheus_exporter.go
@@ -70,7 +70,7 @@ func (pm *PrometheusExporter) findOrCreateFamily(

// ScrapeRegistry scrapes all metrics contained in the registry to the metric
// family map, holding on only to the scraped data (which is no longer
// connected to the registry and metrics within) when returning from the the
// connected to the registry and metrics within) when returning from the
// call. It creates new families as needed.
func (pm *PrometheusExporter) ScrapeRegistry(registry *Registry, includeChildMetrics bool) {
labels := registry.GetLabels()
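
Not part of this commit, but for reference: once the scoped children exist, a scrape that includes child metrics would expose per-scope series next to the aggregate. The sample below is hypothetical output, assuming the usual dot-to-underscore name conversion and the "scope" label configured by aggmetric.MakeBuilder("scope") in metrics.go above; the values are invented.

```text
changefeed_max_behind_nanos 3e+09
changefeed_max_behind_nanos{scope="label_a"} 3e+09
changefeed_max_behind_nanos{scope="label_b"} 5e+08
```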
