Skip to content

Commit

Permalink
changefeedccl: Improve observability of change frontier updates.
Browse files Browse the repository at this point in the history
Add a metric to keep track of the number of frontier updates in the
changefeed.  Add logging when job progress updates take an excessive amount
of time.

Fixes #67192

Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Jul 6, 2021
1 parent 96e41c9 commit fefdf03
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,16 @@ func (cf *changeFrontier) checkpointJobProgress(
defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }()
}

// Time the job progress update so that slow updates are surfaced in the
// logs. The deferred closure runs when the enclosing function returns, so
// the measured interval covers the job.Update call that follows.
updateStart := timeutil.Now()
defer func() {
	elapsed := timeutil.Since(updateStart)
	if elapsed > 5*time.Millisecond {
		log.Warningf(cf.Ctx, "slow job progress update took %s", elapsed)
	}
}()

// Count this change frontier update on the dedicated FrontierUpdates
// counter (changefeed.frontier_updates) rather than on Flushes, so that
// frontier updates are reported under the metric introduced for them.
cf.metrics.FrontierUpdates.Inc(1)

return cf.js.job.Update(cf.Ctx, nil, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
Expand Down
12 changes: 11 additions & 1 deletion pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ var (
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

metaChangefeedFrontierUpdates = metric.Metadata{
Name: "changefeed.frontier_updates",
Help: "Number of change frontier updates across all feeds",
Measurement: "Updates",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of changefeeds.
Expand All @@ -214,6 +221,8 @@ type Metrics struct {

Running *metric.Gauge

FrontierUpdates *metric.Counter

mu struct {
syncutil.Mutex
id int
Expand Down Expand Up @@ -247,7 +256,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow,
changefeedFlushHistMaxLatency.Nanoseconds(), 2),

Running: metric.NewGauge(metaChangefeedRunning),
Running: metric.NewGauge(metaChangefeedRunning),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
}
m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
Expand Down

0 comments on commit fefdf03

Please sign in to comment.