Skip to content

Commit

Permalink
Merge pull request #98796 from irfansharif/backport22.2-98540
Browse files Browse the repository at this point in the history
release-22.2: spanconfig: export metrics for protected timestamp records
  • Loading branch information
irfansharif authored Mar 21, 2023
2 parents 8938010 + 480c6c1 commit fd258fa
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 47 deletions.
2 changes: 1 addition & 1 deletion monitoring/grafana-dashboards/queues.json
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "GC Queue",
"title": "MVCC GC Queue",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
Expand Down
156 changes: 130 additions & 26 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ package spanconfigkvsubscriber

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -30,12 +33,38 @@ import (

// updateBehindNanos is the metadata for the gauge reporting how long ago the
// KVSubscriber received its last update, in nanoseconds.
var updateBehindNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.update_behind_nanos",
Help: "Latency between realtime and the last update received by the KVSubscriber; " +
"represents the staleness of the KVSubscriber, where a flat line means there are no updates being received",
Help: "Difference between the current time and when the KVSubscriber received its last update" +
" (an ever increasing number indicates that we're no longer receiving updates)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// protectedRecordCount is the metadata for the gauge reporting the number of
// protected timestamp records visible to the KVSubscriber.
var protectedRecordCount = metric.Metadata{
Name: "spanconfig.kvsubscriber.protected_record_count",
Help: "Number of protected timestamp records, as seen by KV",
Measurement: "Records",
Unit: metric.Unit_COUNT,
}

// oldestProtectedRecordNanos is the metadata for the gauge reporting the age,
// in nanoseconds, of the oldest protected timestamp relative to the current
// time.
var oldestProtectedRecordNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.oldest_protected_record_nanos",
Help: "Difference between the current time and the oldest protected timestamp" +
" (sudden drops indicate a record being released; an ever increasing" +
" number indicates that the oldest record is around and preventing GC if > configured GC TTL)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// metricsPollerInterval determines the frequency at which we refresh internal
// metrics (the spanconfig.kvsubscriber.* gauges). It defaults to 5 seconds,
// must be non-negative, and a value of 0 disables the periodic refresh
// altogether.
var metricsPollerInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"spanconfig.kvsubscriber.metrics_poller_interval",
"the interval at which the spanconfig.kvsubscriber.* metrics are kept up-to-date; set to 0 to disable the mechanism",
5*time.Second,
settings.NonNegativeDuration,
)

