Skip to content

Commit

Permalink
spanconfig: export metrics for protected timestamp records
Browse files Browse the repository at this point in the history
This commit introduces two new metrics, to help understand the effects
of protected timestamps:
- spanconfig.kvsubscriber.protected_record_count, which exports the
  number of protected timestamp records as seen by KV.
- spanconfig.kvsubscriber.oldest_protected_record_nanos, which exports
  difference between the current time and the oldest protected
  timestamp. Sudden drops indicate a record being released; an
  ever-increasing duration would indicate the oldest record sticking
  around and preventing GC if > the configured GC TTL.

Fixes cockroachdb#98532 (as a backportable alternative to
a0d6c19 for 22.1, 22.2).

Release note: None
  • Loading branch information
irfansharif committed Mar 16, 2023
1 parent 8699c74 commit 480c6c1
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 47 deletions.
2 changes: 1 addition & 1 deletion monitoring/grafana-dashboards/queues.json
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "GC Queue",
"title": "MVCC GC Queue",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
Expand Down
156 changes: 130 additions & 26 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ package spanconfigkvsubscriber

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -30,12 +33,38 @@ import (

var updateBehindNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.update_behind_nanos",
Help: "Latency between realtime and the last update received by the KVSubscriber; " +
"represents the staleness of the KVSubscriber, where a flat line means there are no updates being received",
Help: "Difference between the current time and when the KVSubscriber received its last update" +
" (an ever increasing number indicates that we're no longer receiving updates)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

var protectedRecordCount = metric.Metadata{
Name: "spanconfig.kvsubscriber.protected_record_count",
Help: "Number of protected timestamp records, as seen by KV",
Measurement: "Records",
Unit: metric.Unit_COUNT,
}

var oldestProtectedRecordNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.oldest_protected_record_nanos",
Help: "Difference between the current time and the oldest protected timestamp" +
" (sudden drops indicate a record being released; an ever increasing" +
" number indicates that the oldest record is around and preventing GC if > configured GC TTL)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// metricsPollerInterval determines the frequency at which we refresh internal
// metrics.
var metricsPollerInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"spanconfig.kvsubscriber.metrics_poller_interval",
"the interval at which the spanconfig.kvsubscriber.* metrics are kept up-to-date; set to 0 to disable the mechanism",
5*time.Second,
settings.NonNegativeDuration,
)

