diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 76032968ded5..990a82213855 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1037,11 +1037,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{}) } - cf.metrics.mu.Lock() - cf.metricsID = cf.metrics.mu.id - cf.metrics.mu.id++ sli.RunningCount.Inc(1) - cf.metrics.mu.Unlock() // TODO(dan): It's very important that we de-register from the metric because // if we orphan an entry in there, our monitoring will lie (say the changefeed // is behind when it may not be). We call this in `close` but that doesn't @@ -1072,15 +1068,9 @@ func (cf *changeFrontier) close() { // closeMetrics de-registers from the progress registry that powers // `changefeed.max_behind_nanos`. This method is idempotent. func (cf *changeFrontier) closeMetrics() { - // Delete this feed from the MaxBehindNanos metric so it's no longer - // considered by the gauge. - cf.metrics.mu.Lock() if cf.metricsID > 0 { cf.sliMetrics.RunningCount.Dec(1) } - delete(cf.metrics.mu.resolved, cf.metricsID) - cf.metricsID = -1 - cf.metrics.mu.Unlock() } // Next is part of the RowSource interface. @@ -1211,14 +1201,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { if emitResolved { // Keeping this after the checkpointJobProgress call will avoid // some duplicates if a restart happens. - newResolved := cf.frontier.Frontier() - cf.metrics.mu.Lock() - if cf.metricsID != -1 { - cf.metrics.mu.resolved[cf.metricsID] = newResolved - } - cf.metrics.mu.Unlock() - - return cf.maybeEmitResolved(newResolved) + return cf.maybeEmitResolved(cf.frontier.Frontier()) } return nil diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 4d9539ebe213..368a71de3863 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -377,13 +377,9 @@ var ( Unit: metric.Unit_NANOSECONDS, } - // TODO(dan): This was intended to be a measure of the minimum distance of - // any changefeed ahead of its gc ttl threshold, but keeping that correct in - // the face of changing zone configs is much harder, so this will have to do - // for now. metaChangefeedMaxBehindNanos = metric.Metadata{ Name: "changefeed.max_behind_nanos", - Help: "Largest commit-to-emit duration of any running feed", + Help: "The duration of time between the present and the oldest highwater of any running changefeed", Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } @@ -693,13 +689,7 @@ type Metrics struct { ParallelConsumerFlushNanos metric.IHistogram ParallelConsumerConsumeNanos metric.IHistogram ParallelConsumerInFlightEvents *metric.Gauge - - mu struct { - syncutil.Mutex - id int - resolved map[int]hlc.Timestamp - } - MaxBehindNanos *metric.Gauge + MaxBehindNanos *metric.Gauge } // MetricStruct implements the metric.Struct interface. @@ -744,22 +734,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { Mode: metric.HistogramModePrometheus, }), ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents), + MaxBehindNanos: metric.NewGauge(metaChangefeedMaxBehindNanos), } - - m.mu.resolved = make(map[int]hlc.Timestamp) - m.mu.id = 1 // start the first id at 1 so we can detect initialization - m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 { - now := timeutil.Now() - var maxBehind time.Duration - m.mu.Lock() - for _, resolved := range m.mu.resolved { - if behind := now.Sub(resolved.GoTime()); behind > maxBehind { - maxBehind = behind - } - } - m.mu.Unlock() - return maxBehind.Nanoseconds() - }) return m } diff --git a/pkg/jobs/metricspoller/BUILD.bazel b/pkg/jobs/metricspoller/BUILD.bazel index 58804319ae9f..44872a53c17c 100644 --- a/pkg/jobs/metricspoller/BUILD.bazel +++ b/pkg/jobs/metricspoller/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/metric", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/jobs/metricspoller/poller.go b/pkg/jobs/metricspoller/poller.go index bafd99852f2e..5398fe9c210b 100644 --- a/pkg/jobs/metricspoller/poller.go +++ b/pkg/jobs/metricspoller/poller.go @@ -19,7 +19,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" io_prometheus_client "github.com/prometheus/client_model/go" ) @@ -71,7 +73,8 @@ func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error return ctx.Err() case <-t.C: t.Read = true - for name, task := range metricPollerTasks { + tasks := TaskRegistry.clone() + for name, task := range tasks { if err := runTask(name, task); err != nil { log.Errorf(ctx, "Periodic stats collector task %s completed with error %s", name, err) metrics.numErrors.Inc(1) @@ -85,11 +88,43 @@ type pollerMetrics struct { numErrors *metric.Counter } -// metricsPollerTasks lists the list of tasks performed on each iteration -// of metrics poller. -var metricPollerTasks = map[string]func(ctx context.Context, execCtx sql.JobExecContext) error{ - "paused-jobs": updatePausedMetrics, - "manage-pts": manageProtectedTimestamps, +// A Task is a function which will be executed by the metrics poller job. +type Task func(ctx context.Context, execCtx sql.JobExecContext) error + +type syncMetricsPollerTasks struct { + syncutil.Mutex + metricPollerTasks map[string]Task +} + +// The TaskRegistry is used to register new tasks to be executed +// periodically by the metrics poller job. +var TaskRegistry = syncMetricsPollerTasks{ + metricPollerTasks: map[string]Task{ + "paused-jobs": updatePausedMetrics, + "manage-pts": manageProtectedTimestamps, + }, +} + +func (a *syncMetricsPollerTasks) Register(label string, task Task) error { + a.Lock() + defer a.Unlock() + + if _, exists := a.metricPollerTasks[label]; exists { + return errors.Newf("duplicate metrics poller task registration: %s", label) + } + + a.metricPollerTasks[label] = task + return nil +} + +func (a *syncMetricsPollerTasks) clone() map[string]Task { + a.Lock() + defer a.Unlock() + tasks := make(map[string]Task, len(a.metricPollerTasks)) + for label, task := range a.metricPollerTasks { + tasks[label] = task + } + return tasks } func (m pollerMetrics) MetricStruct() {}