Skip to content

Commit

Permalink
changefeedccl: add metrics for paused changefeed jobs
Browse files Browse the repository at this point in the history
Paused changefeed jobs will now show up as a counter in the
debug UI. This counter will also be added to telemetry.

Release note: None
  • Loading branch information
jayshrivastava committed Oct 11, 2022
1 parent 9e7e704 commit 297bc36
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,13 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(

var _ jobs.PauseRequester = (*changefeedResumer)(nil)

func (b *changefeedResumer) updatePausedMetrics(ctx context.Context, jobExec interface{}) {
exec := jobExec.(sql.JobExecContext)
telemetry.Count(`changefeed.enterprise.paused`)
exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Pauses.Inc(1)
logChangefeedFailedTelemetry(ctx, b.job, changefeedbase.UnknownError)
}

// OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
// paused, we may want to clear the protected timestamp record.
func (b *changefeedResumer) OnPauseRequest(
Expand All @@ -1102,6 +1109,8 @@ func (b *changefeedResumer) OnPauseRequest(
cp := progress.GetChangefeed()
execCfg := jobExec.(sql.JobExecContext).ExecCfg()

b.updatePausedMetrics(ctx, jobExec)

if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
// Release existing pts record to avoid a single changefeed left on pause
// resulting in storage issues
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ var (
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
metaChangefeedPauses = metric.Metadata{
Name: "changefeed.pauses",
Help: "Total number of changefeed jobs which have been paused for any reason",
Measurement: "Pauses",
Unit: metric.Unit_COUNT,
}

metaEventQueueTime = metric.Metadata{
Name: "changefeed.queue_time_nanos",
Expand Down Expand Up @@ -570,6 +576,7 @@ type Metrics struct {
KVFeedMetrics kvevent.Metrics
SchemaFeedMetrics schemafeed.Metrics
Failures *metric.Counter
Pauses *metric.Counter
ResolvedMessages *metric.Counter
QueueTimeNanos *metric.Counter
CheckpointHistNanos *metric.Histogram
Expand Down Expand Up @@ -604,6 +611,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
Pauses: metric.NewCounter(metaChangefeedPauses),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
Expand Down

0 comments on commit 297bc36

Please sign in to comment.