// KVSubscriber is used to subscribe to global span configuration changes. It's
// a concrete implementation of the spanconfig.KVSubscriber interface.
//
Expand Down Expand Up @@ -73,24 +102,21 @@ var updateBehindNanos = metric.Metadata{
// we could diff the two data structures and only emit targeted updates.
//
// [1]: For a given key k, it's config may be stored as part of a larger span S
//
// (where S.start <= k < S.end). It's possible for S to get deleted and
// replaced with sub-spans S1...SN in the same transaction if the span is
// getting split. When applying these updates, we need to make sure to
// process the deletion event for S before processing S1...SN.
// (where S.start <= k < S.end). It's possible for S to get deleted and
// replaced with sub-spans S1...SN in the same transaction if the span is
// getting split. When applying these updates, we need to make sure to
// process the deletion event for S before processing S1...SN.
//
// [2]: In our example above deleting the config for S and adding configs for
//
// S1...SN we want to make sure that we apply the full set of updates all
// at once -- lest we expose the intermediate state where the config for S
// was deleted but the configs for S1...SN were not yet applied.
// S1...SN we want to make sure that we apply the full set of updates all
// at once -- lest we expose the intermediate state where the config for S
// was deleted but the configs for S1...SN were not yet applied.
//
// [3]: TODO(irfansharif): When tearing down the subscriber due to underlying
//
// errors, we could also capture a checkpoint to use the next time the
// subscriber is established. That way we can avoid the full initial scan
// over the span configuration state and simply pick up where we left off
// with our existing spanconfig.Store.
// errors, we could also capture a checkpoint to use the next time the
// subscriber is established. That way we can avoid the full initial scan
// over the span configuration state and simply pick up where we left off
// with our existing spanconfig.Store.
type KVSubscriber struct {
fallback roachpb.SpanConfig
knobs *spanconfig.TestingKnobs
Expand All @@ -101,7 +127,6 @@ type KVSubscriber struct {
mu struct { // serializes between Start and external threads
syncutil.RWMutex
lastUpdated hlc.Timestamp
metrics Metrics
// internal is the internal spanconfig.Store maintained by the
// KVSubscriber. A read-only view over this store is exposed as part of
// the interface. When re-subscribing, a fresh spanconfig.Store is
Expand All @@ -111,21 +136,34 @@ type KVSubscriber struct {
internal spanconfig.Store
handlers []handler
}

clock *hlc.Clock
metrics *Metrics
}

var _ spanconfig.KVSubscriber = &KVSubscriber{}

// Metrics are the Metrics associated with an instance of the
// KVSubscriber.
type Metrics struct {
// UpdateBehindNanos is the latency between realtime and the last update
// received by the KVSubscriber. This metric should be interpreted as a
// measure of the KVSubscribers' staleness.
// UpdateBehindNanos is the difference between the current time and when the
// last update was received by the KVSubscriber. This metric should be
// interpreted as a measure of the KVSubscribers' staleness.
UpdateBehindNanos *metric.Gauge
// ProtectedRecordCount is total number of protected timestamp records, as
// seen by KV.
ProtectedRecordCount *metric.Gauge
// OldestProtectedRecord is between the current time and the oldest
// protected timestamp.
OldestProtectedRecordNanos *metric.Gauge
}

func makeKVSubscriberMetrics() Metrics {
return Metrics{UpdateBehindNanos: metric.NewGauge(updateBehindNanos)}
func makeKVSubscriberMetrics() *Metrics {
return &Metrics{
UpdateBehindNanos: metric.NewGauge(updateBehindNanos),
ProtectedRecordCount: metric.NewGauge(protectedRecordCount),
OldestProtectedRecordNanos: metric.NewGauge(oldestProtectedRecordNanos),
}
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -167,6 +205,7 @@ func New(
fallback: fallback,
knobs: knobs,
settings: settings,
clock: clock,
}
var rfCacheKnobs *rangefeedcache.TestingKnobs
if knobs != nil {
Expand All @@ -183,9 +222,9 @@ func New(
rfCacheKnobs,
)
s.mu.internal = spanConfigStore
s.mu.metrics = makeKVSubscriberMetrics()
s.metrics = makeKVSubscriberMetrics()
if registry != nil {
registry.AddMetricStruct(&s.mu.metrics)
registry.AddMetricStruct(s.metrics)
}
return s
}
Expand All @@ -206,9 +245,74 @@ func New(
// the exported StoreReader will be up-to-date and continue to be
// incrementally maintained.
func (s *KVSubscriber) Start(ctx context.Context, stopper *stop.Stopper) error {
if err := stopper.RunAsyncTask(ctx, "kvsubscriber-metrics",
func(ctx context.Context) {
settingChangeCh := make(chan struct{}, 1)
metricsPollerInterval.SetOnChange(
&s.settings.SV, func(ctx context.Context) {
select {
case settingChangeCh <- struct{}{}:
default:
}
})

timer := timeutil.NewTimer()
defer timer.Stop()

for {
interval := metricsPollerInterval.Get(&s.settings.SV)
if interval > 0 {
timer.Reset(interval)
} else {
// Disable the mechanism.
timer.Stop()
timer = timeutil.NewTimer()
}
select {
case <-timer.C:
timer.Read = true
s.updateMetrics(ctx)
continue

case <-settingChangeCh:
// Loop around to use the updated timer.
continue

case <-stopper.ShouldQuiesce():
return
}
}
}); err != nil {
return err
}

return rangefeedcache.Start(ctx, stopper, s.rfc, nil /* onError */)
}

func (s *KVSubscriber) updateMetrics(ctx context.Context) {
protectedTimestamps, lastUpdated, err := s.GetProtectionTimestamps(ctx, keys.EverythingSpan)
if err != nil {
log.Errorf(ctx, "while refreshing kvsubscriber metrics: %v", err)
return
}

earliestTS := hlc.Timestamp{}
for _, protectedTimestamp := range protectedTimestamps {
if earliestTS.IsEmpty() || protectedTimestamp.Less(earliestTS) {
earliestTS = protectedTimestamp
}
}

now := s.clock.PhysicalTime()
s.metrics.ProtectedRecordCount.Update(int64(len(protectedTimestamps)))
s.metrics.UpdateBehindNanos.Update(now.Sub(lastUpdated.GoTime()).Nanoseconds())
if earliestTS.IsEmpty() {
s.metrics.OldestProtectedRecordNanos.Update(0)
} else {
s.metrics.OldestProtectedRecordNanos.Update(now.Sub(earliestTS.GoTime()).Nanoseconds())
}
}

// Subscribe installs a callback that's invoked with whatever span may have seen
// a config update.
func (s *KVSubscriber) Subscribe(fn func(context.Context, roachpb.Span)) {
Expand Down Expand Up @@ -307,9 +411,9 @@ func (s *KVSubscriber) handleCompleteUpdate(
}

func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) {
nanos := timeutil.Since(ts.GoTime()).Nanoseconds()
s.mu.metrics.UpdateBehindNanos.Update(nanos)
s.mu.lastUpdated = ts
nanos := timeutil.Since(s.mu.lastUpdated.GoTime()).Nanoseconds()
s.metrics.UpdateBehindNanos.Update(nanos)
}

func (s *KVSubscriber) handlePartialUpdate(
Expand Down
21 changes: 20 additions & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -30,6 +31,11 @@ func (s *KVSubscriber) TestingRunInner(ctx context.Context) error {
return s.rfc.Run(ctx)
}

// TestingUpdateMetrics exports the inner updateMetrics method for testing purposes.
func (s *KVSubscriber) TestingUpdateMetrics(ctx context.Context) {
s.updateMetrics(ctx)
}

func TestGetProtectionTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -75,8 +81,12 @@ func TestGetProtectionTimestamps(t *testing.T) {
// Mark sp43 as excluded from backup.
sp43Cfg.cfg.ExcludeDataFromBackup = true

const timeDeltaFromTS1 = 10
mt := timeutil.NewManualTime(ts1.GoTime())
mt.AdvanceTo(ts1.Add(timeDeltaFromTS1, 0).GoTime())

subscriber := New(
nil, /* clock */
hlc.NewClock(mt, time.Nanosecond),
nil, /* rangeFeedFactory */
keys.SpanConfigurationsTableID,
1<<20, /* 1 MB */
Expand Down Expand Up @@ -132,6 +142,15 @@ func TestGetProtectionTimestamps(t *testing.T) {
testCase.test(t, m, subscriber)
})
}

// Test internal metrics. We should expect a protected record count of 3,
// ignoring the one from ts3 since it has both
// {IgnoreIfExcludedFromBackup,ExcludeDataFromBackup} are true. We should
// also observe the right delta between the oldest protected timestamp and
// current wall clock time.
subscriber.TestingUpdateMetrics(ctx)
require.Equal(t, int64(3), subscriber.metrics.ProtectedRecordCount.Value())
require.Equal(t, int64(timeDeltaFromTS1), subscriber.metrics.OldestProtectedRecordNanos.Value())
}

var _ spanconfig.Store = &manualStore{}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,17 @@ var charts = []sectionDescription{
},
Charts: []chartDescription{
{
Title: "KVSubscriber",
Metrics: []string{"spanconfig.kvsubscriber.update_behind_nanos"},
Title: "KVSubscriber Lag Metrics",
Metrics: []string{
"spanconfig.kvsubscriber.oldest_protected_record_nanos",
"spanconfig.kvsubscriber.update_behind_nanos",
},
},
{
Title: "KVSubscriber Protected Record Count",
Metrics: []string{
"spanconfig.kvsubscriber.protected_record_count",
},
},
},
},
Expand Down
Loading

0 comments on commit 480c6c1

Please sign in to comment.