From 48406e3b91f324e96d27bab9f36cb573cb51a87a Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 20 Apr 2021 10:35:08 +0100 Subject: [PATCH 1/5] changefeed: add new histogram metrics This adds 3 new histogram metrics to try to get some insight into whether we are seeing occasionally slow sinks or checkpoints during various nightly roachtests. While this produces some data duplication with the previous flush_nanos, flushes, emitted_messages, and emit_nanos metrics, I think that having the histogram will still be useful to more easily see if a changefeed experienced a small number of slow flushes or checkpoints. The naming of these metrics is a bit unfortunate, but since flush_nanos and emit_nanos already existed and since I didn't want to remove them, I've included 'hist' in the name of these new metrics. Release note: None --- .../changefeedccl/changefeed_processors.go | 4 ++ pkg/ccl/changefeedccl/metrics.go | 59 +++++++++++++++++-- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index a1cd6b9d25d0..623fda4bd82a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1231,9 +1231,13 @@ func (cf *changeFrontier) handleFrontierChanged(isBehind bool) error { cf.metrics.mu.resolved[cf.metricsID] = newResolved } cf.metrics.mu.Unlock() + + checkpointStart := timeutil.Now() if err := cf.checkpointResolvedTimestamp(newResolved, isBehind); err != nil { return err } + cf.metrics.CheckpointHistNanos.RecordValue(timeutil.Since(checkpointStart).Nanoseconds()) + if err := cf.maybeEmitResolved(newResolved); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 11a5bf3fafdd..30f39fbf095b 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -39,9 +39,11 @@ func (s *metricsSink) EmitRow( start := timeutil.Now() err := s.wrapped.EmitRow(ctx, topic, key, value, updated) if err == nil { + emitNanos := timeutil.Since(start).Nanoseconds() s.metrics.EmittedMessages.Inc(1) s.metrics.EmittedBytes.Inc(int64(len(key) + len(value))) - s.metrics.EmitNanos.Inc(timeutil.Since(start).Nanoseconds()) + s.metrics.EmitNanos.Inc(emitNanos) + s.metrics.EmitHistNanos.RecordValue(emitNanos) } return err } @@ -52,11 +54,13 @@ func (s *metricsSink) EmitResolvedTimestamp( start := timeutil.Now() err := s.wrapped.EmitResolvedTimestamp(ctx, encoder, resolved) if err == nil { + emitNanos := timeutil.Since(start).Nanoseconds() s.metrics.EmittedMessages.Inc(1) // TODO(dan): This wasn't correct. The wrapped sink may emit the payload // any number of times. // s.metrics.EmittedBytes.Inc(int64(len(payload))) - s.metrics.EmitNanos.Inc(timeutil.Since(start).Nanoseconds()) + s.metrics.EmitNanos.Inc(emitNanos) + s.metrics.EmitHistNanos.RecordValue(emitNanos) } return err } @@ -65,9 +69,12 @@ func (s *metricsSink) Flush(ctx context.Context) error { start := timeutil.Now() err := s.wrapped.Flush(ctx) if err == nil { + flushNanos := timeutil.Since(start).Nanoseconds() s.metrics.Flushes.Inc(1) - s.metrics.FlushNanos.Inc(timeutil.Since(start).Nanoseconds()) + s.metrics.FlushNanos.Inc(flushNanos) + s.metrics.FlushHistNanos.RecordValue(flushNanos) } + return err } @@ -75,6 +82,12 @@ func (s *metricsSink) Close() error { return s.wrapped.Close() } +const ( + changefeedCheckpointHistMaxLatency = 30 * time.Second + changefeedEmitHistMaxLatency = 30 * time.Second + changefeedFlushHistMaxLatency = 1 * time.Minute +) + var ( metaChangefeedEmittedMessages = metric.Metadata{ Name: "changefeed.emitted_messages", @@ -138,6 +151,32 @@ var ( Unit: metric.Unit_COUNT, } + metaChangefeedCheckpointHistNanos = metric.Metadata{ + Name: "changefeed.checkpoint_hist_nanos", + Help: "Time spent checkpointing changefeed progress", + Measurement: "Changefeeds", + Unit: metric.Unit_NANOSECONDS, + } + + // emit_hist_nanos and flush_hist_nanos duplicate information + // in emit_nanos, emitted_messages, and flush_nanos, + // flushes. While all of those could be reconstructed from + // information in the histogram, We've kept the older metrics + // to avoid breaking historical timeseries data. + metaChangefeedEmitHistNanos = metric.Metadata{ + Name: "changefeed.emit_hist_nanos", + Help: "Time spent emitting messages across all changefeeds", + Measurement: "Changefeeds", + Unit: metric.Unit_NANOSECONDS, + } + + metaChangefeedFlushHistNanos = metric.Metadata{ + Name: "changefeed.flush_hist_nanos", + Help: "Time spent flushing messages across all changefeeds", + Measurement: "Changefeeds", + Unit: metric.Unit_NANOSECONDS, + } + // TODO(dan): This was intended to be a measure of the minimum distance of // any changefeed ahead of its gc ttl threshold, but keeping that correct in // the face of changing zone configs is much harder, so this will have to do @@ -164,6 +203,10 @@ type Metrics struct { EmitNanos *metric.Counter FlushNanos *metric.Counter + CheckpointHistNanos *metric.Histogram + EmitHistNanos *metric.Histogram + FlushHistNanos *metric.Histogram + Running *metric.Gauge mu struct { @@ -191,7 +234,15 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { TableMetadataNanos: metric.NewCounter(metaChangefeedTableMetadataNanos), EmitNanos: metric.NewCounter(metaChangefeedEmitNanos), FlushNanos: metric.NewCounter(metaChangefeedFlushNanos), - Running: metric.NewGauge(metaChangefeedRunning), + + CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, + changefeedCheckpointHistMaxLatency.Nanoseconds(), 2), + EmitHistNanos: metric.NewHistogram(metaChangefeedEmitHistNanos, histogramWindow, + changefeedEmitHistMaxLatency.Nanoseconds(), 2), + FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow, + changefeedFlushHistMaxLatency.Nanoseconds(), 2), + + Running: metric.NewGauge(metaChangefeedRunning), } m.mu.resolved = make(map[int]hlc.Timestamp) m.mu.id = 1 // start the first id at 1 so we can detect initialization From 07be0aba1e50e41a8afcc789d732b686d1cbc1a7 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 20 Apr 2021 13:32:47 +0100 Subject: [PATCH 2/5] changefeed: add slow_log_threshold cluster setting This adds a new cluster setting changfeed.slow_span_log_threshold That allows us to control the threshold for logging slow spans. This is useful for cases where the auto-calculated threshold is much higher than we would like. Release note (sql change): A new cluster setting `changefeed.slow_span_log_threshold` allows setting a cluster-wide default for slow span logging. --- .../changefeedccl/changefeed_processors.go | 57 ++++++++++++------- pkg/ccl/changefeedccl/changefeed_stmt.go | 17 ++++-- pkg/ccl/changefeedccl/changefeed_test.go | 6 ++ .../changefeedccl/changefeedbase/settings.go | 8 +++ pkg/cmd/roachtest/cdc.go | 14 +++++ 5 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 623fda4bd82a..34097adda766 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -851,8 +851,9 @@ type changeFrontier struct { freqEmitResolved time.Duration // lastEmitResolved is the last time a resolved timestamp was emitted. lastEmitResolved time.Time - // lastSlowSpanLog is the last time a slow span from `sf` was logged. - lastSlowSpanLog time.Time + + // slowLogEveryN rate-limits the logging of slow spans + slowLogEveryN log.EveryN // schemaChangeBoundary represents an hlc timestamp at which a schema change // event occurred to a target watched by this frontier. If the changefeed is @@ -895,7 +896,10 @@ type changeFrontier struct { metricsID int } -const runStatusUpdateFrequency time.Duration = time.Minute +const ( + runStatusUpdateFrequency time.Duration = time.Minute + slowSpanMaxFrequency = 10 * time.Second +) type jobState struct { job *jobs.Job @@ -916,11 +920,12 @@ func newChangeFrontierProcessor( ctx := flowCtx.EvalCtx.Ctx() memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "changefntr-mem") cf := &changeFrontier{ - flowCtx: flowCtx, - spec: spec, - memAcc: memMonitor.MakeBoundAccount(), - input: input, - sf: span.MakeFrontier(spec.TrackedSpans...), + flowCtx: flowCtx, + spec: spec, + memAcc: memMonitor.MakeBoundAccount(), + input: input, + sf: span.MakeFrontier(spec.TrackedSpans...), + slowLogEveryN: log.Every(slowSpanMaxFrequency), } if err := cf.Init( cf, @@ -1392,22 +1397,14 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { // returned boolean will be true if the resolved timestamp lags far behind the // present as defined by the current configuration. func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind bool) { - // These two cluster setting values represent the target responsiveness of - // poller and range feed. The cluster setting for switching between poller and - // rangefeed is only checked at changefeed start/resume, so instead of - // switching on it here, just add them. Also add 1 second in case both these - // settings are set really low (as they are in unit tests). - pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV) - closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV) - slownessThreshold := time.Second + 10*(pollInterval+closedtsInterval) frontier := cf.sf.Frontier() now := timeutil.Now() resolvedBehind := now.Sub(frontier.GoTime()) - if resolvedBehind <= slownessThreshold { + if resolvedBehind <= cf.slownessThreshold() { return false } - description := `sinkless feed` + description := "sinkless feed" if !cf.isSinkless() { description = fmt.Sprintf("job %d", cf.spec.JobID) } @@ -1415,15 +1412,33 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) (isBehind boo log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", description, frontier, resolvedBehind) } - const slowSpanMaxFrequency = 10 * time.Second - if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency { - cf.lastSlowSpanLog = now + + if cf.slowLogEveryN.ShouldProcess(now) { s := cf.sf.PeekFrontierSpan() log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) } return true } +func (cf *changeFrontier) slownessThreshold() time.Duration { + clusterThreshold := changefeedbase.SlowSpanLogThreshold.Get(&cf.flowCtx.Cfg.Settings.SV) + if clusterThreshold > 0 { + return clusterThreshold + } + + // These two cluster setting values represent the target + // responsiveness of schemafeed and rangefeed. + // + // We add 1 second in case both these settings are set really + // low (as they are in unit tests). + // + // TODO(ssd): We should probably take into account the flush + // frequency here. + pollInterval := changefeedbase.TableDescriptorPollInterval.Get(&cf.flowCtx.Cfg.Settings.SV) + closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Cfg.Settings.SV) + return time.Second + 10*(pollInterval+closedtsInterval) +} + // ConsumerClosed is part of the RowSource interface. func (cf *changeFrontier) ConsumerClosed() { // The consumer is done, Next() will not be called again. diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 4b93519f1a8f..d8ae05e47592 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -437,6 +437,18 @@ func changefeedJobDescription( return tree.AsStringWithFQNames(c, ann), nil } +// validateNonNegativeDuration returns a nil error if optValue can be +// parsed as a duration and is non-negative; otherwise, an error is +// returned. +func validateNonNegativeDuration(optName string, optValue string) error { + if d, err := time.ParseDuration(optValue); err != nil { + return err + } else if d < 0 { + return errors.Errorf("negative durations are not accepted: %s='%s'", optName, optValue) + } + return nil +} + func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails, error) { if details.Opts == nil { // The proto MarshalTo method omits the Opts field if the map is empty. @@ -447,11 +459,8 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails { const opt = changefeedbase.OptResolvedTimestamps if o, ok := details.Opts[opt]; ok && o != `` { - if d, err := time.ParseDuration(o); err != nil { + if err := validateNonNegativeDuration(opt, o); err != nil { return jobspb.ChangefeedDetails{}, err - } else if d < 0 { - return jobspb.ChangefeedDetails{}, errors.Errorf( - `negative durations are not accepted: %s='%s'`, opt, o) } } } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 26a3d01675d5..ee5e2007c75e 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -2228,10 +2228,16 @@ func TestChangefeedErrors(t *testing.T) { t, `unknown envelope: nope`, `EXPERIMENTAL CHANGEFEED FOR foo WITH envelope=nope`, ) + + sqlDB.ExpectErr( + t, `time: invalid duration "bar"`, + `EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='bar'`, + ) sqlDB.ExpectErr( t, `negative durations are not accepted: resolved='-1s'`, `EXPERIMENTAL CHANGEFEED FOR foo WITH resolved='-1s'`, ) + sqlDB.ExpectErr( t, `cannot specify timestamp in the future`, `EXPERIMENTAL CHANGEFEED FOR foo WITH cursor=$1`, timeutil.Now().Add(time.Hour), diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 190b26becd3f..3b9a8122cbe5 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -45,3 +45,11 @@ var PerChangefeedMemLimit = settings.RegisterByteSizeSetting( "controls amount of data that can be buffered per changefeed", 1<<30, ) + +// SlowSpanLogThreshold controls when we will log slow spans. +var SlowSpanLogThreshold = settings.RegisterDurationSetting( + "changefeed.slow_span_log_threshold", + "a changefeed will log spans with resolved timestamps this far behind the current wall-clock time; if 0, a default value is calculated based on other cluster settings", + 0, + settings.NonNegativeDuration, +) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 4bd944e67461..0cf49b6ed0b6 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -158,12 +158,26 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { // changefeed is never considered sufficiently caught up. We could // instead make targetSteadyLatency less aggressive, but it'd be nice to // keep it where it is. + // + // TODO(ssd): As of 797819b35f5 this is actually increasing rather than decreasing + // the closed_timestamp.target_duration. We can probably remove this. However, + // as of 2021-04-20, we want to understand why this test has started failing more often + // before changing this. if _, err := db.Exec( `SET CLUSTER SETTING kv.closed_timestamp.target_duration='10s'`, ); err != nil { t.Fatal(err) } + // With a target_duration of 10s, we won't see slow span logs from changefeeds untils we are > 100s + // behind, which is well above the 60s targetSteadyLatency we have in some tests. + if _, err := db.Exec( + `SET CLUSTER SETTING changefeed.slow_span_log_threshold='30s'`, + ); err != nil { + // We don't hard fail here because, not all versions support this setting + t.l.Printf("failed to set cluster setting: %s", err) + } + for _, stmt := range args.preStartStatements { _, err := db.ExecContext(ctx, stmt) if err != nil { From 3c2702034582af8a7c72d1419af0a8eae680f715 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 20 Apr 2021 13:44:45 +0100 Subject: [PATCH 3/5] server: store total observations for histogram metrics I occasionally find this useful to know how many observations a given histogram is based on. The prometheus output already returns this, but it is nice to have it in the SQL and JSON output as well. Release note (ops change): Histogram metrics now store the total number of observations over time. --- pkg/server/status/recorder.go | 1 + pkg/server/status/recorder_test.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index 79f989befb39..9b933a0da080 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -591,6 +591,7 @@ func eachRecordableValue(reg *metric.Registry, fn func(string, float64)) { for _, pt := range recordHistogramQuantiles { fn(name+pt.suffix, float64(curr.ValueAtQuantile(pt.quantile))) } + fn(name+"-count", float64(curr.TotalCount())) } else { val, err := extractValue(mtr) if err != nil { diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index 69977609aca5..fe0e8ed96ad0 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -294,6 +294,7 @@ func TestMetricsRecorder(t *testing.T) { for _, q := range recordHistogramQuantiles { addExpected(reg.prefix, data.name+q.suffix, reg.source, 100, data.val, reg.isNode) } + addExpected(reg.prefix, data.name+"-count", reg.source, 100, 1, reg.isNode) case "latency": l := metric.NewLatency(metric.Metadata{Name: reg.prefix + data.name}, time.Hour) reg.reg.AddMetric(l) @@ -303,6 +304,7 @@ func TestMetricsRecorder(t *testing.T) { for _, q := range recordHistogramQuantiles { addExpected(reg.prefix, data.name+q.suffix, reg.source, 100, data.val, reg.isNode) } + addExpected(reg.prefix, data.name+"-count", reg.source, 100, 1, reg.isNode) default: t.Fatalf("unexpected: %+v", data) } From 54fdee393576cf87124cc81ddb62215dfbb325c7 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sat, 3 Jul 2021 09:19:37 -0400 Subject: [PATCH 4/5] changfeedccl: Introduce a knob to control concurrency of scan requests. Stop relying on ExportRequestLimit to determine the number of concurrent export requests, and introduce a decidated ScanRequestLimit setting. If the setting is specified, uses that setting; otherwise, the default value is computed as 3 * (number of nodes in the cluster), which is the old behavior, but we cap this number so that concurrency does not get out of hand if running in a very large cluster. Fixes #67190 Release Nodes: Provide a better configurability of scan request concurrency. Scan requests are issued by changefeeds during the backfill. --- .../changefeedccl/changefeedbase/settings.go | 9 ++++ pkg/ccl/changefeedccl/kvfeed/BUILD.bazel | 2 +- pkg/ccl/changefeedccl/kvfeed/scanner.go | 42 ++++++++++++------- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 3b9a8122cbe5..e6f182194c09 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -53,3 +53,12 @@ var SlowSpanLogThreshold = settings.RegisterDurationSetting( 0, settings.NonNegativeDuration, ) + +// ScanRequestLimit is the number of Scan requests that can run at once. +// Scan requests are issued when changefeed performs the backfill. +// If set to 0, a reasonable default will be chosen. +var ScanRequestLimit = settings.RegisterIntSetting( + "changefeed.backfill.concurrent_scan_requests", + "number of concurrent scan requests per node issued during a backfill", + 0, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index fb2163890b82..20eddf89b53f 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -19,8 +19,8 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", - "//pkg/kv/kvserver", "//pkg/roachpb", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/lease", diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index fb7bb464209d..c1bd563b7daa 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -13,13 +13,14 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -59,16 +60,7 @@ func (p *scanRequestScanner) Scan( return err } - // Export requests for the various watched spans are executed in parallel, - // with a semaphore-enforced limit based on a cluster setting. - // The spans here generally correspond with range boundaries. - approxNodeCount, err := clusterNodeCount(p.gossip) - if err != nil { - // can't count nodes in tenants - approxNodeCount = 1 - } - maxConcurrentExports := approxNodeCount * - int(kvserver.ExportRequestsLimit.Get(&p.settings.SV)) + maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV) exportsSem := make(chan struct{}, maxConcurrentExports) g := ctxgroup.WithContext(ctx) @@ -258,15 +250,37 @@ func allRangeSpans( } // clusterNodeCount returns the approximate number of nodes in the cluster. -func clusterNodeCount(gw gossip.OptionalGossip) (int, error) { +func clusterNodeCount(gw gossip.OptionalGossip) int { g, err := gw.OptionalErr(47971) if err != nil { - return 0, err + // can't count nodes in tenants + return 1 } var nodes int _ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error { nodes++ return nil }) - return nodes, nil + return nodes +} + +// maxConcurrentExportRequests returns the number of concurrent scan requests. +func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int { + // If the user specified ScanRequestLimit -- use that value. + if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 { + return int(max) + } + + // TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of + // the cluster only make sense for the core change feeds. This configuration shoould + // be specified explicitly when creating scanner. + nodes := clusterNodeCount(gw) + // This is all hand-wavy: 3 per node used to be the default for a very long time. + // However, this could get out of hand if the clusters are large. + // So cap the max to an arbitrary value of a 100. + max := 3 * nodes + if max > 100 { + max = 100 + } + return max } From a8bea98d0aa0ae383ab15e907eb5559ec12a17e7 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 27 Jul 2021 11:15:54 -0400 Subject: [PATCH 5/5] changefeedccl: Improve observability of change frontier updates. Add a metric to keep track of the number of frontier updates in the changefeed. Release Notes: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 1 + pkg/ccl/changefeedccl/metrics.go | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 34097adda766..bc3db25506e3 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1242,6 +1242,7 @@ func (cf *changeFrontier) handleFrontierChanged(isBehind bool) error { return err } cf.metrics.CheckpointHistNanos.RecordValue(timeutil.Since(checkpointStart).Nanoseconds()) + cf.metrics.FrontierUpdates.Inc(1) if err := cf.maybeEmitResolved(newResolved); err != nil { return err diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 30f39fbf095b..7680b453bf80 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -187,6 +187,13 @@ var ( Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + + metaChangefeedFrontierUpdates = metric.Metadata{ + Name: "changefeed.frontier_updates", + Help: "Number of change frontier updates across all feeds", + Measurement: "Updates", + Unit: metric.Unit_COUNT, + } ) // Metrics are for production monitoring of changefeeds. @@ -209,6 +216,8 @@ type Metrics struct { Running *metric.Gauge + FrontierUpdates *metric.Counter + mu struct { syncutil.Mutex id int @@ -242,7 +251,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow, changefeedFlushHistMaxLatency.Nanoseconds(), 2), - Running: metric.NewGauge(metaChangefeedRunning), + Running: metric.NewGauge(metaChangefeedRunning), + FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates), } m.mu.resolved = make(map[int]hlc.Timestamp) m.mu.id = 1 // start the first id at 1 so we can detect initialization