changefeedccl: add scoping support for max_behind_nanos
Currently, changefeed.max_behind_nanos measures the lag of the
furthest-behind changefeed. We'd like to be able to support
scoping/grouping changefeeds. This change applies scoping to that
metric, mirroring the scoping on changefeed.aggregator_progress.

Epic: none
Fixes: #132281

Release note (ops change): the changefeed.max_behind_nanos metric
now supports scoping with metric labels.
aerfrei committed Dec 17, 2024
1 parent 46e8e0f commit 41c8b83
Showing 3 changed files with 40 additions and 74 deletions.
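For orientation before the diff: the scoped metric value is derived per scope from the resolved timestamps of that scope's feeds, roughly `now − min(resolved)`. Below is a minimal, self-contained Go sketch of that per-scope computation; the function name and the plain map keyed by feed ID are illustrative, not the actual sliMetrics structures.

```go
package main

import (
	"fmt"
	"time"
)

// maxBehindNanos reports how far the furthest-behind feed in a single scope
// lags the present, given each feed's last resolved timestamp. It returns 0
// when the scope has no feeds, matching the zero an idle scope's functional
// gauge would report.
func maxBehindNanos(resolved map[int64]time.Time, now time.Time) int64 {
	var minTS time.Time
	for _, ts := range resolved {
		if minTS.IsZero() || ts.Before(minTS) {
			minTS = ts
		}
	}
	if minTS.IsZero() {
		return 0
	}
	return now.Sub(minTS).Nanoseconds()
}

func main() {
	now := time.Now()
	scope := map[int64]time.Time{
		1: now.Add(-2 * time.Second),  // feed 1 is 2s behind
		2: now.Add(-30 * time.Second), // feed 2 is 30s behind
	}
	fmt.Println(maxBehindNanos(scope, now)) // ~30s, in nanoseconds
}
```

A scope with no running feeds reports zero, which the aggregate-level max function added in metrics.go (see below) skips.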
42 changes: 6 additions & 36 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1022,10 +1022,8 @@ type changeFrontier struct {
metrics *Metrics
sliMetrics *sliMetrics

// sliMetricsID and metricsID uniquely identify the changefeed in the metrics's
// map (a shared struct across all changefeeds on the node) and the sliMetrics's
// map (shared structure between all feeds within the same scope on the node).
metricsID int
// sliMetricsID uniquely identifies the changefeed in the sliMetrics's map
// (shared structure between all feeds within the same scope on the node).
sliMetricsID int64

knobs TestingKnobs
@@ -1233,14 +1231,14 @@ func newChangeFrontierProcessor(
return nil, err
}

sliMertics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
sliMetrics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
if err != nil {
return nil, err
}

if cf.encoder, err = getEncoder(
ctx, encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics,
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMetrics,
); err != nil {
return nil, err
}
@@ -1367,13 +1365,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
}()
}

func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
cf.metricsID = cf.metrics.mu.id
cf.metrics.mu.id++
sli.RunningCount.Inc(1)
}()
sli.RunningCount.Inc(1)

cf.sliMetricsID = cf.sliMetrics.claimId()

@@ -1467,21 +1459,8 @@ func (cf *changeFrontier) close() {
}
}

// closeMetrics de-registers from the progress registry that powers
// `changefeed.max_behind_nanos`. This method is idempotent.
func (cf *changeFrontier) closeMetrics() {
// Delete this feed from the MaxBehindNanos metric so it's no longer
// considered by the gauge.
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID > 0 {
cf.sliMetrics.RunningCount.Dec(1)
}
delete(cf.metrics.mu.resolved, cf.metricsID)
cf.metricsID = -1
}()

cf.sliMetrics.RunningCount.Dec(1)
cf.sliMetrics.closeId(cf.sliMetricsID)
}

@@ -1640,15 +1619,6 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

// This backs max_behind_nanos which is deprecated in favor of checkpoint_progress
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID != -1 {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
}()

return cf.maybeEmitResolved(newResolved)
}

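The deleted blocks above dropped the node-wide metrics.mu.resolved map keyed by metricsID; the frontier now relies entirely on the scope-level sliMetrics bookkeeping (claimId on Start, setCheckpoint on frontier advance, closeId on close). The sketch below shows that claim/update/release lifecycle with illustrative names (scopeRegistry, claimID, setResolved, closeID), not the real sliMetrics API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// scopeRegistry tracks the resolved timestamp of each running feed within one
// metrics scope, in the spirit of the sliMetrics map this diff switches to.
type scopeRegistry struct {
	mu       sync.Mutex
	nextID   int64
	resolved map[int64]time.Time
}

func newScopeRegistry() *scopeRegistry {
	return &scopeRegistry{resolved: make(map[int64]time.Time)}
}

// claimID registers a feed and hands back its handle (cf. claimId in Start).
func (r *scopeRegistry) claimID() int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.resolved[r.nextID] = time.Time{}
	return r.nextID
}

// setResolved records the feed's latest resolved timestamp (cf. the
// setCheckpoint call when the frontier advances).
func (r *scopeRegistry) setResolved(id int64, ts time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.resolved[id] = ts
}

// closeID deregisters the feed so it no longer influences the scope's gauges
// (cf. closeId in closeMetrics).
func (r *scopeRegistry) closeID(id int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.resolved, id)
}

func main() {
	r := newScopeRegistry()
	id := r.claimID()
	r.setResolved(id, time.Now().Add(-5*time.Second))
	fmt.Println(len(r.resolved)) // 1 while the feed is running
	r.closeID(id)
	fmt.Println(len(r.resolved)) // 0 after close
}
```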
70 changes: 33 additions & 37 deletions pkg/ccl/changefeedccl/metrics.go
@@ -83,6 +83,7 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
SinkErrors *aggmetric.AggCounter
MaxBehindNanos *aggmetric.AggGauge

Timers *timers.Timers

@@ -165,6 +166,7 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
SinkErrors *aggmetric.Counter
MaxBehindNanos *aggmetric.Gauge

Timers *timers.ScopedTimers

@@ -721,17 +723,6 @@ var (
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos = metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
@@ -986,6 +977,16 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos := metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -996,6 +997,15 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
}
return min
}
functionalGaugeMaxFn := func(childValues []int64) int64 {
var max int64
for _, val := range childValues {
if val != 0 && val > max {
max = val
}
}
return max
}

// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
@@ -1087,9 +1097,10 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.ChangefeedBatchLatencyBuckets,
}),
SinkErrors: b.Counter(metaSinkErrors),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
SinkErrors: b.Counter(metaSinkErrors),
MaxBehindNanos: b.FunctionalGauge(metaChangefeedMaxBehindNanos, functionalGaugeMaxFn),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -1187,8 +1198,16 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return minTs
}
}

maxBehindNanosGetter := func(m map[int64]hlc.Timestamp) func() int64 {
return func() int64 {
return timeutil.Now().UnixNano() - minTimestampGetter(m)()
}
}

sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)

a.mu.sliMetrics[scope] = sm
return sm, nil
@@ -1244,15 +1263,6 @@ type Metrics struct {
ParallelConsumerFlushNanos metric.IHistogram
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

// This map and the MaxBehindNanos metric are deprecated in favor of
// CheckpointProgress which is stored in the sliMetrics.
mu struct {
syncutil.Mutex
id int
resolved map[int]hlc.Timestamp
}
MaxBehindNanos *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
@@ -1299,20 +1309,6 @@ func MakeMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) metric.Stru
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
now := timeutil.Now()
var maxBehind time.Duration
m.mu.Lock()
defer m.mu.Unlock()
for _, resolved := range m.mu.resolved {
if behind := now.Sub(resolved.GoTime()); behind > maxBehind {
maxBehind = behind
}
}
return maxBehind.Nanoseconds()
})
return m
}

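At the aggregate level, the metrics.go hunks above register each scope as a functional child and fold the children together with functionalGaugeMaxFn. Here is a small sketch of that fold over plain slices, leaving out the aggmetric machinery; it returns the largest non-zero per-scope value, and zero when every scope is idle.

```go
package main

import "fmt"

// maxNonZero mirrors functionalGaugeMaxFn above: the aggregate value is the
// largest per-scope value, with zero-valued (idle) children skipped explicitly.
func maxNonZero(childValues []int64) int64 {
	var max int64
	for _, v := range childValues {
		if v != 0 && v > max {
			max = v
		}
	}
	return max
}

func main() {
	// Default scope 30s behind, a labeled scope 2s behind, and an idle scope.
	perScope := []int64{30_000_000_000, 2_000_000_000, 0}
	fmt.Println(maxNonZero(perScope)) // 30000000000
}
```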
2 changes: 1 addition & 1 deletion pkg/util/metric/prometheus_exporter.go
@@ -79,7 +79,7 @@ func (pm *PrometheusExporter) findOrCreateFamily(

// ScrapeRegistry scrapes all metrics contained in the registry to the metric
// family map, holding on only to the scraped data (which is no longer
// connected to the registry and metrics within) when returning from the the
// connected to the registry and metrics within) when returning from the
// call. It creates new families as needed.
func (pm *PrometheusExporter) ScrapeRegistry(registry *Registry, includeChildMetrics bool) {
labels := registry.GetLabels()