// KVSubscriber is used to subscribe to global span configuration changes. It's
// a concrete implementation of the spanconfig.KVSubscriber interface.
//
Expand Down Expand Up @@ -73,24 +102,21 @@ var updateBehindNanos = metric.Metadata{
// we could diff the two data structures and only emit targeted updates.
//
// [1]: For a given key k, its config may be stored as part of a larger span S
//
// (where S.start <= k < S.end). It's possible for S to get deleted and
// replaced with sub-spans S1...SN in the same transaction if the span is
// getting split. When applying these updates, we need to make sure to
// process the deletion event for S before processing S1...SN.
// (where S.start <= k < S.end). It's possible for S to get deleted and
// replaced with sub-spans S1...SN in the same transaction if the span is
// getting split. When applying these updates, we need to make sure to
// process the deletion event for S before processing S1...SN.
//
// [2]: In our example above deleting the config for S and adding configs for
//
// S1...SN we want to make sure that we apply the full set of updates all
// at once -- lest we expose the intermediate state where the config for S
// was deleted but the configs for S1...SN were not yet applied.
// S1...SN we want to make sure that we apply the full set of updates all
// at once -- lest we expose the intermediate state where the config for S
// was deleted but the configs for S1...SN were not yet applied.
//
// [3]: TODO(irfansharif): When tearing down the subscriber due to underlying
//
// errors, we could also capture a checkpoint to use the next time the
// subscriber is established. That way we can avoid the full initial scan
// over the span configuration state and simply pick up where we left off
// with our existing spanconfig.Store.
// errors, we could also capture a checkpoint to use the next time the
// subscriber is established. That way we can avoid the full initial scan
// over the span configuration state and simply pick up where we left off
// with our existing spanconfig.Store.
type KVSubscriber struct {
fallback roachpb.SpanConfig
knobs *spanconfig.TestingKnobs
Expand All @@ -101,7 +127,6 @@ type KVSubscriber struct {
mu struct { // serializes between Start and external threads
syncutil.RWMutex
lastUpdated hlc.Timestamp
metrics Metrics
// internal is the internal spanconfig.Store maintained by the
// KVSubscriber. A read-only view over this store is exposed as part of
// the interface. When re-subscribing, a fresh spanconfig.Store is
Expand All @@ -111,21 +136,34 @@ type KVSubscriber struct {
internal spanconfig.Store
handlers []handler
}

clock *hlc.Clock
metrics *Metrics
}

var _ spanconfig.KVSubscriber = &KVSubscriber{}

// Metrics are the Metrics associated with an instance of the
// KVSubscriber.
type Metrics struct {
// UpdateBehindNanos is the latency between realtime and the last update
// received by the KVSubscriber. This metric should be interpreted as a
// measure of the KVSubscribers' staleness.
// UpdateBehindNanos is the difference between the current time and when the
// last update was received by the KVSubscriber. This metric should be
// interpreted as a measure of the KVSubscriber's staleness.
UpdateBehindNanos *metric.Gauge
// ProtectedRecordCount is the total number of protected timestamp records,
// as seen by KV.
ProtectedRecordCount *metric.Gauge
// OldestProtectedRecordNanos is the difference between the current time and
// the oldest protected timestamp, in nanoseconds. It is reported as 0 when
// there are no protected timestamps.
OldestProtectedRecordNanos *metric.Gauge
}

func makeKVSubscriberMetrics() Metrics {
return Metrics{UpdateBehindNanos: metric.NewGauge(updateBehindNanos)}
// makeKVSubscriberMetrics allocates a Metrics struct and populates it with a
// fresh gauge for each of the KVSubscriber's exported metrics.
func makeKVSubscriberMetrics() *Metrics {
	m := new(Metrics)
	m.UpdateBehindNanos = metric.NewGauge(updateBehindNanos)
	m.ProtectedRecordCount = metric.NewGauge(protectedRecordCount)
	m.OldestProtectedRecordNanos = metric.NewGauge(oldestProtectedRecordNanos)
	return m
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -167,6 +205,7 @@ func New(
fallback: fallback,
knobs: knobs,
settings: settings,
clock: clock,
}
var rfCacheKnobs *rangefeedcache.TestingKnobs
if knobs != nil {
Expand All @@ -183,9 +222,9 @@ func New(
rfCacheKnobs,
)
s.mu.internal = spanConfigStore
s.mu.metrics = makeKVSubscriberMetrics()
s.metrics = makeKVSubscriberMetrics()
if registry != nil {
registry.AddMetricStruct(&s.mu.metrics)
registry.AddMetricStruct(s.metrics)
}
return s
}
Expand All @@ -206,9 +245,74 @@ func New(
// the exported StoreReader will be up-to-date and continue to be
// incrementally maintained.
func (s *KVSubscriber) Start(ctx context.Context, stopper *stop.Stopper) error {
	// Before starting the rangefeed cache, launch a background task that
	// periodically refreshes the spanconfig.kvsubscriber.* gauges. The task
	// exits once the stopper starts quiescing. If the task cannot be started,
	// the error is returned and the rangefeed cache is not started.
	if err := stopper.RunAsyncTask(ctx, "kvsubscriber-metrics",
		func(ctx context.Context) {
			// Buffered with capacity one so the on-change callback never
			// blocks; back-to-back setting changes coalesce into a single
			// wake-up of the loop below.
			settingChangeCh := make(chan struct{}, 1)
			metricsPollerInterval.SetOnChange(
				&s.settings.SV, func(ctx context.Context) {
					select {
					case settingChangeCh <- struct{}{}:
					default:
					}
				})

			timer := timeutil.NewTimer()
			// Stop whichever timer is current when the task exits. The timer
			// variable is reassigned below when polling is disabled, so it has
			// to be read at return time through the closure; a plain
			// `defer timer.Stop()` evaluates its receiver right here and would
			// only ever stop the initial timer.
			defer func() { timer.Stop() }()

			for {
				interval := metricsPollerInterval.Get(&s.settings.SV)
				if interval > 0 {
					timer.Reset(interval)
				} else {
					// Disable the mechanism: swap in a timer that is never
					// armed, so only a setting change or quiescence can wake
					// the select below.
					// NOTE(review): relies on an un-Reset timeutil.Timer never
					// firing -- confirm against timeutil.
					timer.Stop()
					timer = timeutil.NewTimer()
				}
				select {
				case <-timer.C:
					// NOTE(review): presumably tells the timer that its
					// channel has been drained before the next Reset --
					// confirm against timeutil.Timer.
					timer.Read = true
					s.updateMetrics(ctx)
					continue

				case <-settingChangeCh:
					// Loop around to pick up the updated interval.
					continue

				case <-stopper.ShouldQuiesce():
					return
				}
			}
		}); err != nil {
		return err
	}

	return rangefeedcache.Start(ctx, stopper, s.rfc, nil /* onError */)
}

// updateMetrics refreshes the KVSubscriber's gauges from its current view of
// the protected timestamps over keys.EverythingSpan: the number of protected
// timestamps, the time since the last update, and the age of the oldest
// protected timestamp (0 if there are none). If the protected timestamps
// cannot be retrieved, the error is logged and the gauges are left untouched.
func (s *KVSubscriber) updateMetrics(ctx context.Context) {
	records, lastUpdated, err := s.GetProtectionTimestamps(ctx, keys.EverythingSpan)
	if err != nil {
		log.Errorf(ctx, "while refreshing kvsubscriber metrics: %v", err)
		return
	}

	// Find the minimum protected timestamp; it stays empty if there are no
	// records.
	var oldest hlc.Timestamp
	for _, ts := range records {
		if oldest.IsEmpty() || ts.Less(oldest) {
			oldest = ts
		}
	}

	now := s.clock.PhysicalTime()
	behindNanos := now.Sub(lastUpdated.GoTime()).Nanoseconds()

	var oldestAgeNanos int64
	if !oldest.IsEmpty() {
		oldestAgeNanos = now.Sub(oldest.GoTime()).Nanoseconds()
	}

	s.metrics.ProtectedRecordCount.Update(int64(len(records)))
	s.metrics.UpdateBehindNanos.Update(behindNanos)
	s.metrics.OldestProtectedRecordNanos.Update(oldestAgeNanos)
}

// Subscribe installs a callback that's invoked with whatever span may have seen
// a config update.
func (s *KVSubscriber) Subscribe(fn func(context.Context, roachpb.Span)) {
Expand Down Expand Up @@ -307,9 +411,9 @@ func (s *KVSubscriber) handleCompleteUpdate(
}

// setLastUpdatedLocked records ts as the timestamp of the most recent update
// and refreshes the UpdateBehindNanos gauge with the time elapsed since it.
// NOTE(review): the "Locked" suffix suggests s.mu must be held by the caller
// -- confirm against call sites.
func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) {
nanos := timeutil.Since(ts.GoTime()).Nanoseconds()
s.mu.metrics.UpdateBehindNanos.Update(nanos)
s.mu.lastUpdated = ts
nanos := timeutil.Since(s.mu.lastUpdated.GoTime()).Nanoseconds()
s.metrics.UpdateBehindNanos.Update(nanos)
}

func (s *KVSubscriber) handlePartialUpdate(
Expand Down
21 changes: 20 additions & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -30,6 +31,11 @@ func (s *KVSubscriber) TestingRunInner(ctx context.Context) error {
return s.rfc.Run(ctx)
}

// TestingUpdateMetrics exposes the unexported updateMetrics method so that
// tests can trigger a metrics refresh on demand.
func (s *KVSubscriber) TestingUpdateMetrics(ctx context.Context) {
s.updateMetrics(ctx)
}

func TestGetProtectionTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -75,8 +81,12 @@ func TestGetProtectionTimestamps(t *testing.T) {
// Mark sp43 as excluded from backup.
sp43Cfg.cfg.ExcludeDataFromBackup = true

const timeDeltaFromTS1 = 10
mt := timeutil.NewManualTime(ts1.GoTime())
mt.AdvanceTo(ts1.Add(timeDeltaFromTS1, 0).GoTime())

subscriber := New(
nil, /* clock */
hlc.NewClock(mt, time.Nanosecond),
nil, /* rangeFeedFactory */
keys.SpanConfigurationsTableID,
1<<20, /* 1 MB */
Expand Down Expand Up @@ -132,6 +142,15 @@ func TestGetProtectionTimestamps(t *testing.T) {
testCase.test(t, m, subscriber)
})
}

// Test internal metrics. We should expect a protected record count of 3,
// ignoring the one from ts3 since both of
// {IgnoreIfExcludedFromBackup,ExcludeDataFromBackup} are true for it. We
// should also observe the right delta between the oldest protected timestamp
// and the current wall clock time.
subscriber.TestingUpdateMetrics(ctx)
require.Equal(t, int64(3), subscriber.metrics.ProtectedRecordCount.Value())
require.Equal(t, int64(timeDeltaFromTS1), subscriber.metrics.OldestProtectedRecordNanos.Value())
}

var _ spanconfig.Store = &manualStore{}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,17 @@ var charts = []sectionDescription{
},
Charts: []chartDescription{
{
Title: "KVSubscriber",
Metrics: []string{"spanconfig.kvsubscriber.update_behind_nanos"},
Title: "KVSubscriber Lag Metrics",
Metrics: []string{
"spanconfig.kvsubscriber.oldest_protected_record_nanos",
"spanconfig.kvsubscriber.update_behind_nanos",
},
},
{
Title: "KVSubscriber Protected Record Count",
Metrics: []string{
"spanconfig.kvsubscriber.protected_record_count",
},
},
},
},
Expand Down
Loading

0 comments on commit fd258fa

Please sign in to comment.