Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
97820: schemachanger: Enable ALTER PRIMARY KEY USING HASH r=Xiang-Gu a=Xiang-Gu

We enable ALTER PRIMARY KEY USING HASH in the declarative schema changer. This commit also cleans up some common logic so that it can be re-used with CREATE INDEX ... USING HASH.

Note that we still fall back to the legacy schema changer if the old primary key is on the implicit `rowid` column, because we don't yet support `ADD COLUMN, DROP COLUMN` in one statement (adding a shard column and dropping the `rowid` column).

Fixes: cockroachdb#96730
Epic: None

Release note: None

98540: spanconfig: export metrics for protected timestamp records r=irfansharif a=irfansharif

This commit introduces two new metrics, to help understand the effects of protected timestamps:
- `spanconfig.kvsubscriber.protected_record_count`, which exports the number of protected timestamp records as seen by KV.
- `spanconfig.kvsubscriber.oldest_protected_record_nanos`, which exports the difference between the current time and the oldest protected timestamp. Sudden drops indicate a record being released; an ever-increasing duration indicates that the oldest record is sticking around and preventing GC if it exceeds the configured GC TTL.

Fixes cockroachdb#98532 (as a backportable alternative to a0d6c19 for 22.1, 22.2).

Release note: None

Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
3 people committed Mar 14, 2023
3 parents 024da43 + 2c723f9 + 8be8d9d commit a79338a
Show file tree
Hide file tree
Showing 44 changed files with 7,240 additions and 463 deletions.
2 changes: 1 addition & 1 deletion monitoring/grafana-dashboards/queues.json
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "GC Queue",
"title": "MVCC GC Queue",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
2 changes: 1 addition & 1 deletion monitoring/splunk-dashboard/queues.xml
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ where index=$index_name$ span=10s</query>
</row>
<row>
<panel>
<title>GC Queue</title>
<title>MVCC GC Queue</title>
<chart>
<search>
<query>| mstats rate_sum(queue_gc_process_success) as "Successful Actions / sec",
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/schemachangerccl/backup_base_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/ccl/schemachangerccl/backup_base_mixed_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/cmd/roachprod/grafana/configs/queues.json
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "GC Queue",
"title": "MVCC GC Queue",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvpb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
Expand Down
156 changes: 130 additions & 26 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ package spanconfigkvsubscriber

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -30,12 +33,38 @@ import (

var updateBehindNanos = metric.Metadata{
Name: "spanconfig.kvsubscriber.update_behind_nanos",
Help: "Latency between realtime and the last update received by the KVSubscriber; " +
"represents the staleness of the KVSubscriber, where a flat line means there are no updates being received",
Help: "Difference between the current time and when the KVSubscriber received its last update" +
" (an ever increasing number indicates that we're no longer receiving updates)",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// Metadata for the protected-timestamp gauges exported under the
// spanconfig.kvsubscriber.* namespace.
var (
	// protectedRecordCount describes the gauge reporting how many protected
	// timestamp records are visible.
	protectedRecordCount = metric.Metadata{
		Name:        "spanconfig.kvsubscriber.protected_record_count",
		Help:        "Number of protected timestamp records, as seen by KV",
		Measurement: "Records",
		Unit:        metric.Unit_COUNT,
	}

	// oldestProtectedRecordNanos describes the gauge reporting the age, in
	// nanoseconds, of the oldest protected timestamp.
	oldestProtectedRecordNanos = metric.Metadata{
		Name: "spanconfig.kvsubscriber.oldest_protected_record_nanos",
		Help: "Difference between the current time and the oldest protected timestamp" +
			" (sudden drops indicate a record being released; an ever increasing" +
			" number indicates that the oldest record is around and preventing GC if > configured GC TTL)",
		Measurement: "Nanoseconds",
		Unit:        metric.Unit_NANOSECONDS,
	}
)

// metricsPollerInterval is the setting that determines the frequency at which
// the KVSubscriber's background metrics task (started in Start) refreshes the
// spanconfig.kvsubscriber.* metrics. A value of 0 disables the periodic
// refresh.
var metricsPollerInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"spanconfig.kvsubscriber.metrics_poller_interval",
"the interval at which the spanconfig.kvsubscriber.* metrics are kept up-to-date; set to 0 to disable the mechanism",
// Default refresh interval.
5*time.Second,
settings.NonNegativeDuration,
)

// KVSubscriber is used to subscribe to global span configuration changes. It's
// a concrete implementation of the spanconfig.KVSubscriber interface.
//
Expand Down Expand Up @@ -73,24 +102,21 @@ var updateBehindNanos = metric.Metadata{
// we could diff the two data structures and only emit targeted updates.
//
// [1]: For a given key k, it's config may be stored as part of a larger span S
//
// (where S.start <= k < S.end). It's possible for S to get deleted and
// replaced with sub-spans S1...SN in the same transaction if the span is
// getting split. When applying these updates, we need to make sure to
// process the deletion event for S before processing S1...SN.
// (where S.start <= k < S.end). It's possible for S to get deleted and
// replaced with sub-spans S1...SN in the same transaction if the span is
// getting split. When applying these updates, we need to make sure to
// process the deletion event for S before processing S1...SN.
//
// [2]: In our example above deleting the config for S and adding configs for
//
// S1...SN we want to make sure that we apply the full set of updates all
// at once -- lest we expose the intermediate state where the config for S
// was deleted but the configs for S1...SN were not yet applied.
// S1...SN we want to make sure that we apply the full set of updates all
// at once -- lest we expose the intermediate state where the config for S
// was deleted but the configs for S1...SN were not yet applied.
//
// [3]: TODO(irfansharif): When tearing down the subscriber due to underlying
//
// errors, we could also capture a checkpoint to use the next time the
// subscriber is established. That way we can avoid the full initial scan
// over the span configuration state and simply pick up where we left off
// with our existing spanconfig.Store.
// errors, we could also capture a checkpoint to use the next time the
// subscriber is established. That way we can avoid the full initial scan
// over the span configuration state and simply pick up where we left off
// with our existing spanconfig.Store.
type KVSubscriber struct {
fallback roachpb.SpanConfig
knobs *spanconfig.TestingKnobs
Expand All @@ -101,7 +127,6 @@ type KVSubscriber struct {
mu struct { // serializes between Start and external threads
syncutil.RWMutex
lastUpdated hlc.Timestamp
metrics Metrics
// internal is the internal spanconfig.Store maintained by the
// KVSubscriber. A read-only view over this store is exposed as part of
// the interface. When re-subscribing, a fresh spanconfig.Store is
Expand All @@ -111,21 +136,34 @@ type KVSubscriber struct {
internal spanconfig.Store
handlers []handler
}

clock *hlc.Clock
metrics *Metrics
}

var _ spanconfig.KVSubscriber = &KVSubscriber{}

// Metrics are the Metrics associated with an instance of the
// KVSubscriber.
type Metrics struct {
// UpdateBehindNanos is the latency between realtime and the last update
// received by the KVSubscriber. This metric should be interpreted as a
// measure of the KVSubscribers' staleness.
// UpdateBehindNanos is the difference between the current time and when the
// last update was received by the KVSubscriber. This metric should be
// interpreted as a measure of the KVSubscribers' staleness.
UpdateBehindNanos *metric.Gauge
// ProtectedRecordCount is the total number of protected timestamp records,
// as seen by KV.
ProtectedRecordCount *metric.Gauge
// OldestProtectedRecordNanos is the difference between the current time and
// the oldest protected timestamp, in nanoseconds. It is reported as 0 when
// there are no protected timestamps.
OldestProtectedRecordNanos *metric.Gauge
}

func makeKVSubscriberMetrics() Metrics {
return Metrics{UpdateBehindNanos: metric.NewGauge(updateBehindNanos)}
func makeKVSubscriberMetrics() *Metrics {
return &Metrics{
UpdateBehindNanos: metric.NewGauge(updateBehindNanos),
ProtectedRecordCount: metric.NewGauge(protectedRecordCount),
OldestProtectedRecordNanos: metric.NewGauge(oldestProtectedRecordNanos),
}
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -167,6 +205,7 @@ func New(
fallback: fallback,
knobs: knobs,
settings: settings,
clock: clock,
}
var rfCacheKnobs *rangefeedcache.TestingKnobs
if knobs != nil {
Expand All @@ -183,9 +222,9 @@ func New(
rfCacheKnobs,
)
s.mu.internal = spanConfigStore
s.mu.metrics = makeKVSubscriberMetrics()
s.metrics = makeKVSubscriberMetrics()
if registry != nil {
registry.AddMetricStruct(&s.mu.metrics)
registry.AddMetricStruct(s.metrics)
}
return s
}
Expand All @@ -206,9 +245,74 @@ func New(
// the exported StoreReader will be up-to-date and continue to be
// incrementally maintained.
func (s *KVSubscriber) Start(ctx context.Context, stopper *stop.Stopper) error {
	// Before starting the rangefeed cache, launch a background task that
	// periodically refreshes the subscriber's metrics. The task exits when the
	// stopper quiesces; if it cannot be started, the error is returned and the
	// rangefeed cache is not started.
	if err := stopper.RunAsyncTask(ctx, "kvsubscriber-metrics",
		func(ctx context.Context) {
			// Buffered with capacity 1 and written to with a non-blocking send,
			// so the on-change callback never blocks and repeated changes
			// coalesce into a single wake-up.
			settingChangeCh := make(chan struct{}, 1)
			metricsPollerInterval.SetOnChange(
				&s.settings.SV, func(ctx context.Context) {
					select {
					case settingChangeCh <- struct{}{}:
					default:
					}
				})

			timer := timeutil.NewTimer()
			// Stop whichever timer is current when the task exits. The variable
			// is re-assigned below when polling is disabled, and a plain
			// `defer timer.Stop()` would bind to the original timer only,
			// leaving a later re-armed replacement running after we return.
			defer func() { timer.Stop() }()

			for {
				interval := metricsPollerInterval.Get(&s.settings.SV)
				if interval > 0 {
					timer.Reset(interval)
				} else {
					// Disable the mechanism: swap in a fresh timer that has not
					// been armed, so only a setting change or quiescence can
					// wake the select below.
					timer.Stop()
					timer = timeutil.NewTimer()
				}
				select {
				case <-timer.C:
					timer.Read = true
					s.updateMetrics(ctx)
					continue

				case <-settingChangeCh:
					// Loop around to use the updated timer.
					continue

				case <-stopper.ShouldQuiesce():
					return
				}
			}
		}); err != nil {
		return err
	}

	return rangefeedcache.Start(ctx, stopper, s.rfc, nil /* onError */)
}

// updateMetrics refreshes the subscriber's gauges from the protected
// timestamps returned by GetProtectionTimestamps over keys.EverythingSpan. If
// that lookup fails, the error is logged and the gauges are left untouched.
func (s *KVSubscriber) updateMetrics(ctx context.Context) {
	records, lastUpdated, err := s.GetProtectionTimestamps(ctx, keys.EverythingSpan)
	if err != nil {
		log.Errorf(ctx, "while refreshing kvsubscriber metrics: %v", err)
		return
	}

	// Find the smallest protected timestamp; it stays empty when there are no
	// records at all.
	var oldest hlc.Timestamp
	for _, ts := range records {
		if oldest.IsEmpty() || ts.Less(oldest) {
			oldest = ts
		}
	}

	now := s.clock.PhysicalTime()
	s.metrics.ProtectedRecordCount.Update(int64(len(records)))
	s.metrics.UpdateBehindNanos.Update(now.Sub(lastUpdated.GoTime()).Nanoseconds())

	// Report an age of zero when nothing is protected.
	var oldestAgeNanos int64
	if !oldest.IsEmpty() {
		oldestAgeNanos = now.Sub(oldest.GoTime()).Nanoseconds()
	}
	s.metrics.OldestProtectedRecordNanos.Update(oldestAgeNanos)
}

// Subscribe installs a callback that's invoked with whatever span may have seen
// a config update.
func (s *KVSubscriber) Subscribe(fn func(context.Context, roachpb.Span)) {
Expand Down Expand Up @@ -307,9 +411,9 @@ func (s *KVSubscriber) handleCompleteUpdate(
}

func (s *KVSubscriber) setLastUpdatedLocked(ts hlc.Timestamp) {
nanos := timeutil.Since(ts.GoTime()).Nanoseconds()
s.mu.metrics.UpdateBehindNanos.Update(nanos)
s.mu.lastUpdated = ts
nanos := timeutil.Since(s.mu.lastUpdated.GoTime()).Nanoseconds()
s.metrics.UpdateBehindNanos.Update(nanos)
}

func (s *KVSubscriber) handlePartialUpdate(
Expand Down
20 changes: 19 additions & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (s *KVSubscriber) TestingRunInner(ctx context.Context) error {
return s.rfc.Run(ctx)
}

// TestingUpdateMetrics exports the inner updateMetrics method for testing
// purposes, letting tests refresh the subscriber's metrics on demand rather
// than waiting for the periodic refresh.
func (s *KVSubscriber) TestingUpdateMetrics(ctx context.Context) {
s.updateMetrics(ctx)
}

func TestGetProtectionTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -75,8 +80,12 @@ func TestGetProtectionTimestamps(t *testing.T) {
// Mark sp43 as excluded from backup.
sp43Cfg.cfg.ExcludeDataFromBackup = true

const timeDeltaFromTS1 = 10
mt := timeutil.NewManualTime(ts1.GoTime())
mt.AdvanceTo(ts1.Add(timeDeltaFromTS1, 0).GoTime())

subscriber := New(
nil, /* clock */
hlc.NewClockForTesting(mt),
nil, /* rangeFeedFactory */
keys.SpanConfigurationsTableID,
1<<20, /* 1 MB */
Expand Down Expand Up @@ -132,6 +141,15 @@ func TestGetProtectionTimestamps(t *testing.T) {
testCase.test(t, m, subscriber)
})
}

// Test internal metrics. We should expect a protected record count of 3,
// ignoring the one from ts3 since both of its
// {IgnoreIfExcludedFromBackup,ExcludeDataFromBackup} flags are true. We
// should also observe the right delta between the oldest protected
// timestamp and the current wall clock time.
subscriber.TestingUpdateMetrics(ctx)
require.Equal(t, int64(3), subscriber.metrics.ProtectedRecordCount.Value())
require.Equal(t, int64(timeDeltaFromTS1), subscriber.metrics.OldestProtectedRecordNanos.Value())
}

var _ spanconfig.Store = &manualStore{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ func validateConstraintNameIsNotUsed(
if idx.Dropped() {
return false, pgerror.Newf(pgcode.DuplicateObject, "constraint with name %q already exists and is being dropped, try again later", name)
}
return false, pgerror.Newf(pgcode.DuplicateObject, "constraint with name %q already exists", name)
return false, pgerror.Newf(pgcode.DuplicateRelation, "constraint with name %q already exists", name)

default:
return false, errors.AssertionFailedf(
Expand Down
Loading

0 comments on commit a79338a

Please sign in to comment.