From 83a2ce544b7cb9d3306e1eb00f53bb03205512db Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Fri, 27 Aug 2021 12:08:06 +1000 Subject: [PATCH 1/4] server: buff timeout of server_test Release justification: test only change Release note: None --- pkg/server/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index e7ae4c73729a..4476254db097 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -259,6 +259,7 @@ go_library( go_test( name = "server_test", + size = "medium", srcs = [ "addjoin_test.go", "admin_cluster_test.go", From 2e708532bf0055eb816a5635366c2821ca97ef75 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 30 Aug 2021 17:17:20 -0400 Subject: [PATCH 2/4] kv/kvserver/rangefeed: fix catchupIter construction synchronization The catchupIter could have been constructed after the state of the underlying store has changed. In general this doesn't seem like a disaster unless the range has been removed or the gc threshold has changed and data is gone. So, maybe it is a disaster. Release justification: fixes for high-priority or high-severity bugs in existing functionality Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 5 ++ pkg/kv/kvserver/rangefeed/registry.go | 54 +++++++++++++++++++--- pkg/kv/kvserver/rangefeed/registry_test.go | 26 ++++++----- pkg/kv/kvserver/replica_rangefeed.go | 9 +++- 4 files changed, 74 insertions(+), 20 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index bc1f835116ed..dbc82d44c5c1 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -247,6 +247,11 @@ func (p *Processor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } + // Construct the catchupIter before notifying the registration that it + // has been registered. Note that if the catchUpScan is never run, then + // the iterator constructed here will be closed in disconnect. + r.maybeConstructCatchUpIter() + // Add the new registration to the registry. p.reg.Register(&r) diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index db5c710f8c4e..9ecb362d3192 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -53,11 +53,19 @@ type Stream interface { // has finished. type registration struct { // Input. - span roachpb.Span - catchupTimestamp hlc.Timestamp + span roachpb.Span + catchupTimestamp hlc.Timestamp + withDiff bool + metrics *Metrics + + // catchupIterConstructor is used to construct the catchupIter if necessary. + // The reason this constructor is plumbed down is to make sure that the + // iterator does not get constructed too late in server shutdown. However, + // it must also be stored in the struct to ensure that it is not constructed + // too late, after the raftMu has been dropped. Thus, this function, if + // non-nil, will be used to populate mu.catchupIter while the registration + // is being registered by the processor. catchupIterConstructor func() storage.SimpleMVCCIterator - withDiff bool - metrics *Metrics // Output. stream Stream @@ -80,6 +88,11 @@ type registration struct { // Management of the output loop goroutine, used to ensure proper teardown. outputLoopCancelFn func() disconnected bool + + // catchupIter is populated on the Processor's goroutine while the + // Replica.raftMu is still held. If it is non-nil at the time that + // disconnect is called, it is closed by disconnect. + catchupIter storage.SimpleMVCCIterator } } @@ -210,6 +223,10 @@ func (r *registration) disconnect(pErr *roachpb.Error) { r.mu.Lock() defer r.mu.Unlock() if !r.mu.disconnected { + if r.mu.catchupIter != nil { + r.mu.catchupIter.Close() + r.mu.catchupIter = nil + } if r.mu.outputLoopCancelFn != nil { r.mu.outputLoopCancelFn() } @@ -285,11 +302,10 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran // If the registration does not have a catchUpIteratorConstructor, this method // is a no-op. func (r *registration) maybeRunCatchupScan() error { - if r.catchupIterConstructor == nil { + catchupIter := r.detachCatchUpIter() + if catchupIter == nil { return nil } - catchupIter := r.catchupIterConstructor() - r.catchupIterConstructor = nil start := timeutil.Now() defer func() { catchupIter.Close() @@ -587,6 +603,30 @@ func (r *registration) waitForCaughtUp() error { return errors.Errorf("registration %v failed to empty in time", r.Range()) } +// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches +// the catchUpIter to be detached in the catchUpScan or closed on disconnect. +func (r *registration) maybeConstructCatchUpIter() { + if r.catchupIterConstructor == nil { + return + } + + ci := r.catchupIterConstructor() + r.catchupIterConstructor = nil + + r.mu.Lock() + defer r.mu.Unlock() + r.mu.catchupIter = ci +} + +// detachCatchUpIter detaches the catchupIter that was previously attached. +func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator { + r.mu.Lock() + defer r.mu.Unlock() + catchupIter := r.mu.catchupIter + r.mu.catchupIter = nil + return catchupIter +} + // waitForCaughtUp waits for all registrations overlapping the given span to // completely process their internal buffers. func (reg *registry) waitForCaughtUp(span roachpb.Span) error { diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index f6db57ec6213..dcb3d35b20c4 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -100,19 +100,21 @@ func newTestRegistration( ) *testRegistration { s := newTestStream() errC := make(chan *roachpb.Error, 1) + r := newRegistration( + span, + ts, + makeIteratorConstructor(catchup), + withDiff, + 5, + NewMetrics(), + s, + errC, + ) + r.maybeConstructCatchUpIter() return &testRegistration{ - registration: newRegistration( - span, - ts, - makeIteratorConstructor(catchup), - withDiff, - 5, - NewMetrics(), - s, - errC, - ), - stream: s, - errC: errC, + registration: r, + stream: s, + errC: errC, } } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 85b9f6ac4beb..6d6667b4e458 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -225,7 +225,9 @@ func (r *Replica) rangeFeedWithRangeID( var catchUpIterFunc rangefeed.IteratorConstructor if usingCatchupIter { catchUpIterFunc = func() storage.SimpleMVCCIterator { - + // Assert that we still hold the raftMu when this is called to ensure + // that the catchUpIter reads from the current snapshot. + r.raftMu.AssertHeld() innerIter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: args.Span.EndKey, // RangeFeed originally intended to use the time-bound iterator @@ -373,6 +375,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Start it with an iterator to initialize the resolved timestamp. rtsIter := func() storage.SimpleMVCCIterator { + // Assert that we still hold the raftMu when this is called to ensure + // that the rtsIter reads from the current snapshot. The replica + // synchronizes with the rangefeed Processor calling this function by + // waiting for the Register call below to return. + r.raftMu.AssertHeld() return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: desc.EndKey.AsRawKey(), // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed From 9ff249eec5f46c074aec72263ebc37faa9c16af6 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 30 Aug 2021 17:57:45 -0400 Subject: [PATCH 3/4] kv/kvserver: catchup -> catchUp Release justification: non-production code change Release note: None --- pkg/kv/kvserver/rangefeed/metrics.go | 6 +- pkg/kv/kvserver/rangefeed/processor.go | 6 +- pkg/kv/kvserver/rangefeed/registry.go | 84 +++++++++++----------- pkg/kv/kvserver/rangefeed/registry_test.go | 10 +-- pkg/kv/kvserver/replica_rangefeed.go | 20 +++--- 5 files changed, 63 insertions(+), 63 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/metrics.go b/pkg/kv/kvserver/rangefeed/metrics.go index 33574d7271ba..e299bb6bd45f 100644 --- a/pkg/kv/kvserver/rangefeed/metrics.go +++ b/pkg/kv/kvserver/rangefeed/metrics.go @@ -19,7 +19,7 @@ import ( ) var ( - metaRangeFeedCatchupScanNanos = metric.Metadata{ + metaRangeFeedCatchUpScanNanos = metric.Metadata{ Name: "kv.rangefeed.catchup_scan_nanos", Help: "Time spent in RangeFeed catchup scan", Measurement: "Nanoseconds", @@ -29,7 +29,7 @@ var ( // Metrics are for production monitoring of RangeFeeds. type Metrics struct { - RangeFeedCatchupScanNanos *metric.Counter + RangeFeedCatchUpScanNanos *metric.Counter RangeFeedSlowClosedTimestampLogN log.EveryN RangeFeedSlowClosedTimestampNudge singleflight.Group @@ -46,7 +46,7 @@ func (*Metrics) MetricStruct() {} // NewMetrics makes the metrics for RangeFeeds monitoring. func NewMetrics() *Metrics { return &Metrics{ - RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos), + RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos), RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second), RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024), } diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index dbc82d44c5c1..caf666ee0060 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -247,7 +247,7 @@ func (p *Processor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchupIter before notifying the registration that it + // Construct the catchUpIter before notifying the registration that it // has been registered. Note that if the catchUpScan is never run, then // the iterator constructed here will be closed in disconnect. r.maybeConstructCatchUpIter() @@ -399,7 +399,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) { func (p *Processor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchupIterConstructor IteratorConstructor, + catchUpIterConstructor IteratorConstructor, withDiff bool, stream Stream, errC chan<- *roachpb.Error, @@ -410,7 +410,7 @@ func (p *Processor) Register( p.syncEventC() r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, p.Config.EventChanCap, p.Metrics, stream, errC, ) select { diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 9ecb362d3192..f70f7cc60c7c 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -54,18 +54,18 @@ type Stream interface { type registration struct { // Input. span roachpb.Span - catchupTimestamp hlc.Timestamp + catchUpTimestamp hlc.Timestamp withDiff bool metrics *Metrics - // catchupIterConstructor is used to construct the catchupIter if necessary. + // catchUpIterConstructor is used to construct the catchUpIter if necessary. // The reason this constructor is plumbed down is to make sure that the // iterator does not get constructed too late in server shutdown. However, // it must also be stored in the struct to ensure that it is not constructed // too late, after the raftMu has been dropped. Thus, this function, if - // non-nil, will be used to populate mu.catchupIter while the registration + // non-nil, will be used to populate mu.catchUpIter while the registration // is being registered by the processor. - catchupIterConstructor func() storage.SimpleMVCCIterator + catchUpIterConstructor func() storage.SimpleMVCCIterator // Output. stream Stream @@ -89,17 +89,17 @@ type registration struct { outputLoopCancelFn func() disconnected bool - // catchupIter is populated on the Processor's goroutine while the + // catchUpIter is populated on the Processor's goroutine while the // Replica.raftMu is still held. If it is non-nil at the time that // disconnect is called, it is closed by disconnect. - catchupIter storage.SimpleMVCCIterator + catchUpIter storage.SimpleMVCCIterator } } func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchupIterConstructor func() storage.SimpleMVCCIterator, + catchUpIterConstructor func() storage.SimpleMVCCIterator, withDiff bool, bufferSz int, metrics *Metrics, @@ -108,8 +108,8 @@ func newRegistration( ) registration { r := registration{ span: span, - catchupTimestamp: startTS, - catchupIterConstructor: catchupIterConstructor, + catchUpTimestamp: startTS, + catchUpIterConstructor: catchUpIterConstructor, withDiff: withDiff, metrics: metrics, stream: stream, @@ -123,7 +123,7 @@ func newRegistration( // publish attempts to send a single event to the output buffer for this // registration. If the output buffer is full, the overflowed flag is set, -// indicating that live events were lost and a catchup scan should be initiated. +// indicating that live events were lost and a catch-up scan should be initiated. // If overflowed is already set, events are ignored and not written to the // buffer. func (r *registration) publish(event *roachpb.RangeFeedEvent) { @@ -223,9 +223,9 @@ func (r *registration) disconnect(pErr *roachpb.Error) { r.mu.Lock() defer r.mu.Unlock() if !r.mu.disconnected { - if r.mu.catchupIter != nil { - r.mu.catchupIter.Close() - r.mu.catchupIter = nil + if r.mu.catchUpIter != nil { + r.mu.catchUpIter.Close() + r.mu.catchUpIter = nil } if r.mu.outputLoopCancelFn != nil { r.mu.outputLoopCancelFn() @@ -249,7 +249,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) { // have been emitted. func (r *registration) outputLoop(ctx context.Context) error { // If the registration has a catch-up scan, run it. - if err := r.maybeRunCatchupScan(); err != nil { + if err := r.maybeRunCatchUpScan(); err != nil { err = errors.Wrap(err, "catch-up scan failed") log.Errorf(ctx, "%v", err) return err @@ -294,22 +294,22 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran r.disconnect(roachpb.NewError(err)) } -// maybeRunCatchupScan starts a catchup scan which will output entries for all -// recorded changes in the replica that are newer than the catchupTimestamp. +// maybeRunCatchUpScan starts a catch-up scan which will output entries for all +// recorded changes in the replica that are newer than the catchUpTimestamp. // This uses the iterator provided when the registration was originally created; // after the scan completes, the iterator will be closed. // // If the registration does not have a catchUpIteratorConstructor, this method // is a no-op. -func (r *registration) maybeRunCatchupScan() error { - catchupIter := r.detachCatchUpIter() - if catchupIter == nil { +func (r *registration) maybeRunCatchUpScan() error { + catchUpIter := r.detachCatchUpIter() + if catchUpIter == nil { return nil } start := timeutil.Now() defer func() { - catchupIter.Close() - r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds()) + catchUpIter.Close() + r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds()) }() var a bufalloc.ByteAllocator @@ -346,23 +346,23 @@ func (r *registration) maybeRunCatchupScan() error { // versions of each key that are after the registration's startTS, so we // can't use NextKey. var meta enginepb.MVCCMetadata - catchupIter.SeekGE(startKey) + catchUpIter.SeekGE(startKey) for { - if ok, err := catchupIter.Valid(); err != nil { + if ok, err := catchUpIter.Valid(); err != nil { return err - } else if !ok || !catchupIter.UnsafeKey().Less(endKey) { + } else if !ok || !catchUpIter.UnsafeKey().Less(endKey) { break } - unsafeKey := catchupIter.UnsafeKey() - unsafeVal := catchupIter.UnsafeValue() + unsafeKey := catchUpIter.UnsafeKey() + unsafeVal := catchUpIter.UnsafeValue() if !unsafeKey.IsValue() { // Found a metadata key. if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil { return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey) } if !meta.IsInline() { - // This is an MVCCMetadata key for an intent. The catchup scan + // This is an MVCCMetadata key for an intent. The catch-up scan // only cares about committed values, so ignore this and skip // past the corresponding provisional key-value. To do this, // scan to the timestamp immediately before (i.e. the key @@ -371,7 +371,7 @@ func (r *registration) maybeRunCatchupScan() error { // Make a copy since should not pass an unsafe key from the iterator // that provided it, when asking it to seek. a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0) - catchupIter.SeekGE(storage.MVCCKey{ + catchUpIter.SeekGE(storage.MVCCKey{ Key: unsafeKey.Key, Timestamp: meta.Timestamp.ToTimestamp().Prev(), }) @@ -398,11 +398,11 @@ func (r *registration) maybeRunCatchupScan() error { // Ignore the version if it's not inline and its timestamp is at // or before the registration's (exclusive) starting timestamp. - ignore := !(ts.IsEmpty() || r.catchupTimestamp.Less(ts)) + ignore := !(ts.IsEmpty() || r.catchUpTimestamp.Less(ts)) if ignore && !r.withDiff { // Skip all the way to the next key. // NB: fast-path to avoid value copy when !r.withDiff. - catchupIter.NextKey() + catchUpIter.NextKey() continue } @@ -415,10 +415,10 @@ func (r *registration) maybeRunCatchupScan() error { if ignore { // Skip all the way to the next key. - catchupIter.NextKey() + catchUpIter.NextKey() } else { // Move to the next version of this key. - catchupIter.Next() + catchUpIter.Next() var event roachpb.RangeFeedEvent event.MustSetValue(&roachpb.RangeFeedValue{ @@ -447,7 +447,7 @@ func (r *registration) Range() interval.Range { } func (r registration) String() string { - return fmt.Sprintf("[%s @ %s+]", r.span, r.catchupTimestamp) + return fmt.Sprintf("[%s @ %s+]", r.span, r.catchUpTimestamp) } // registry holds a set of registrations and manages their lifecycle. @@ -512,7 +512,7 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) { // Don't publish events if they are equal to or less // than the registration's starting timestamp. - if r.catchupTimestamp.Less(minTS) { + if r.catchUpTimestamp.Less(minTS) { r.publish(event) } return false, nil @@ -606,25 +606,25 @@ func (r *registration) waitForCaughtUp() error { // maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches // the catchUpIter to be detached in the catchUpScan or closed on disconnect. func (r *registration) maybeConstructCatchUpIter() { - if r.catchupIterConstructor == nil { + if r.catchUpIterConstructor == nil { return } - ci := r.catchupIterConstructor() - r.catchupIterConstructor = nil + catchUpIter := r.catchUpIterConstructor() + r.catchUpIterConstructor = nil r.mu.Lock() defer r.mu.Unlock() - r.mu.catchupIter = ci + r.mu.catchUpIter = catchUpIter } -// detachCatchUpIter detaches the catchupIter that was previously attached. +// detachCatchUpIter detaches the catchUpIter that was previously attached. func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator { r.mu.Lock() defer r.mu.Unlock() - catchupIter := r.mu.catchupIter - r.mu.catchupIter = nil - return catchupIter + catchUpIter := r.mu.catchUpIter + r.mu.catchUpIter = nil + return catchUpIter } // waitForCaughtUp waits for all registrations overlapping the given span to diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index dcb3d35b20c4..8f15ecfa272a 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -265,10 +265,10 @@ func TestRegistrationCatchUpScan(t *testing.T) { EndKey: roachpb.Key("w"), }, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */) - require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) - require.NoError(t, r.maybeRunCatchupScan()) + require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) + require.NoError(t, r.maybeRunCatchUpScan()) require.True(t, iter.closed) - require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) + require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) // Compare the events sent on the registration's Stream to the expected events. expEvents := []*roachpb.RangeFeedEvent{ @@ -566,14 +566,14 @@ func TestRegistrationString(t *testing.T) { { r: registration{ span: roachpb.Span{Key: roachpb.Key("d")}, - catchupTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1}, + catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1}, }, exp: `[d @ 0.000000010,1+]`, }, { r: registration{span: roachpb.Span{ Key: roachpb.Key("d"), EndKey: roachpb.Key("z")}, - catchupTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9}, + catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9}, }, exp: `[{d-z} @ 0.000000040,9+]`, }, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 6d6667b4e458..bba569bb959d 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -136,7 +136,7 @@ func (i iteratorWithCloser) Close() { // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns with an optional error when the rangefeed is // complete. The provided ConcurrentRequestLimiter is used to limit the number -// of rangefeeds using catchup iterators at the same time. +// of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { @@ -188,10 +188,10 @@ func (r *Replica) rangeFeedWithRangeID( // If we will be using a catch-up iterator, wait for the limiter here before // locking raftMu. - usingCatchupIter := false + usingCatchUpIter := false var iterSemRelease func() if !args.Timestamp.IsEmpty() { - usingCatchupIter = true + usingCatchUpIter = true alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx) if err != nil { return roachpb.NewError(err) @@ -199,10 +199,10 @@ func (r *Replica) rangeFeedWithRangeID( // Finish the iterator limit if we exit before the iterator finishes. // The release function will be hooked into the Close method on the // iterator below. The sync.Once prevents any races between exiting early - // from this call and finishing the catchup scan underneath the + // from this call and finishing the catch-up scan underneath the // rangefeed.Processor. We need to release here in case we fail to // register the processor, or, more perniciously, in the case where the - // processor gets registered by shut down before starting the catchup + // processor gets registered by shut down before starting the catch-up // scan. var iterSemReleaseOnce sync.Once iterSemRelease = func() { @@ -223,7 +223,7 @@ func (r *Replica) rangeFeedWithRangeID( // Register the stream with a catch-up iterator. var catchUpIterFunc rangefeed.IteratorConstructor - if usingCatchupIter { + if usingCatchUpIter { catchUpIterFunc = func() storage.SimpleMVCCIterator { // Assert that we still hold the raftMu when this is called to ensure // that the catchUpIter reads from the current snapshot. @@ -234,7 +234,7 @@ func (r *Replica) rangeFeedWithRangeID( // performance optimization. However, they've had correctness issues in // the past (#28358, #34819) and no-one has the time for the due-diligence // necessary to be confidant in their correctness going forward. Not using - // them causes the total time spent in RangeFeed catchup on changefeed + // them causes the total time spent in RangeFeed catch-up on changefeed // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still // workable. See #35122 for details. // MinTimestampHint: args.Timestamp, @@ -328,7 +328,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, - catchupIter rangefeed.IteratorConstructor, + catchUpIter rangefeed.IteratorConstructor, withDiff bool, stream rangefeed.Stream, errC chan<- *roachpb.Error, @@ -339,7 +339,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( r.rangefeedMu.Lock() p := r.rangefeedMu.proc if p != nil { - reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC) + reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, errC) if reg { // Registered successfully with an existing processor. // Update the rangefeed filter to avoid filtering ops @@ -398,7 +398,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // any other goroutines are able to stop the processor. In other words, // this ensures that the only time the registration fails is during // server shutdown. - reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC) + reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, errC) if !reg { select { case <-r.store.Stopper().ShouldQuiesce(): From e8e963b6308079baeade411551d4c627815e2193 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 30 Aug 2021 16:24:21 -0400 Subject: [PATCH 4/4] sql/catalog/multiregion: move multiregion validation to its package This code and these invariants are seen as the responsibility of the sql-experience team, so, let's move them to a package that that team owns. Release justification: Low risk code movement. Release note: None --- .github/CODEOWNERS | 1 + pkg/ccl/backupccl/backupresolver/targets.go | 4 +- pkg/ccl/backupccl/restore_job.go | 4 +- pkg/sql/alter_table_locality.go | 3 +- pkg/sql/catalog/descriptor.go | 2 +- pkg/sql/catalog/multiregion/BUILD.bazel | 7 +- pkg/sql/catalog/multiregion/validate_table.go | 270 ++++++++++++++++++ pkg/sql/catalog/tabledesc/BUILD.bazel | 1 + pkg/sql/catalog/tabledesc/structured.go | 39 +-- pkg/sql/catalog/tabledesc/validate.go | 231 +-------------- pkg/sql/crdb_internal.go | 3 +- pkg/sql/drop_type.go | 4 +- pkg/sql/schema_changer.go | 4 +- .../schemachanger/scbuild/relation_common.go | 2 +- pkg/sql/show_create_clauses.go | 4 +- 15 files changed, 306 insertions(+), 273 deletions(-) create mode 100644 pkg/sql/catalog/multiregion/validate_table.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index f7444cd99428..bdecfad660f0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -48,6 +48,7 @@ /pkg/sql/tests/rsg_test.go @cockroachdb/sql-experience /pkg/sql/catalog/ @cockroachdb/sql-schema +/pkg/sql/catalog/multiregion @cockroachdb/multiregion /pkg/sql/doctor/ @cockroachdb/sql-schema /pkg/sql/gcjob/ @cockroachdb/sql-schema /pkg/sql/gcjob_test/ @cockroachdb/sql-schema diff --git a/pkg/ccl/backupccl/backupresolver/targets.go b/pkg/ccl/backupccl/backupresolver/targets.go index f5f2a404e5d9..32ecfda7a197 100644 --- a/pkg/ccl/backupccl/backupresolver/targets.go +++ b/pkg/ccl/backupccl/backupresolver/targets.go @@ -434,7 +434,7 @@ func DescriptorsMatchingTargets( // Get all the types used by this table. desc := r.DescByID[tableDesc.GetParentID()] dbDesc := desc.(catalog.DatabaseDescriptor) - typeIDs, err := tableDesc.GetAllReferencedTypeIDs(dbDesc, getTypeByID) + typeIDs, _, err := tableDesc.GetAllReferencedTypeIDs(dbDesc, getTypeByID) if err != nil { return ret, err } @@ -526,7 +526,7 @@ func DescriptorsMatchingTargets( // Get all the types used by this table. dbRaw := r.DescByID[desc.GetParentID()] dbDesc := dbRaw.(catalog.DatabaseDescriptor) - typeIDs, err := desc.GetAllReferencedTypeIDs(dbDesc, getTypeByID) + typeIDs, _, err := desc.GetAllReferencedTypeIDs(dbDesc, getTypeByID) if err != nil { return err } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 20ebf102d78b..6b9135c5870e 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1315,7 +1315,7 @@ func createImportingDescriptors( if err != nil { return err } - typeIDs, err := table.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { + typeIDs, _, err := table.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { return typesByID[id], nil }) if err != nil { @@ -2343,7 +2343,7 @@ func (r *restoreResumer) removeExistingTypeBackReferences( } // Get all types that this descriptor references. - referencedTypes, err := tbl.GetAllReferencedTypeIDs(dbDesc, lookup) + referencedTypes, _, err := tbl.GetAllReferencedTypeIDs(dbDesc, lookup) if err != nil { return err } diff --git a/pkg/sql/alter_table_locality.go b/pkg/sql/alter_table_locality.go index 09e4976e4d5c..6515ea9bcfc3 100644 --- a/pkg/sql/alter_table_locality.go +++ b/pkg/sql/alter_table_locality.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" @@ -101,7 +102,7 @@ func (n *alterTableSetLocalityNode) alterTableLocalityGlobalToRegionalByTable( ) error { if !n.tableDesc.IsLocalityGlobal() { f := params.p.EvalContext().FmtCtx(tree.FmtSimple) - if err := tabledesc.FormatTableLocalityConfig(n.tableDesc.LocalityConfig, f); err != nil { + if err := multiregion.FormatTableLocalityConfig(n.tableDesc.LocalityConfig, f); err != nil { // While we're in an error path and generally it's bad to return a // different error in an error path, we will only get an error here if the // locality is corrupted, in which case, it's probably the right error diff --git a/pkg/sql/catalog/descriptor.go b/pkg/sql/catalog/descriptor.go index befa7bfa788b..a0b8ef2c7b6e 100644 --- a/pkg/sql/catalog/descriptor.go +++ b/pkg/sql/catalog/descriptor.go @@ -326,7 +326,7 @@ type TableDescriptor interface { GetReplacementOf() descpb.TableDescriptor_Replacement GetAllReferencedTypeIDs( databaseDesc DatabaseDescriptor, getType func(descpb.ID) (TypeDescriptor, error), - ) (descpb.IDs, error) + ) (referencedAnywhere, referencedInColumns descpb.IDs, _ error) ForeachDependedOnBy(f func(dep *descpb.TableDescriptor_Reference) error) error GetDependedOnBy() []descpb.TableDescriptor_Reference diff --git a/pkg/sql/catalog/multiregion/BUILD.bazel b/pkg/sql/catalog/multiregion/BUILD.bazel index 34f8480f5235..bdd751353be8 100644 --- a/pkg/sql/catalog/multiregion/BUILD.bazel +++ b/pkg/sql/catalog/multiregion/BUILD.bazel @@ -2,13 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "multiregion", - srcs = ["region_config.go"], + srcs = [ + "region_config.go", + "validate_table.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion", visibility = ["//visibility:public"], deps = [ + "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/catalog/multiregion/validate_table.go b/pkg/sql/catalog/multiregion/validate_table.go new file mode 100644 index 000000000000..4b7fb676d06c --- /dev/null +++ b/pkg/sql/catalog/multiregion/validate_table.go @@ -0,0 +1,270 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package multiregion + +import ( + "strings" + + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/errors" +) + +// ValidateTableLocalityConfig validates whether the descriptor's locality +// config is valid under the given database. +func ValidateTableLocalityConfig( + desc catalog.TableDescriptor, db catalog.DatabaseDescriptor, vdg catalog.ValidationDescGetter, +) error { + + lc := desc.GetLocalityConfig() + if lc == nil { + if db.IsMultiRegion() { + return pgerror.Newf( + pgcode.InvalidTableDefinition, + "database %s is multi-region enabled, but table %s has no locality set", + db.GetName(), + desc.GetName(), + ) + } + // Nothing to validate for non-multi-region databases. + return nil + } + + if !db.IsMultiRegion() { + s := tree.NewFmtCtx(tree.FmtSimple) + var locality string + // Formatting the table locality config should never fail; if it does, the + // error message is more clear if we construct a dummy locality here. + if err := FormatTableLocalityConfig(lc, s); err != nil { + locality = "INVALID LOCALITY" + } + locality = s.String() + return pgerror.Newf( + pgcode.InvalidTableDefinition, + "database %s is not multi-region enabled, but table %s has locality %s set", + db.GetName(), + desc.GetName(), + locality, + ) + } + + regionsEnumID, err := db.MultiRegionEnumID() + if err != nil { + return err + } + regionsEnumDesc, err := vdg.GetTypeDescriptor(regionsEnumID) + if err != nil { + return errors.Wrapf(err, "multi-region enum with ID %d does not exist", regionsEnumID) + } + + // Check non-table items have a correctly set locality. + if desc.IsSequence() { + if !desc.IsLocalityRegionalByTable() { + return errors.AssertionFailedf( + "expected sequence %s to have locality REGIONAL BY TABLE", + desc.GetName(), + ) + } + } + if desc.IsView() { + if desc.MaterializedView() { + if !desc.IsLocalityGlobal() { + return errors.AssertionFailedf( + "expected materialized view %s to have locality GLOBAL", + desc.GetName(), + ) + } + } else { + if !desc.IsLocalityRegionalByTable() { + return errors.AssertionFailedf( + "expected view %s to have locality REGIONAL BY TABLE", + desc.GetName(), + ) + } + } + } + + // REGIONAL BY TABLE tables homed in the primary region should include a + // reference to the multi-region type descriptor and a corresponding + // backreference. All other patterns should only contain a reference if there + // is an explicit column which uses the multi-region type descriptor as its + // *types.T. While the specific cases are validated below, we search for the + // region enum ID in the references list just once, up top here. + typeIDs, typeIDsReferencedByColumns, err := desc.GetAllReferencedTypeIDs(db, vdg.GetTypeDescriptor) + if err != nil { + return err + } + regionEnumIDReferenced := false + for _, typeID := range typeIDs { + if typeID == regionsEnumID { + regionEnumIDReferenced = true + break + } + } + columnTypesTypeIDs := catalog.MakeDescriptorIDSet(typeIDsReferencedByColumns...) + switch lc := lc.Locality.(type) { + case *descpb.TableDescriptor_LocalityConfig_Global_: + if regionEnumIDReferenced { + if !columnTypesTypeIDs.Contains(regionsEnumID) { + return errors.AssertionFailedf( + "expected no region Enum ID to be referenced by a GLOBAL TABLE: %q"+ + " but found: %d", + desc.GetName(), + regionsEnumDesc.GetID(), + ) + } + } + case *descpb.TableDescriptor_LocalityConfig_RegionalByRow_: + if !desc.IsPartitionAllBy() { + return errors.AssertionFailedf("expected REGIONAL BY ROW table to have PartitionAllBy set") + } + // For REGIONAL BY ROW tables, ensure partitions in the PRIMARY KEY match + // the database descriptor. Ensure each public region has a partition, + // and each transitioning region name to possibly have a partition. + // We do validation that ensures all index partitions are the same on + // PARTITION ALL BY. + regions, err := regionsEnumDesc.RegionNames() + if err != nil { + return err + } + regionNames := make(map[descpb.RegionName]struct{}, len(regions)) + for _, region := range regions { + regionNames[region] = struct{}{} + } + transitioningRegions, err := regionsEnumDesc.TransitioningRegionNames() + if err != nil { + return err + } + transitioningRegionNames := make(map[descpb.RegionName]struct{}, len(regions)) + for _, region := range transitioningRegions { + transitioningRegionNames[region] = struct{}{} + } + + part := desc.GetPrimaryIndex().GetPartitioning() + err = part.ForEachList(func(name string, _ [][]byte, _ catalog.Partitioning) error { + regionName := descpb.RegionName(name) + // Any transitioning region names may exist. + if _, ok := transitioningRegionNames[regionName]; ok { + return nil + } + // If a region is not found in any of the region names, we have an unknown + // partition. + if _, ok := regionNames[regionName]; !ok { + return errors.AssertionFailedf( + "unknown partition %s on PRIMARY INDEX of table %s", + name, + desc.GetName(), + ) + } + delete(regionNames, regionName) + return nil + }) + if err != nil { + return err + } + + // Any regions that are not deleted from the above loop is missing. + for regionName := range regionNames { + return errors.AssertionFailedf( + "missing partition %s on PRIMARY INDEX of table %s", + regionName, + desc.GetName(), + ) + } + + case *descpb.TableDescriptor_LocalityConfig_RegionalByTable_: + + // Table is homed in an explicit (non-primary) region. + if lc.RegionalByTable.Region != nil { + foundRegion := false + regions, err := regionsEnumDesc.RegionNamesForValidation() + if err != nil { + return err + } + for _, r := range regions { + if *lc.RegionalByTable.Region == r { + foundRegion = true + break + } + } + if !foundRegion { + return errors.WithHintf( + pgerror.Newf( + pgcode.InvalidTableDefinition, + `region "%s" has not been added to database "%s"`, + *lc.RegionalByTable.Region, + db.DatabaseDesc().Name, + ), + "available regions: %s", + strings.Join(regions.ToStrings(), ", "), + ) + } + if !regionEnumIDReferenced { + return errors.AssertionFailedf( + "expected multi-region enum ID %d to be referenced on REGIONAL BY TABLE: %q locality "+ + "config, but did not find it", + regionsEnumID, + desc.GetName(), + ) + } + } else { + if regionEnumIDReferenced { + // It may be the case that the multi-region type descriptor is used + // as the type of the table column. Validations should only fail if + // that is not the case. + if !columnTypesTypeIDs.Contains(regionsEnumID) { + return errors.AssertionFailedf( + "expected no region Enum ID to be referenced by a REGIONAL BY TABLE: %q homed in the "+ + "primary region, but found: %d", + desc.GetName(), + regionsEnumDesc.GetID(), + ) + } + } + } + default: + return pgerror.Newf( + pgcode.InvalidTableDefinition, + "unknown locality level: %T", + lc, + ) + } + return nil +} + +// FormatTableLocalityConfig formats the table locality. +func FormatTableLocalityConfig(c *descpb.TableDescriptor_LocalityConfig, f *tree.FmtCtx) error { + switch v := c.Locality.(type) { + case *descpb.TableDescriptor_LocalityConfig_Global_: + f.WriteString("GLOBAL") + case *descpb.TableDescriptor_LocalityConfig_RegionalByTable_: + f.WriteString("REGIONAL BY TABLE IN ") + if v.RegionalByTable.Region != nil { + region := tree.Name(*v.RegionalByTable.Region) + f.FormatNode(®ion) + } else { + f.WriteString("PRIMARY REGION") + } + case *descpb.TableDescriptor_LocalityConfig_RegionalByRow_: + f.WriteString("REGIONAL BY ROW") + if v.RegionalByRow.As != nil { + f.WriteString(" AS ") + col := tree.Name(*v.RegionalByRow.As) + f.FormatNode(&col) + } + default: + return errors.Newf("unknown locality: %T", v) + } + return nil +} diff --git a/pkg/sql/catalog/tabledesc/BUILD.bazel b/pkg/sql/catalog/tabledesc/BUILD.bazel index d309f71f4236..cfb76e798f20 100644 --- a/pkg/sql/catalog/tabledesc/BUILD.bazel +++ b/pkg/sql/catalog/tabledesc/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/sql/catalog/catprivilege", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/multiregion", "//pkg/sql/catalog/schemaexpr", "//pkg/sql/catalog/typedesc", "//pkg/sql/parser", diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go index 3e44106bb095..fbc070d02b8d 100644 --- a/pkg/sql/catalog/tabledesc/structured.go +++ b/pkg/sql/catalog/tabledesc/structured.go @@ -459,18 +459,23 @@ func ForEachExprStringInTableDesc(descI catalog.TableDescriptor, f func(expr *st // with the desired ID. func (desc *wrapper) GetAllReferencedTypeIDs( dbDesc catalog.DatabaseDescriptor, getType func(descpb.ID) (catalog.TypeDescriptor, error), -) (descpb.IDs, error) { +) (referencedAnywhere, referencedInColumns descpb.IDs, _ error) { ids, err := desc.getAllReferencedTypesInTableColumns(getType) if err != nil { - return nil, err + return nil, nil, err + } + referencedInColumns = make(descpb.IDs, 0, len(ids)) + for id := range ids { + referencedInColumns = append(referencedInColumns, id) } + sort.Sort(referencedInColumns) // REGIONAL BY TABLE tables may have a dependency with the multi-region enum. exists := desc.GetMultiRegionEnumDependencyIfExists() if exists { regionEnumID, err := dbDesc.MultiRegionEnumID() if err != nil { - return nil, err + return nil, nil, err } ids[regionEnumID] = struct{}{} } @@ -489,7 +494,7 @@ func (desc *wrapper) GetAllReferencedTypeIDs( // Sort the output so that the order is deterministic. sort.Sort(result) - return result, nil + return result, referencedInColumns, nil } // getAllReferencedTypesInTableColumns returns a map of all user defined @@ -983,32 +988,6 @@ func (desc *Mutable) OriginalVersion() descpb.DescriptorVersion { return desc.ClusterVersion.Version } -// FormatTableLocalityConfig formats the table locality. -func FormatTableLocalityConfig(c *descpb.TableDescriptor_LocalityConfig, f *tree.FmtCtx) error { - switch v := c.Locality.(type) { - case *descpb.TableDescriptor_LocalityConfig_Global_: - f.WriteString("GLOBAL") - case *descpb.TableDescriptor_LocalityConfig_RegionalByTable_: - f.WriteString("REGIONAL BY TABLE IN ") - if v.RegionalByTable.Region != nil { - region := tree.Name(*v.RegionalByTable.Region) - f.FormatNode(®ion) - } else { - f.WriteString("PRIMARY REGION") - } - case *descpb.TableDescriptor_LocalityConfig_RegionalByRow_: - f.WriteString("REGIONAL BY ROW") - if v.RegionalByRow.As != nil { - f.WriteString(" AS ") - col := tree.Name(*v.RegionalByRow.As) - f.FormatNode(&col) - } - default: - return errors.Newf("unknown locality: %T", v) - } - return nil -} - // ValidateIndexNameIsUnique validates that the index name does not exist. func (desc *wrapper) ValidateIndexNameIsUnique(indexName string) error { if catalog.FindNonDropIndex(desc, func(idx catalog.Index) bool { diff --git a/pkg/sql/catalog/tabledesc/validate.go b/pkg/sql/catalog/tabledesc/validate.go index b48b5d5a345b..5577d7caf4d8 100644 --- a/pkg/sql/catalog/tabledesc/validate.go +++ b/pkg/sql/catalog/tabledesc/validate.go @@ -11,14 +11,13 @@ package tabledesc import ( - "strings" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catprivilege" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -138,7 +137,7 @@ func (desc *wrapper) ValidateCrossReferences( if dbDesc != nil { // Validate the all types present in the descriptor exist. - typeIDs, err := desc.GetAllReferencedTypeIDs(dbDesc, vdg.GetTypeDescriptor) + typeIDs, _, err := desc.GetAllReferencedTypeIDs(dbDesc, vdg.GetTypeDescriptor) if err != nil { vea.Report(err) } else { @@ -149,7 +148,7 @@ func (desc *wrapper) ValidateCrossReferences( } // Validate table locality. - if err := desc.validateTableLocalityConfig(dbDesc, vdg); err != nil { + if err := multiregion.ValidateTableLocalityConfig(desc, dbDesc, vdg); err != nil { vea.Report(errors.Wrap(err, "invalid locality config")) return } @@ -1340,227 +1339,3 @@ func (desc *wrapper) validatePartitioning() error { ) }) } - -// validateTableLocalityConfig validates whether the descriptor's locality -// config is valid under the given database. -func (desc *wrapper) validateTableLocalityConfig( - db catalog.DatabaseDescriptor, vdg catalog.ValidationDescGetter, -) error { - - if desc.LocalityConfig == nil { - if db.IsMultiRegion() { - return pgerror.Newf( - pgcode.InvalidTableDefinition, - "database %s is multi-region enabled, but table %s has no locality set", - db.GetName(), - desc.GetName(), - ) - } - // Nothing to validate for non-multi-region databases. - return nil - } - - if !db.IsMultiRegion() { - s := tree.NewFmtCtx(tree.FmtSimple) - var locality string - // Formatting the table locality config should never fail; if it does, the - // error message is more clear if we construct a dummy locality here. - if err := FormatTableLocalityConfig(desc.LocalityConfig, s); err != nil { - locality = "INVALID LOCALITY" - } - locality = s.String() - return pgerror.Newf( - pgcode.InvalidTableDefinition, - "database %s is not multi-region enabled, but table %s has locality %s set", - db.GetName(), - desc.GetName(), - locality, - ) - } - - regionsEnumID, err := db.MultiRegionEnumID() - if err != nil { - return err - } - regionsEnumDesc, err := vdg.GetTypeDescriptor(regionsEnumID) - if err != nil { - return errors.Wrapf(err, "multi-region enum with ID %d does not exist", regionsEnumID) - } - - // Check non-table items have a correctly set locality. - if desc.IsSequence() { - if !desc.IsLocalityRegionalByTable() { - return errors.AssertionFailedf( - "expected sequence %s to have locality REGIONAL BY TABLE", - desc.Name, - ) - } - } - if desc.IsView() { - if desc.MaterializedView() { - if !desc.IsLocalityGlobal() { - return errors.AssertionFailedf( - "expected materialized view %s to have locality GLOBAL", - desc.Name, - ) - } - } else { - if !desc.IsLocalityRegionalByTable() { - return errors.AssertionFailedf( - "expected view %s to have locality REGIONAL BY TABLE", - desc.Name, - ) - } - } - } - - // REGIONAL BY TABLE tables homed in the primary region should include a - // reference to the multi-region type descriptor and a corresponding - // backreference. All other patterns should only contain a reference if there - // is an explicit column which uses the multi-region type descriptor as its - // *types.T. While the specific cases are validated below, we search for the - // region enum ID in the references list just once, up top here. - typeIDs, err := desc.GetAllReferencedTypeIDs(db, vdg.GetTypeDescriptor) - if err != nil { - return err - } - regionEnumIDReferenced := false - for _, typeID := range typeIDs { - if typeID == regionsEnumID { - regionEnumIDReferenced = true - break - } - } - columnTypesTypeIDs, err := desc.getAllReferencedTypesInTableColumns(vdg.GetTypeDescriptor) - if err != nil { - return err - } - switch lc := desc.LocalityConfig.Locality.(type) { - case *descpb.TableDescriptor_LocalityConfig_Global_: - if regionEnumIDReferenced { - if _, found := columnTypesTypeIDs[regionsEnumID]; !found { - return errors.AssertionFailedf( - "expected no region Enum ID to be referenced by a GLOBAL TABLE: %q"+ - " but found: %d", - desc.GetName(), - regionsEnumDesc.GetID(), - ) - } - } - case *descpb.TableDescriptor_LocalityConfig_RegionalByRow_: - if !desc.IsPartitionAllBy() { - return errors.AssertionFailedf("expected REGIONAL BY ROW table to have PartitionAllBy set") - } - // For REGIONAL BY ROW tables, ensure partitions in the PRIMARY KEY match - // the database descriptor. Ensure each public region has a partition, - // and each transitioning region name to possibly have a partition. - // We do validation that ensures all index partitions are the same on - // PARTITION ALL BY. - regions, err := regionsEnumDesc.RegionNames() - if err != nil { - return err - } - regionNames := make(map[descpb.RegionName]struct{}, len(regions)) - for _, region := range regions { - regionNames[region] = struct{}{} - } - transitioningRegions, err := regionsEnumDesc.TransitioningRegionNames() - if err != nil { - return err - } - transitioningRegionNames := make(map[descpb.RegionName]struct{}, len(regions)) - for _, region := range transitioningRegions { - transitioningRegionNames[region] = struct{}{} - } - - part := desc.GetPrimaryIndex().GetPartitioning() - err = part.ForEachList(func(name string, _ [][]byte, _ catalog.Partitioning) error { - regionName := descpb.RegionName(name) - // Any transitioning region names may exist. - if _, ok := transitioningRegionNames[regionName]; ok { - return nil - } - // If a region is not found in any of the region names, we have an unknown - // partition. - if _, ok := regionNames[regionName]; !ok { - return errors.AssertionFailedf( - "unknown partition %s on PRIMARY INDEX of table %s", - name, - desc.GetName(), - ) - } - delete(regionNames, regionName) - return nil - }) - if err != nil { - return err - } - - // Any regions that are not deleted from the above loop is missing. - for regionName := range regionNames { - return errors.AssertionFailedf( - "missing partition %s on PRIMARY INDEX of table %s", - regionName, - desc.GetName(), - ) - } - - case *descpb.TableDescriptor_LocalityConfig_RegionalByTable_: - - // Table is homed in an explicit (non-primary) region. - if lc.RegionalByTable.Region != nil { - foundRegion := false - regions, err := regionsEnumDesc.RegionNamesForValidation() - if err != nil { - return err - } - for _, r := range regions { - if *lc.RegionalByTable.Region == r { - foundRegion = true - break - } - } - if !foundRegion { - return errors.WithHintf( - pgerror.Newf( - pgcode.InvalidTableDefinition, - `region "%s" has not been added to database "%s"`, - *lc.RegionalByTable.Region, - db.DatabaseDesc().Name, - ), - "available regions: %s", - strings.Join(regions.ToStrings(), ", "), - ) - } - if !regionEnumIDReferenced { - return errors.AssertionFailedf( - "expected multi-region enum ID %d to be referenced on REGIONAL BY TABLE: %q locality "+ - "config, but did not find it", - regionsEnumID, - desc.GetName(), - ) - } - } else { - if regionEnumIDReferenced { - // It may be the case that the multi-region type descriptor is used - // as the type of the table column. Validations should only fail if - // that is not the case. - if _, found := columnTypesTypeIDs[regionsEnumID]; !found { - return errors.AssertionFailedf( - "expected no region Enum ID to be referenced by a REGIONAL BY TABLE: %q homed in the "+ - "primary region, but found: %d", - desc.GetName(), - regionsEnumDesc.GetID(), - ) - } - } - } - default: - return pgerror.Newf( - pgcode.InvalidTableDefinition, - "unknown locality level: %T", - lc, - ) - } - return nil -} diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 99c29c8447c9..bfb271d443fb 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catprivilege" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/idxusage" @@ -399,7 +400,7 @@ CREATE TABLE crdb_internal.tables ( locality := tree.DNull if c := table.GetLocalityConfig(); c != nil { f := p.EvalContext().FmtCtx(tree.FmtSimple) - if err := tabledesc.FormatTableLocalityConfig(c, f); err != nil { + if err := multiregion.FormatTableLocalityConfig(c, f); err != nil { return err } locality = tree.NewDString(f.String()) diff --git a/pkg/sql/drop_type.go b/pkg/sql/drop_type.go index dafb071e4a9e..674248912bf9 100644 --- a/pkg/sql/drop_type.go +++ b/pkg/sql/drop_type.go @@ -195,7 +195,7 @@ func (p *planner) addBackRefsFromAllTypesInTable( if err != nil { return err } - typeIDs, err := desc.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { + typeIDs, _, err := desc.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { mutDesc, err := p.Descriptors().GetMutableTypeVersionByID(ctx, p.txn, id) if err != nil { return nil, err @@ -222,7 +222,7 @@ func (p *planner) removeBackRefsFromAllTypesInTable( if err != nil { return err } - typeIDs, err := desc.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { + typeIDs, _, err := desc.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { mutDesc, err := p.Descriptors().GetMutableTypeVersionByID(ctx, p.txn, id) if err != nil { return nil, err diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index d3df17387336..5e635caf4616 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -1062,7 +1062,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { if err != nil { return err } - referencedTypeIDs, err := scTable.GetAllReferencedTypeIDs(dbDesc, + referencedTypeIDs, _, err := scTable.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { desc, err := descsCol.GetImmutableTypeByID(ctx, txn, id, tree.ObjectLookupFlags{}) if err != nil { @@ -1314,7 +1314,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // type descriptors. If this table has been dropped in the mean time, then // don't install any backreferences. if !scTable.Dropped() { - newReferencedTypeIDs, err := scTable.GetAllReferencedTypeIDs(dbDesc, + newReferencedTypeIDs, _, err := scTable.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { typ, err := descsCol.GetMutableTypeVersionByID(ctx, txn, id) if err != nil { diff --git a/pkg/sql/schemachanger/scbuild/relation_common.go b/pkg/sql/schemachanger/scbuild/relation_common.go index 12359a8efaba..dce2b2a334f2 100644 --- a/pkg/sql/schemachanger/scbuild/relation_common.go +++ b/pkg/sql/schemachanger/scbuild/relation_common.go @@ -33,7 +33,7 @@ func (b *buildContext) removeTypeBackRefDeps( if err != nil { panic(err) } - typeIDs, err := tableDesc.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { + typeIDs, _, err := tableDesc.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) { mutDesc, err := b.Descs.GetMutableTypeByID(ctx, b.EvalCtx.Txn, id, tree.ObjectLookupFlagsWithRequired()) if err != nil { return nil, err diff --git a/pkg/sql/show_create_clauses.go b/pkg/sql/show_create_clauses.go index aedf26725b67..646699ff0a9c 100644 --- a/pkg/sql/show_create_clauses.go +++ b/pkg/sql/show_create_clauses.go @@ -19,8 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -376,7 +376,7 @@ func showFamilyClause(desc catalog.TableDescriptor, f *tree.FmtCtx) { func showCreateLocality(desc catalog.TableDescriptor, f *tree.FmtCtx) error { if c := desc.GetLocalityConfig(); c != nil { f.WriteString(" LOCALITY ") - return tabledesc.FormatTableLocalityConfig(c, f) + return multiregion.FormatTableLocalityConfig(c, f) } return nil }