Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: spanconfig: export metrics for protected timestamp records #98797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion monitoring/grafana-dashboards/queues.json
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "GC Queue",
"title": "MVCC GC Queue",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
Expand Down
131 changes: 119 additions & 12 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ package spanconfigkvsubscriber

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -30,12 +33,38 @@ import (

var updateBehindNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.update_behind_nanos",
Help: "Latency between realtime and the last update received by the KVSubscriber; " +
"represents the staleness of the KVSubscriber, where a flat line means there are no updates being received",
Help: "Difference between the current time and when the KVSubscriber received its last update" +
" (an ever increasing number indicates that we're no longer receiving updates)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

var protectedRecordCount = metric.Metadata{
Name: "spanconfig.kvsubscriber.protected_record_count",
Help: "Number of protected timestamp records, as seen by KV",
Measurement: "Records",
Unit: metric.Unit_COUNT,
}

var oldestProtectedRecordNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.oldest_protected_record_nanos",
Help: "Difference between the current time and the oldest protected timestamp" +
" (sudden drops indicate a record being released; an ever increasing" +
" number indicates that the oldest record is around and preventing GC if > configured GC TTL)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// metricsPollerInterval determines the frequency at which we refresh internal
// metrics.
var metricsPollerInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"spanconfig.kvsubscriber.metrics_poller_interval",
"the interval at which the spanconfig.kvsubscriber.* metrics are kept up-to-date; set to 0 to disable the mechanism",
5*time.Second,
settings.NonNegativeDuration,
)

