diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index d6289ad9f907..664b75fc055e 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1092,6 +1092,13 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp( var _ jobs.PauseRequester = (*changefeedResumer)(nil) +func (b *changefeedResumer) updatePausedMetrics(ctx context.Context, jobExec interface{}) { + exec := jobExec.(sql.JobExecContext) + telemetry.Count(`changefeed.enterprise.paused`) + exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Pauses.Inc(1) + logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError) +} + // OnPauseRequest implements jobs.PauseRequester. If this changefeed is being // paused, we may want to clear the protected timestamp record. func (b *changefeedResumer) OnPauseRequest( @@ -1102,6 +1109,8 @@ func (b *changefeedResumer) OnPauseRequest( cp := progress.GetChangefeed() execCfg := jobExec.(sql.JobExecContext).ExecCfg() + b.updatePausedMetrics(ctx, jobExec) + if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect { // Release existing pts record to avoid a single changefeed left on pause // resulting in storage issues diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index cd625b0f60fd..6551b1c047d5 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -320,6 +320,12 @@ var ( Measurement: "Errors", Unit: metric.Unit_COUNT, } + metaChangefeedPauses = metric.Metadata{ + Name: "changefeed.pauses", + Help: "Total number of changefeed jobs which have been paused for any reason", + Measurement: "Pauses", + Unit: metric.Unit_COUNT, + } metaEventQueueTime = metric.Metadata{ Name: "changefeed.queue_time_nanos", @@ -570,6 +576,7 @@ type Metrics struct { KVFeedMetrics kvevent.Metrics SchemaFeedMetrics schemafeed.Metrics Failures *metric.Counter + Pauses *metric.Counter ResolvedMessages *metric.Counter QueueTimeNanos *metric.Counter CheckpointHistNanos *metric.Histogram @@ -604,6 +611,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow), ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages), Failures: metric.NewCounter(metaChangefeedFailures), + Pauses: metric.NewCounter(metaChangefeedPauses), QueueTimeNanos: metric.NewCounter(metaEventQueueTime), CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets), FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),