From fefdf03b71db89a22d9dc3b7832bae8843869ef6 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Sat, 3 Jul 2021 09:29:51 -0400
Subject: [PATCH] changefeedccl: Improve observability of change frontier
 updates.

Add a metric to keep track of the number of frontier updates in the
changefeed. Add logging when job progress updates take an excessive
amount of time.

Fixes #67192

Release Notes: None
---
 pkg/ccl/changefeedccl/changefeed_processors.go | 13 +++++++++++++
 pkg/ccl/changefeedccl/metrics.go               | 12 +++++++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index f8c0257fb69b..0d49358eb49b 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1280,6 +1280,19 @@ func (cf *changeFrontier) checkpointJobProgress(
 		defer func() { cf.js.lastRunStatusUpdate = timeutil.Now() }()
 	}
 
+	// Time the job progress update so that slow checkpoints show up in the
+	// logs.
+	updateStart := timeutil.Now()
+	defer func() {
+		elapsed := timeutil.Since(updateStart)
+		if elapsed > 5*time.Millisecond {
+			log.Warningf(cf.Ctx, "slow job progress update took %s", elapsed)
+		}
+	}()
+
+	// Count every frontier update attempt, whether or not it succeeds.
+	cf.metrics.FrontierUpdates.Inc(1)
+
 	return cf.js.job.Update(cf.Ctx, nil, func(
 		txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
 	) error {
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index f00934830345..d5a8fb535316 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -191,6 +191,13 @@ var (
 		Measurement: "Nanoseconds",
 		Unit:        metric.Unit_NANOSECONDS,
 	}
+
+	metaChangefeedFrontierUpdates = metric.Metadata{
+		Name:        "changefeed.frontier_updates",
+		Help:        "Number of change frontier updates across all feeds",
+		Measurement: "Updates",
+		Unit:        metric.Unit_COUNT,
+	}
 )
 
 // Metrics are for production monitoring of changefeeds.
@@ -214,6 +221,8 @@ type Metrics struct {
 
 	Running *metric.Gauge
 
+	FrontierUpdates *metric.Counter
+
 	mu struct {
 		syncutil.Mutex
 		id       int
@@ -247,7 +256,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 		FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow,
 			changefeedFlushHistMaxLatency.Nanoseconds(), 2),
 
-		Running: metric.NewGauge(metaChangefeedRunning),
+		Running:         metric.NewGauge(metaChangefeedRunning),
+		FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
 	}
 	m.mu.resolved = make(map[int]hlc.Timestamp)
 	m.mu.id = 1 // start the first id at 1 so we can detect initialization