// KVSubscriber is used to subscribe to global span configuration changes. It's
// a concrete implementation of the spanconfig.KVSubscriber interface.
//
Expand Down Expand Up @@ -96,7 +125,6 @@ type KVSubscriber struct {
mu struct { // serializes between Start and external threads
syncutil.RWMutex
lastUpdated hlc.Timestamp
metrics Metrics
// internal is the internal spanconfig.Store maintained by the
// KVSubscriber. A read-only view over this store is exposed as part of
// the interface. When re-subscribing, a fresh spanconfig.Store is
Expand All @@ -106,21 +134,34 @@ type KVSubscriber struct {
internal spanconfig.Store
handlers []handler
}

clock *hlc.Clock
metrics *Metrics
}

var _ spanconfig.KVSubscriber = &KVSubscriber{}

// Metrics are the Metrics associated with an instance of the
// KVSubscriber.
type Metrics struct {
// UpdateBehindNanos is the latency between realtime and the last update
// received by the KVSubscriber. This metric should be interpreted as a
// measure of the KVSubscribers' staleness.
// UpdateBehindNanos is the difference between the current time and when the
// last update was received by the KVSubscriber. This metric should be
// interpreted as a measure of the KVSubscribers' staleness.
UpdateBehindNanos *metric.Gauge
// ProtectedRecordCount is total number of protected timestamp records, as
// seen by KV.
ProtectedRecordCount *metric.Gauge
// OldestProtectedRecord is between the current time and the oldest
// protected timestamp.
OldestProtectedRecordNanos *metric.Gauge
}

func makeKVSubscriberMetrics() Metrics {
return Metrics{UpdateBehindNanos: metric.NewGauge(updateBehindNanos)}
func makeKVSubscriberMetrics() *Metrics {
return &Metrics{
UpdateBehindNanos: metric.NewGauge(updateBehindNanos),
ProtectedRecordCount: metric.NewGauge(protectedRecordCount),
OldestProtectedRecordNanos: metric.NewGauge(oldestProtectedRecordNanos),
}
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -162,6 +203,7 @@ func New(
fallback: fallback,
knobs: knobs,
settings: settings,
clock: clock,
}
var rfCacheKnobs *rangefeedcache.TestingKnobs
if knobs != nil {
Expand All @@ -178,9 +220,9 @@ func New(
rfCacheKnobs,
)
s.mu.internal = spanConfigStore
s.mu.metrics = makeKVSubscriberMetrics()
s.metrics = makeKVSubscriberMetrics()
if registry != nil {
registry.AddMetricStruct(&s.mu.metrics)
registry.AddMetricStruct(s.metrics)
}
return s
}
Expand All @@ -200,9 +242,74 @@ func New(
// the exported StoreReader will be up-to-date and continue to be
// incrementally maintained.
func (s *KVSubscriber) Start(ctx context.Context, stopper *stop.Stopper) error {
if err := stopper.RunAsyncTask(ctx, "kvsubscriber-metrics",
func(ctx context.Context) {
settingChangeCh := make(chan struct{}, 1)
metricsPollerInterval.SetOnChange(
&s.settings.SV, func(ctx context.Context) {
select {
case settingChangeCh <- struct{}{}:
default:
}
})

timer := timeutil.NewTimer()
defer timer.Stop()

for {
interval := metricsPollerInterval.Get(&s.settings.SV)
if interval > 0 {
timer.Reset(interval)
} else {
// Disable the mechanism.
timer.Stop()
timer = timeutil.NewTimer()
}
select {
case <-timer.C:
timer.Read = true
s.updateMetrics(ctx)
continue

case <-settingChangeCh:
// Loop around to use the updated timer.
continue

case <-stopper.ShouldQuiesce():
return
}
}
}); err != nil {
return err
}

return rangefeedcache.Start(ctx, stopper, s.rfc, nil /* onError */)
}

func (s *KVSubscriber) updateMetrics(ctx context.Context) {
protectedTimestamps, lastUpdated, err := s.GetProtectionTimestamps(ctx, keys.EverythingSpan)
if err != nil {
log.Errorf(ctx, "while refreshing kvsubscriber metrics: %v", err)
return
}

earliestTS := hlc.Timestamp{}
for _, protectedTimestamp := range protectedTimestamps {
if earliestTS.IsEmpty() || protectedTimestamp.Less(earliestTS) {
earliestTS = protectedTimestamp
}
}

now := s.clock.PhysicalTime()
s.metrics.ProtectedRecordCount.Update(int64(len(protectedTimestamps)))
s.metrics.UpdateBehindNanos.Update(now.Sub(lastUpdated.GoTime()).Nanoseconds())
if earliestTS.IsEmpty() {
s.metrics.OldestProtectedRecordNanos.Update(0)
} else {
s.metrics.OldestProtectedRecordNanos.Update(now.Sub(earliestTS.GoTime()).Nanoseconds())
}
}

// Subscribe installs a callback that's invoked with whatever span may have seen
// a config update.
func (s *KVSubscriber) Subscribe(fn func(context.Context, roachpb.Span)) {
Expand Down Expand Up @@ -301,9 +408,9 @@ func (s *KVSubscriber) handleCompleteUpdate(
}

func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) {
nanos := timeutil.Since(ts.GoTime()).Nanoseconds()
s.mu.metrics.UpdateBehindNanos.Update(nanos)
s.mu.lastUpdated = ts
nanos := timeutil.Since(s.mu.lastUpdated.GoTime()).Nanoseconds()
s.metrics.UpdateBehindNanos.Update(nanos)
}

func (s *KVSubscriber) handlePartialUpdate(
Expand Down
19 changes: 18 additions & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -30,6 +31,11 @@ func (s *KVSubscriber) TestingRunInner(ctx context.Context) error {
return s.rfc.Run(ctx)
}

// TestingUpdateMetrics exports the inner updateMetrics method for testing purposes.
func (s *KVSubscriber) TestingUpdateMetrics(ctx context.Context) {
s.updateMetrics(ctx)
}

func TestGetProtectionTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -75,8 +81,10 @@ func TestGetProtectionTimestamps(t *testing.T) {
// Mark sp43 as excluded from backup.
sp43Cfg.cfg.ExcludeDataFromBackup = true

const timeDeltaFromTS1 = 10
mc := hlc.NewManualClock(ts1.WallTime + timeDeltaFromTS1)
subscriber := New(
nil, /* clock */
hlc.NewClock(mc.UnixNano, time.Nanosecond),
nil, /* rangeFeedFactory */
keys.SpanConfigurationsTableID,
1<<20, /* 1 MB */
Expand Down Expand Up @@ -132,6 +140,15 @@ func TestGetProtectionTimestamps(t *testing.T) {
testCase.test(t, m, subscriber)
})
}

// Test internal metrics. We should expect a protected record count of 3,
// ignoring the one from ts3 since it has both
// {IgnoreIfExcludedFromBackup,ExcludeDataFromBackup} are true. We should
// also observe the right delta between the oldest protected timestamp and
// current wall clock time.
subscriber.TestingUpdateMetrics(ctx)
require.Equal(t, int64(3), subscriber.metrics.ProtectedRecordCount.Value())
require.Equal(t, int64(timeDeltaFromTS1), subscriber.metrics.OldestProtectedRecordNanos.Value())
}

var _ spanconfig.Store = &manualStore{}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,17 @@ var charts = []sectionDescription{
},
Charts: []chartDescription{
{
Title: "KVSubscriber",
Metrics: []string{"spanconfig.kvsubscriber.update_behind_nanos"},
Title: "KVSubscriber Lag Metrics",
Metrics: []string{
"spanconfig.kvsubscriber.oldest_protected_record_nanos",
"spanconfig.kvsubscriber.update_behind_nanos",
},
},
{
Title: "KVSubscriber Protected Record Count",
Metrics: []string{
"spanconfig.kvsubscriber.protected_record_count",
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { LineGraph } from "src/views/cluster/components/linegraph";
import { Metric, Axis } from "src/views/shared/components/metricQuery";
import { AxisUnits } from "@cockroachlabs/cluster-ui";

import { GraphDashboardProps } from "./dashboardUtils";
import { GraphDashboardProps, nodeDisplayName } from "./dashboardUtils";

export default function(props: GraphDashboardProps) {
const { storeSources } = props;
const { nodeIDs, nodeSources, storeSources, nodeDisplayNameByID } = props;

return [
<LineGraph title="Queue Processing Failures" sources={storeSources}>
Expand Down Expand Up @@ -212,21 +212,6 @@ export default function(props: GraphDashboardProps) {
</Axis>
</LineGraph>,

<LineGraph title="GC Queue" sources={storeSources}>
<Axis units={AxisUnits.Count} label="actions">
<Metric
name="cr.store.queue.gc.process.success"
title="Successful Actions / sec"
nonNegativeRate
/>
<Metric
name="cr.store.queue.gc.pending"
title="Pending Actions"
downsampleMax
/>
</Axis>
</LineGraph>,

<LineGraph title="Raft Log Queue" sources={storeSources}>
<Axis units={AxisUnits.Count} label="actions">
<Metric
Expand Down Expand Up @@ -286,5 +271,40 @@ export default function(props: GraphDashboardProps) {
/>
</Axis>
</LineGraph>,

<LineGraph title="MVCC GC Queue" sources={storeSources}>
<Axis units={AxisUnits.Count} label="actions">
<Metric
name="cr.store.queue.gc.process.success"
title="Successful Actions / sec"
nonNegativeRate
/>
<Metric
name="cr.store.queue.gc.pending"
title="Pending Actions"
downsampleMax
/>
</Axis>
</LineGraph>,

<LineGraph
title="Protected Timestamp Records"
sources={nodeSources}
tooltip={`Number of protected timestamp records (used by backups, changefeeds, etc. to prevent MVCC GC)`}
>
<Axis units={AxisUnits.Count} label="Records">
{nodeIDs.map(nid => (
<>
<Metric
key={nid}
name="cr.node.spanconfig.kvsubscriber.protected_record_count"
title={nodeDisplayName(nodeDisplayNameByID, nid)}
sources={[nid]}
downsampleMax
/>
</>
))}
</Axis>
</LineGraph>,
];
}