Skip to content

Commit

Permalink
Merge 0a3420e into blathers/backport-release-24.2-137534
Browse files Browse the repository at this point in the history
  • Loading branch information
blathers-crl[bot] authored Jan 16, 2025
2 parents 5238483 + 0a3420e commit 217544d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 51 deletions.
2 changes: 1 addition & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.messages.messages_pushback_nanos</td><td>Total time spent throttled for messages quota</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.network.bytes_in</td><td>The number of bytes received from the network by changefeeds</td><td>Bytes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
18 changes: 2 additions & 16 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,14 +1222,14 @@ func newChangeFrontierProcessor(
return nil, err
}

sliMertics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
sliMetrics, err := flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
if err != nil {
return nil, err
}

if cf.encoder, err = getEncoder(
ctx, encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics,
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMetrics,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1456,18 +1456,13 @@ func (cf *changeFrontier) close() {
}
}

// closeMetrics de-registers from the progress registry that powers
// `changefeed.max_behind_nanos`. This method is idempotent.
func (cf *changeFrontier) closeMetrics() {
// Delete this feed from the MaxBehindNanos metric so it's no longer
// considered by the gauge.
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID > 0 {
cf.sliMetrics.RunningCount.Dec(1)
}
delete(cf.metrics.mu.resolved, cf.metricsID)
cf.metricsID = -1
}()

Expand Down Expand Up @@ -1629,15 +1624,6 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

// This backs max_behind_nanos which is deprecated in favor of checkpoint_progress
func() {
cf.metrics.mu.Lock()
defer cf.metrics.mu.Unlock()
if cf.metricsID != -1 {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
}()

return cf.maybeEmitResolved(newResolved)
}

Expand Down
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,20 @@ func TestChangefeedProgressMetrics(t *testing.T) {
})
}

// Verify that max_behind_nanos has recurring updates: require three
// successive observations in which the gauge value differs from the
// previously seen one. Each observation is retried via SucceedsSoon until
// the value changes.
// lastValue starts at 0, so the first round passes as soon as the gauge
// reports any non-zero value.
var lastValue int64 = 0
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
value := sliA.MaxBehindNanos.Value()
if value != lastValue {
// Remember the new value so the next round waits for a further change.
lastValue = value
return nil
}
return errors.Newf("waiting for max_behind_nanos to update %d",
lastValue)
})
}

sliB, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("label_b")
require.Equal(t, int64(0), sliB.AggregatorProgress.Value())
fooB := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='label_b', resolved='100ms'`)
Expand All @@ -448,7 +462,8 @@ func TestChangefeedProgressMetrics(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
aggregatorProgress := sliA.AggregatorProgress.Value()
checkpointProgress := sliA.CheckpointProgress.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 {
maxBehindNanos := sliA.MaxBehindNanos.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 && maxBehindNanos == 0 {
return nil
}
return errors.Newf("waiting for progress metrics to be 0 (ap=%d, cp=%d)",
Expand Down
70 changes: 38 additions & 32 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package changefeedccl

import (
"context"
"slices"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -83,6 +84,7 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
SinkErrors *aggmetric.AggCounter
MaxBehindNanos *aggmetric.AggGauge

Timers *timers.Timers

Expand Down Expand Up @@ -165,6 +167,7 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
SinkErrors *aggmetric.Counter
MaxBehindNanos *aggmetric.Gauge

Timers *timers.ScopedTimers

Expand Down Expand Up @@ -721,17 +724,6 @@ var (
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos = metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
Expand Down Expand Up @@ -986,6 +978,16 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos := metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "The most any changefeed's persisted checkpoint is behind the present",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand All @@ -997,6 +999,13 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
return min
}

// functionalGaugeMaxFn aggregates the values of a functional gauge's
// children by returning the largest one. It returns 0 when there are no
// child values; the explicit length check is needed because slices.Max
// panics when given an empty slice.
functionalGaugeMaxFn := func(childValues []int64) int64 {
if len(childValues) == 0 {
return 0
}
return slices.Max(childValues)
}

// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
Expand Down Expand Up @@ -1087,9 +1096,10 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.ChangefeedBatchLatencyBuckets,
}),
SinkErrors: b.Counter(metaSinkErrors),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
SinkErrors: b.Counter(metaSinkErrors),
MaxBehindNanos: b.FunctionalGauge(metaChangefeedMaxBehindNanos, functionalGaugeMaxFn),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -1187,8 +1197,20 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return minTs
}
}

// maxBehindNanosGetter returns a gauge callback that reports how far, in
// nanoseconds, the minimum timestamp derived from m lags behind the current
// time. The minimum is obtained by delegating to minTimestampGetter on
// every invocation, so the result reflects the state of m at call time.
// NOTE(review): this subtracts the minimum from a UnixNano value, so it
// assumes minTimestampGetter yields nanoseconds since the Unix epoch —
// confirm against its definition.
maxBehindNanosGetter := func(m map[int64]hlc.Timestamp) func() int64 {
return func() int64 {
minTs := minTimestampGetter(m)()
// A zero minimum is reported as "0 behind" rather than as the full
// distance from the epoch. Presumably zero means no timestamp has been
// recorded yet — TODO confirm.
if minTs == 0 {
return 0
}
return timeutil.Now().UnixNano() - minTs
}
}

sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)

a.mu.sliMetrics[scope] = sm
return sm, nil
Expand Down Expand Up @@ -1245,14 +1267,10 @@ type Metrics struct {
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

// This map and the MaxBehindNanos metric are deprecated in favor of
// CheckpointProgress which is stored in the sliMetrics.
mu struct {
syncutil.Mutex
id int
resolved map[int]hlc.Timestamp
id int
}
MaxBehindNanos *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -1299,20 +1317,8 @@ func MakeMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) metric.Stru
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
now := timeutil.Now()
var maxBehind time.Duration
m.mu.Lock()
defer m.mu.Unlock()
for _, resolved := range m.mu.resolved {
if behind := now.Sub(resolved.GoTime()); behind > maxBehind {
maxBehind = behind
}
}
return maxBehind.Nanoseconds()
})

return m
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/metric/prometheus_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (pm *PrometheusExporter) findOrCreateFamily(

// ScrapeRegistry scrapes all metrics contained in the registry to the metric
// family map, holding on only to the scraped data (which is no longer
// connected to the registry and metrics within) when returning from the the
// connected to the registry and metrics within) when returning from the
// call. It creates new families as needed.
func (pm *PrometheusExporter) ScrapeRegistry(registry *Registry, includeChildMetrics bool) {
labels := registry.GetLabels()
Expand Down

0 comments on commit 217544d

Please sign in to comment.