diff --git a/monitoring/grafana-dashboards/queues.json b/monitoring/grafana-dashboards/queues.json index 9486abb298de..7c15efe72745 100644 --- a/monitoring/grafana-dashboards/queues.json +++ b/monitoring/grafana-dashboards/queues.json @@ -858,7 +858,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "GC Queue", + "title": "MVCC GC Queue", "tooltip": { "shared": true, "sort": 0, diff --git a/monitoring/splunk-dashboard/queues.xml b/monitoring/splunk-dashboard/queues.xml index fabbb0849690..fa6530d49fdf 100644 --- a/monitoring/splunk-dashboard/queues.xml +++ b/monitoring/splunk-dashboard/queues.xml @@ -241,7 +241,7 @@ where index=$index_name$ span=10s - GC Queue + MVCC GC Queue | mstats rate_sum(queue_gc_process_success) as "Successful Actions / sec", diff --git a/pkg/cmd/roachprod/grafana/configs/queues.json b/pkg/cmd/roachprod/grafana/configs/queues.json index 9486abb298de..7c15efe72745 100644 --- a/pkg/cmd/roachprod/grafana/configs/queues.json +++ b/pkg/cmd/roachprod/grafana/configs/queues.json @@ -858,7 +858,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "GC Queue", + "title": "MVCC GC Queue", "tooltip": { "shared": true, "sort": 0, diff --git a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel index bda82ba55260..4f7f7c4c6fab 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel +++ b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvpb", "//pkg/roachpb", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/spanconfig/spanconfigstore", diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go index 35b904ac5b3d..285f07b37622 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -12,16 +12,19 @@ package spanconfigkvsubscriber import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -30,12 +33,38 @@ import ( var updateBehindNanos = metric.Metadata{ Name: "spanconfig.kvsubscriber.update_behind_nanos", - Help: "Latency between realtime and the last update received by the KVSubscriber; " + - "represents the staleness of the KVSubscriber, where a flat line means there are no updates being received", + Help: "Difference between the current time and when the KVSubscriber received its last update" + + " (an ever increasing number indicates that we're no longer receiving updates)", Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } +var protectedRecordCount = metric.Metadata{ + Name: "spanconfig.kvsubscriber.protected_record_count", + Help: "Number of protected timestamp records, as seen by KV", + 
Measurement: "Records", + Unit: metric.Unit_COUNT, +} + +var oldestProtectedRecordNanos = metric.Metadata{ + Name: "spanconfig.kvsubscriber.oldest_protected_record_nanos", + Help: "Difference between the current time and the oldest protected timestamp" + + " (sudden drops indicate a record being released; an ever increasing" + + " number indicates that the oldest record is around and preventing GC if > configured GC TTL)", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, +} + +// metricsPollerInterval determines the frequency at which we refresh internal +// metrics. +var metricsPollerInterval = settings.RegisterDurationSetting( + settings.SystemOnly, + "spanconfig.kvsubscriber.metrics_poller_interval", + "the interval at which the spanconfig.kvsubscriber.* metrics are kept up-to-date; set to 0 to disable the mechanism", + 5*time.Second, + settings.NonNegativeDuration, +) + // KVSubscriber is used to subscribe to global span configuration changes. It's // a concrete implementation of the spanconfig.KVSubscriber interface. // @@ -73,24 +102,21 @@ var updateBehindNanos = metric.Metadata{ // we could diff the two data structures and only emit targeted updates. // // [1]: For a given key k, it's config may be stored as part of a larger span S -// -// (where S.start <= k < S.end). It's possible for S to get deleted and -// replaced with sub-spans S1...SN in the same transaction if the span is -// getting split. When applying these updates, we need to make sure to -// process the deletion event for S before processing S1...SN. +// (where S.start <= k < S.end). It's possible for S to get deleted and +// replaced with sub-spans S1...SN in the same transaction if the span is +// getting split. When applying these updates, we need to make sure to +// process the deletion event for S before processing S1...SN. // // [2]: In our example above deleting the config for S and adding configs for -// -// S1...SN we want to make sure that we apply the full set of updates all -// at once -- lest we expose the intermediate state where the config for S -// was deleted but the configs for S1...SN were not yet applied. +// S1...SN we want to make sure that we apply the full set of updates all +// at once -- lest we expose the intermediate state where the config for S +// was deleted but the configs for S1...SN were not yet applied. // // [3]: TODO(irfansharif): When tearing down the subscriber due to underlying -// -// errors, we could also capture a checkpoint to use the next time the -// subscriber is established. That way we can avoid the full initial scan -// over the span configuration state and simply pick up where we left off -// with our existing spanconfig.Store. +// errors, we could also capture a checkpoint to use the next time the +// subscriber is established. That way we can avoid the full initial scan +// over the span configuration state and simply pick up where we left off +// with our existing spanconfig.Store. type KVSubscriber struct { fallback roachpb.SpanConfig knobs *spanconfig.TestingKnobs @@ -101,7 +127,6 @@ type KVSubscriber struct { mu struct { // serializes between Start and external threads syncutil.RWMutex lastUpdated hlc.Timestamp - metrics Metrics // internal is the internal spanconfig.Store maintained by the // KVSubscriber. A read-only view over this store is exposed as part of // the interface. 
When re-subscribing, a fresh spanconfig.Store is @@ -111,6 +136,9 @@ type KVSubscriber struct { internal spanconfig.Store handlers []handler } + + clock *hlc.Clock + metrics *Metrics } var _ spanconfig.KVSubscriber = &KVSubscriber{} @@ -118,14 +146,24 @@ var _ spanconfig.KVSubscriber = &KVSubscriber{} // Metrics are the Metrics associated with an instance of the // KVSubscriber. type Metrics struct { - // UpdateBehindNanos is the latency between realtime and the last update - // received by the KVSubscriber. This metric should be interpreted as a - // measure of the KVSubscribers' staleness. + // UpdateBehindNanos is the difference between the current time and when the + // last update was received by the KVSubscriber. This metric should be + // interpreted as a measure of the KVSubscriber's staleness. UpdateBehindNanos *metric.Gauge + // ProtectedRecordCount is the total number of protected timestamp records, as + // seen by KV. + ProtectedRecordCount *metric.Gauge + // OldestProtectedRecordNanos is the difference between the current time and + // the oldest protected timestamp. + OldestProtectedRecordNanos *metric.Gauge } -func makeKVSubscriberMetrics() Metrics { - return Metrics{UpdateBehindNanos: metric.NewGauge(updateBehindNanos)} +func makeKVSubscriberMetrics() *Metrics { + return &Metrics{ + UpdateBehindNanos: metric.NewGauge(updateBehindNanos), + ProtectedRecordCount: metric.NewGauge(protectedRecordCount), + OldestProtectedRecordNanos: metric.NewGauge(oldestProtectedRecordNanos), + } } // MetricStruct implements the metric.Struct interface. @@ -167,6 +205,7 @@ func New( fallback: fallback, knobs: knobs, settings: settings, + clock: clock, } var rfCacheKnobs *rangefeedcache.TestingKnobs if knobs != nil { @@ -183,9 +222,9 @@ func New( rfCacheKnobs, ) s.mu.internal = spanConfigStore - s.mu.metrics = makeKVSubscriberMetrics() + s.metrics = makeKVSubscriberMetrics() if registry != nil { - registry.AddMetricStruct(&s.mu.metrics) + registry.AddMetricStruct(s.metrics) } return s } @@ -206,9 +245,74 @@ func New( // the exported StoreReader will be up-to-date and continue to be // incrementally maintained. func (s *KVSubscriber) Start(ctx context.Context, stopper *stop.Stopper) error { + if err := stopper.RunAsyncTask(ctx, "kvsubscriber-metrics", + func(ctx context.Context) { + settingChangeCh := make(chan struct{}, 1) + metricsPollerInterval.SetOnChange( + &s.settings.SV, func(ctx context.Context) { + select { + case settingChangeCh <- struct{}{}: + default: + } + }) + + timer := timeutil.NewTimer() + defer timer.Stop() + + for { + interval := metricsPollerInterval.Get(&s.settings.SV) + if interval > 0 { + timer.Reset(interval) + } else { + // Disable the mechanism. + timer.Stop() + timer = timeutil.NewTimer() + } + select { + case <-timer.C: + timer.Read = true + s.updateMetrics(ctx) + continue + + case <-settingChangeCh: + // Loop around to use the updated timer.
+ continue + + case <-stopper.ShouldQuiesce(): + return + } + } + }); err != nil { + return err + } + return rangefeedcache.Start(ctx, stopper, s.rfc, nil /* onError */) } +func (s *KVSubscriber) updateMetrics(ctx context.Context) { + protectedTimestamps, lastUpdated, err := s.GetProtectionTimestamps(ctx, keys.EverythingSpan) + if err != nil { + log.Errorf(ctx, "while refreshing kvsubscriber metrics: %v", err) + return + } + + earliestTS := hlc.Timestamp{} + for _, protectedTimestamp := range protectedTimestamps { + if earliestTS.IsEmpty() || protectedTimestamp.Less(earliestTS) { + earliestTS = protectedTimestamp + } + } + + now := s.clock.PhysicalTime() + s.metrics.ProtectedRecordCount.Update(int64(len(protectedTimestamps))) + s.metrics.UpdateBehindNanos.Update(now.Sub(lastUpdated.GoTime()).Nanoseconds()) + if earliestTS.IsEmpty() { + s.metrics.OldestProtectedRecordNanos.Update(0) + } else { + s.metrics.OldestProtectedRecordNanos.Update(now.Sub(earliestTS.GoTime()).Nanoseconds()) + } +} + // Subscribe installs a callback that's invoked with whatever span may have seen // a config update. func (s *KVSubscriber) Subscribe(fn func(context.Context, roachpb.Span)) { @@ -307,9 +411,9 @@ func (s *KVSubscriber) handleCompleteUpdate( } func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) { - nanos := timeutil.Since(ts.GoTime()).Nanoseconds() - s.mu.metrics.UpdateBehindNanos.Update(nanos) s.mu.lastUpdated = ts + nanos := timeutil.Since(s.mu.lastUpdated.GoTime()).Nanoseconds() + s.metrics.UpdateBehindNanos.Update(nanos) } func (s *KVSubscriber) handlePartialUpdate( diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go index 0c9d4c0b7fa4..8bc1f7842b5f 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go @@ -30,6 +30,11 @@ func (s *KVSubscriber) TestingRunInner(ctx context.Context) error { return s.rfc.Run(ctx) } +// TestingUpdateMetrics exports the inner updateMetrics method for testing purposes. +func (s *KVSubscriber) TestingUpdateMetrics(ctx context.Context) { + s.updateMetrics(ctx) +} + func TestGetProtectionTimestamps(t *testing.T) { defer leaktest.AfterTest(t)() @@ -75,8 +80,12 @@ func TestGetProtectionTimestamps(t *testing.T) { // Mark sp43 as excluded from backup. sp43Cfg.cfg.ExcludeDataFromBackup = true + const timeDeltaFromTS1 = 10 + mt := timeutil.NewManualTime(ts1.GoTime()) + mt.AdvanceTo(ts1.Add(timeDeltaFromTS1, 0).GoTime()) + subscriber := New( - nil, /* clock */ + hlc.NewClockForTesting(mt), nil, /* rangeFeedFactory */ keys.SpanConfigurationsTableID, 1<<20, /* 1 MB */ @@ -132,6 +141,15 @@ func TestGetProtectionTimestamps(t *testing.T) { testCase.test(t, m, subscriber) }) } + + // Test internal metrics. We should expect a protected record count of 3, + // ignoring the one from ts3 since both + // {IgnoreIfExcludedFromBackup,ExcludeDataFromBackup} are true. We should + // also observe the right delta between the oldest protected timestamp and + // current wall clock time.
+ subscriber.TestingUpdateMetrics(ctx) + require.Equal(t, int64(3), subscriber.metrics.ProtectedRecordCount.Value()) + require.Equal(t, int64(timeDeltaFromTS1), subscriber.metrics.OldestProtectedRecordNanos.Value()) } var _ spanconfig.Store = &manualStore{} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index ff417ba42192..f3390dfc5e7b 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -817,8 +817,17 @@ var charts = []sectionDescription{ }, Charts: []chartDescription{ { - Title: "KVSubscriber", - Metrics: []string{"spanconfig.kvsubscriber.update_behind_nanos"}, + Title: "KVSubscriber Lag Metrics", + Metrics: []string{ + "spanconfig.kvsubscriber.oldest_protected_record_nanos", + "spanconfig.kvsubscriber.update_behind_nanos", + }, + }, + { + Title: "KVSubscriber Protected Record Count", + Metrics: []string{ + "spanconfig.kvsubscriber.protected_record_count", + }, }, }, }, diff --git a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx index dcc31c82729b..598454cb86a8 100644 --- a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx +++ b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx @@ -14,10 +14,10 @@ import { LineGraph } from "src/views/cluster/components/linegraph"; import { Metric, Axis } from "src/views/shared/components/metricQuery"; import { AxisUnits } from "@cockroachlabs/cluster-ui"; -import { GraphDashboardProps } from "./dashboardUtils"; +import { GraphDashboardProps, nodeDisplayName } from "./dashboardUtils"; export default function (props: GraphDashboardProps) { - const { storeSources } = props; + const { nodeIDs, nodeSources, storeSources, nodeDisplayNameByID } = props; return [ @@ -212,21 +212,6 @@ export default function (props: GraphDashboardProps) { , - - - - - - , - , + + + + + + + , + + + + {nodeIDs.map(nid => ( + <> + + + ))} + + , ]; }
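Reviewer note: the Start hunk above wires the new gauges to a background poller whose cadence comes from a cluster setting and which is disabled when the interval is set to 0. Below is a minimal, self-contained sketch of that re-armable poll-loop pattern in plain Go, standard library only; pollLoop, intervalCh, and fn are hypothetical names for illustration and are not the CockroachDB APIs (timeutil.Timer, settings.RegisterDurationSetting) used in the actual change.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop runs fn on a fixed interval. The interval can be changed at
// runtime by sending a new value on intervalCh; sending 0 disables polling
// until a non-zero interval arrives. The loop exits when ctx is cancelled.
func pollLoop(ctx context.Context, interval time.Duration, intervalCh <-chan time.Duration, fn func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		// Re-arm the timer. Stop and drain first so Reset is safe even on
		// Go versions where resetting an active or expired timer is racy.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		if interval > 0 {
			timer.Reset(interval)
		}

		select {
		case <-timer.C:
			fn()
		case interval = <-intervalCh:
			// Loop around; the timer is re-armed with the new interval above.
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	intervalCh := make(chan time.Duration, 1)
	go pollLoop(ctx, 500*time.Millisecond, intervalCh, func() {
		fmt.Println("refreshing metrics at", time.Now().Format(time.RFC3339Nano))
	})

	// After a second, slow the poller down at runtime.
	time.Sleep(1 * time.Second)
	intervalCh <- time.Second

	<-ctx.Done()
}
```

The buffered, size-1 notification channel plays the same role as settingChangeCh in the patch: changes are coalesced, so the loop only needs to wake up once no matter how many setting updates arrive while it sleeps.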