Skip to content

Commit

Permalink
changefeedccl: rework max behind nanos metric
Browse files Browse the repository at this point in the history
Previously, this node-level metric would measure the maximum time between
the present and the oldest checkpoint seen by a change aggregator. Since
this metric was updated by in-memory checkpoints, it was prone to odd
behavior. For example:
- When a node restarts and a changefeed immediately begins a catchup scan,
  there are no checkpoints for this changefeed available to calculate the value
  of this metric. It's possible that an "infinite" catchup scan could trigger
  where the metric would never get updated (

 had the description "Largest commit-to-emit duration of any running feed",

Informs: cockroachdb#97931
Closes: cockroachdb#97043
Closes: cockroachdb#99409
<what was there before: Previously, ...>
<why it needed to change: This was inadequate because ...>
<what you did about it: To address this, this patch ...>
  • Loading branch information
jayshrivastava committed Apr 11, 2023
1 parent a5d61d2 commit aa38da7
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 51 deletions.
19 changes: 1 addition & 18 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,11 +1037,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.flowCtx.Cfg.Settings, cf.metrics, timeutil.DefaultTimeSource{})
}

cf.metrics.mu.Lock()
cf.metricsID = cf.metrics.mu.id
cf.metrics.mu.id++
sli.RunningCount.Inc(1)
cf.metrics.mu.Unlock()
// TODO(dan): It's very important that we de-register from the metric because
// if we orphan an entry in there, our monitoring will lie (say the changefeed
// is behind when it may not be). We call this in `close` but that doesn't
Expand Down Expand Up @@ -1072,15 +1068,9 @@ func (cf *changeFrontier) close() {
// closeMetrics de-registers from the progress registry that powers
// `changefeed.max_behind_nanos`. This method is idempotent.
//
// The running-count gauge is decremented only while metricsID is positive.
// metricsID is then set to -1, so a repeated call skips the decrement.
func (cf *changeFrontier) closeMetrics() {
// Delete this feed from the MaxBehindNanos metric so it's no longer
// considered by the gauge.
cf.metrics.mu.Lock()
if cf.metricsID > 0 {
cf.sliMetrics.RunningCount.Dec(1)
}
// On a repeated call metricsID is already -1; presumably no entry is ever
// stored under that key, making this delete a no-op — confirm against the
// ID allocation in MakeMetrics/Start.
delete(cf.metrics.mu.resolved, cf.metricsID)
cf.metricsID = -1
cf.metrics.mu.Unlock()
}

// Next is part of the RowSource interface.
Expand Down Expand Up @@ -1211,14 +1201,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
if emitResolved {
// Keeping this after the checkpointJobProgress call will avoid
// some duplicates if a restart happens.
newResolved := cf.frontier.Frontier()
cf.metrics.mu.Lock()
if cf.metricsID != -1 {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
}
cf.metrics.mu.Unlock()

return cf.maybeEmitResolved(newResolved)
return cf.maybeEmitResolved(cf.frontier.Frontier())
}

return nil
Expand Down
30 changes: 3 additions & 27 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,9 @@ var (
Unit: metric.Unit_NANOSECONDS,
}

// TODO(dan): This was intended to be a measure of the minimum distance of
// any changefeed ahead of its gc ttl threshold, but keeping that correct in
// the face of changing zone configs is much harder, so this will have to do
// for now.
metaChangefeedMaxBehindNanos = metric.Metadata{
Name: "changefeed.max_behind_nanos",
Help: "Largest commit-to-emit duration of any running feed",
Help: "The duration of time between the present and the oldest highwater of any running changefeed",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
Expand Down Expand Up @@ -693,13 +689,7 @@ type Metrics struct {
ParallelConsumerFlushNanos metric.IHistogram
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge

mu struct {
syncutil.Mutex
id int
resolved map[int]hlc.Timestamp
}
MaxBehindNanos *metric.Gauge
MaxBehindNanos *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -744,22 +734,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
Mode: metric.HistogramModePrometheus,
}),
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
MaxBehindNanos: metric.NewGauge(metaChangefeedMaxBehindNanos),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
now := timeutil.Now()
var maxBehind time.Duration
m.mu.Lock()
for _, resolved := range m.mu.resolved {
if behind := now.Sub(resolved.GoTime()); behind > maxBehind {
maxBehind = behind
}
}
m.mu.Unlock()
return maxBehind.Nanoseconds()
})
return m
}

Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/metricspoller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
47 changes: 41 additions & 6 deletions pkg/jobs/metricspoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
io_prometheus_client "github.com/prometheus/client_model/go"
)
Expand Down Expand Up @@ -71,7 +73,8 @@ func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error
return ctx.Err()
case <-t.C:
t.Read = true
for name, task := range metricPollerTasks {
tasks := TaskRegistry.clone()
for name, task := range tasks {
if err := runTask(name, task); err != nil {
log.Errorf(ctx, "Periodic stats collector task %s completed with error %s", name, err)
metrics.numErrors.Inc(1)
Expand All @@ -85,11 +88,43 @@ type pollerMetrics struct {
numErrors *metric.Counter
}

// metricPollerTasks maps a task name to the function performed on each
// iteration of the metrics poller.
var metricPollerTasks = map[string]func(ctx context.Context, execCtx sql.JobExecContext) error{
"paused-jobs": updatePausedMetrics,
"manage-pts": manageProtectedTimestamps,
// A Task is a function which will be executed by the metrics poller job.
// The poller invokes every registered task on each polling iteration; a
// non-nil error is logged and counted in the poller's error metric rather
// than stopping the poller.
type Task func(ctx context.Context, execCtx sql.JobExecContext) error

// syncMetricsPollerTasks is a mutex-guarded collection of metrics poller
// tasks keyed by label. The embedded mutex must be held while reading or
// writing metricPollerTasks.
type syncMetricsPollerTasks struct {
syncutil.Mutex
// metricPollerTasks maps a task label to the task to run.
metricPollerTasks map[string]Task
}

// The TaskRegistry is used to register new tasks to be executed
// periodically by the metrics poller job. It starts out holding the built-in
// "paused-jobs" and "manage-pts" tasks; additional tasks are added with
// Register. The poller works from a copy of the registry taken on each
// iteration.
var TaskRegistry = syncMetricsPollerTasks{
metricPollerTasks: map[string]Task{
"paused-jobs": updatePausedMetrics,
"manage-pts": manageProtectedTimestamps,
},
}

// Register adds task to the registry under label so that the metrics poller
// job picks it up on its subsequent polling iterations.
//
// It returns an error, leaving the registry unchanged, if task is nil or if
// a task is already registered under label. The registry mutex is held for
// the lookup and insertion, so concurrent calls are serialized.
func (a *syncMetricsPollerTasks) Register(label string, task Task) error {
	// Reject nil up front: a nil Task would otherwise be stored and only fail
	// later, when the poller tries to invoke it.
	if task == nil {
		return errors.Newf("nil metrics poller task registration: %s", label)
	}

	a.Lock()
	defer a.Unlock()

	if _, exists := a.metricPollerTasks[label]; exists {
		return errors.Newf("duplicate metrics poller task registration: %s", label)
	}

	a.metricPollerTasks[label] = task
	return nil
}

// clone returns a point-in-time copy of the registered tasks, keyed by
// label. The copy is built while holding the registry mutex, so the caller
// can iterate over it without holding the lock.
func (a *syncMetricsPollerTasks) clone() map[string]Task {
	a.Lock()
	defer a.Unlock()

	snapshot := make(map[string]Task, len(a.metricPollerTasks))
	for name, fn := range a.metricPollerTasks {
		snapshot[name] = fn
	}
	return snapshot
}

// MetricStruct is an intentionally empty marker method.
// NOTE(review): presumably this makes pollerMetrics satisfy the metric.Struct
// interface — confirm against pkg/util/metric.
func (m pollerMetrics) MetricStruct() {}
Expand Down

0 comments on commit aa38da7

Please sign in to comment.