From 54b8a73097929568cee8897783c7c8c41b764339 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Thu, 21 Sep 2023 15:25:29 +0100 Subject: [PATCH 1/4] rangefeed: create catchup iterators eagerly Previously, catchup iterators were created in the main rangefeed processor work loop. This is negatively affecting scheduler based processors as this operation could be slow. This commit makes iterator creation eager, simplifying error handling and making rangefeed times delays lower. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 25 +++----- pkg/kv/kvserver/rangefeed/registry.go | 57 +++++-------------- pkg/kv/kvserver/rangefeed/registry_test.go | 19 +++---- .../kvserver/rangefeed/scheduled_processor.go | 12 +--- pkg/kv/kvserver/replica_rangefeed.go | 45 +++++++++------ 5 files changed, 61 insertions(+), 97 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 42abbdae4463..d18b893e65e9 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -157,7 +157,10 @@ type Processor interface { // provided an error when the registration closes. // // The optionally provided "catch-up" iterator is used to read changes from the - // engine which occurred after the provided start timestamp (exclusive). + // engine which occurred after the provided start timestamp (exclusive). If + // this method succeeds, registration must take ownership of iterator and + // subsequently close it. If method fails, iterator must be kept intact and + // would be closed by caller. // // If the method returns false, the processor will have been stopped, so calling // Stop is not necessary. If the method returns true, it will also return an @@ -168,7 +171,7 @@ type Processor interface { Register( span roachpb.RSpan, startTS hlc.Timestamp, // exclusive - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -326,12 +329,6 @@ func NewLegacyProcessor(cfg Config) *LegacyProcessor { // engine has not been closed. type IntentScannerConstructor func() IntentScanner -// CatchUpIteratorConstructor is used to construct an iterator that can be used -// for catchup-scans. Takes the key span and exclusive start time to run the -// catchup scan for. It should be called from underneath a stopper task to -// ensure that the engine has not been closed. -type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error) - // Start implements Processor interface. // // LegacyProcessor launches a goroutine to process rangefeed events and send @@ -404,14 +401,6 @@ func (p *LegacyProcessor) run( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return - } - // Add the new registration to the registry. 
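			// The catch-up iterator, if any, was already created eagerly by the caller
			// and attached to the registration, which owns it and closes it.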
p.reg.Register(&r) @@ -559,7 +548,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) { func (p *LegacyProcessor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -572,7 +561,7 @@ func (p *LegacyProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, ) select { diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 6dc9487b4692..8a2517c5bdce 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -81,15 +81,6 @@ type registration struct { withDiff bool metrics *Metrics - // catchUpIterConstructor is used to construct the catchUpIter if necessary. - // The reason this constructor is plumbed down is to make sure that the - // iterator does not get constructed too late in server shutdown. However, - // it must also be stored in the struct to ensure that it is not constructed - // too late, after the raftMu has been dropped. Thus, this function, if - // non-nil, will be used to populate mu.catchUpIter while the registration - // is being registered by the processor. - catchUpIterConstructor CatchUpIteratorConstructor - // Output. stream Stream done *future.ErrorFuture @@ -113,9 +104,10 @@ type registration struct { outputLoopCancelFn func() disconnected bool - // catchUpIter is populated on the Processor's goroutine while the - // Replica.raftMu is still held. If it is non-nil at the time that - // disconnect is called, it is closed by disconnect. + // catchUpIter is created by replcia under raftMu lock when registration is + // created. It is detached by output loop for processing and closed. + // If output loop was not started and catchUpIter is non-nil at the time + // that disconnect is called, it is closed by disconnect. catchUpIter *CatchUpIterator } } @@ -123,7 +115,7 @@ type registration struct { func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, bufferSz int, blockWhenFull bool, @@ -133,19 +125,19 @@ func newRegistration( done *future.ErrorFuture, ) registration { r := registration{ - span: span, - catchUpTimestamp: startTS, - catchUpIterConstructor: catchUpIterConstructor, - withDiff: withDiff, - metrics: metrics, - stream: stream, - done: done, - unreg: unregisterFn, - buf: make(chan *sharedEvent, bufferSz), - blockWhenFull: blockWhenFull, + span: span, + catchUpTimestamp: startTS, + withDiff: withDiff, + metrics: metrics, + stream: stream, + done: done, + unreg: unregisterFn, + buf: make(chan *sharedEvent, bufferSz), + blockWhenFull: blockWhenFull, } r.mu.Locker = &syncutil.Mutex{} r.mu.caughtUp = true + r.mu.catchUpIter = catchUpIter return r } @@ -587,25 +579,6 @@ func (r *registration) waitForCaughtUp() error { return errors.Errorf("registration %v failed to empty in time", r.Range()) } -// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches -// the catchUpIter to be detached in the catchUpScan or closed on disconnect. 
-func (r *registration) maybeConstructCatchUpIter() error { - if r.catchUpIterConstructor == nil { - return nil - } - - catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp) - if err != nil { - return err - } - r.catchUpIterConstructor = nil - - r.mu.Lock() - defer r.mu.Unlock() - r.mu.catchUpIter = catchUpIter - return nil -} - // detachCatchUpIter detaches the catchUpIter that was previously attached. func (r *registration) detachCatchUpIter() *CatchUpIterator { r.mu.Lock() diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index cf618d16ef5e..624117b76553 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -100,16 +100,16 @@ type testRegistration struct { stream *testStream } -func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor { +func makeCatchUpIterator( + iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp, +) *CatchUpIterator { if iter == nil { return nil } - return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) { - return &CatchUpIterator{ - simpleCatchupIter: simpleCatchupIterAdapter{iter}, - span: span, - startTime: startTime, - }, nil + return &CatchUpIterator{ + simpleCatchupIter: simpleCatchupIterAdapter{iter}, + span: span, + startTime: startTime, } } @@ -120,7 +120,7 @@ func newTestRegistration( r := newRegistration( span, ts, - makeCatchUpIteratorConstructor(catchup), + makeCatchUpIterator(catchup, span, ts), withDiff, 5, false, /* blockWhenFull */ @@ -129,9 +129,6 @@ func newTestRegistration( func() {}, &future.ErrorFuture{}, ) - if err := r.maybeConstructCatchUpIter(); err != nil { - panic(err) - } return &testRegistration{ registration: r, stream: s, diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 96469c74ca7d..fa9779ed1807 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -294,7 +294,7 @@ func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) { func (p *ScheduledProcessor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIterConstructor CatchUpIteratorConstructor, + catchUpIter *CatchUpIterator, withDiff bool, stream Stream, disconnectFn func(), @@ -307,7 +307,7 @@ func (p *ScheduledProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, ) @@ -319,14 +319,6 @@ func (p *ScheduledProcessor) Register( log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } - // Construct the catchUpIter before notifying the registration that it - // has been registered. Note that if the catchUpScan is never run, then - // the iterator constructed here will be closed in disconnect. - if err := r.maybeConstructCatchUpIter(); err != nil { - r.disconnect(kvpb.NewError(err)) - return nil - } - // Add the new registration to the registry. 
p.reg.Register(&r) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e671ea47940d..3e70c5415d64 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -271,25 +271,22 @@ func (r *Replica) RangeFeed( } // Register the stream with a catch-up iterator. - var catchUpIterFunc rangefeed.CatchUpIteratorConstructor + var catchUpIter *rangefeed.CatchUpIterator if usingCatchUpIter { - catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) { - // Assert that we still hold the raftMu when this is called to ensure - // that the catchUpIter reads from the current snapshot. - r.raftMu.AssertHeld() - i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer) - if err != nil { - return nil, err - } - if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { - i.OnEmit = f - } - return i, nil + catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(), + args.Timestamp, iterSemRelease, pacer) + if err != nil { + r.raftMu.Unlock() + iterSemRelease() + return future.MakeCompletedErrorFuture(err) + } + if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { + catchUpIter.OnEmit = f } } var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, &done, ) r.raftMu.Unlock() @@ -373,18 +370,32 @@ func logSlowRangefeedRegistration(ctx context.Context) func() { // registerWithRangefeedRaftMuLocked sets up a Rangefeed registration over the // provided span. It initializes a rangefeed for the Replica if one is not // already running. Requires raftMu be locked. -// Returns Future[*roachpb.Error] which will return an error once rangefeed completes. +// Returns Future[*roachpb.Error] which will return an error once rangefeed +// completes. +// Note that caller delegates lifecycle of catchUpIter to this method in both +// success and failure cases. So it is important that this method closes +// iterator in case registration fails. Successful registration takes iterator +// ownership and ensures it is closed when catch up is complete or aborted. func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, // exclusive - catchUpIter rangefeed.CatchUpIteratorConstructor, + catchUpIter *rangefeed.CatchUpIterator, withDiff bool, stream rangefeed.Stream, done *future.ErrorFuture, ) rangefeed.Processor { defer logSlowRangefeedRegistration(ctx)() + // Always defer closing iterator to cover old and new failure cases. + // On successful path where registration succeeds reset catchUpIter to prevent + // closing it. + defer func() { + if catchUpIter != nil { + catchUpIter.Close() + } + }() + // Attempt to register with an existing Rangefeed processor, if one exists. // The locking here is a little tricky because we need to handle the case // of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed). @@ -399,6 +410,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // that this new registration might be interested in. 
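			// Registration succeeded, so ownership of catchUpIter has passed to the
			// registration; clearing the local reference below keeps the deferred Close
			// from releasing an iterator that is no longer ours.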
r.setRangefeedFilterLocked(filter) r.rangefeedMu.Unlock() + catchUpIter = nil return p } // If the registration failed, the processor was already being shut @@ -477,6 +489,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( panic("unexpected Stopped processor") } } + catchUpIter = nil // Set the rangefeed processor and filter reference. r.setRangefeedProcessor(p) From ab00ae96196b3b2ca8b77d1ec3bce35050316f1d Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Tue, 12 Sep 2023 17:59:18 +0100 Subject: [PATCH 2/4] rangefeed: add rangefeed scheduler metrics This commit adds two rangefeed scheduler metrics to assist performance debugging. Latency histogram which is showing how long events spent waiting in queue before they are scheduled for processing: `kv.rangefeed.scheduler.normal.latency` `kv.rangefeed.scheduler.system.latency` Queue size which is showing how many ranges are pending event to be processed: `kv.rangefeed.scheduler.normal.queue_size` `kv.rangefeed.scheduler.system.queue_size` Epic: #110420 Release note: None --- docs/generated/metrics/metrics.html | 6 +- pkg/kv/kvserver/rangefeed/metrics.go | 61 ++++++++- pkg/kv/kvserver/rangefeed/processor_test.go | 6 +- .../kvserver/rangefeed/scheduled_processor.go | 2 - pkg/kv/kvserver/rangefeed/scheduler.go | 123 +++++++++++++----- pkg/kv/kvserver/rangefeed/scheduler_test.go | 82 +++++++++--- pkg/kv/kvserver/store.go | 21 +-- 7 files changed, 240 insertions(+), 61 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 5de1a81e14c9..36434ecd54d6 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -235,7 +235,11 @@ STORAGEkv.rangefeed.mem_systemMemory usage by rangefeeds on system rangesMemoryGAUGEBYTESAVGNONE STORAGEkv.rangefeed.processors_goroutineNumber of active RangeFeed processors using goroutinesProcessorsGAUGECOUNTAVGNONE STORAGEkv.rangefeed.processors_schedulerNumber of active RangeFeed processors using schedulerProcessorsGAUGECOUNTAVGNONE -STORAGEkv.rangefeed.registrationsNumber of active rangefeed registrationsRegistrationsGAUGECOUNTAVGNONE +STORAGEkv.rangefeed.registrationsNumber of active RangeFeed registrationsRegistrationsGAUGECOUNTAVGNONE +STORAGEkv.rangefeed.scheduler.normal.latencyKV RangeFeed normal scheduler latencyLatencyHISTOGRAMNANOSECONDSAVGNONE +STORAGEkv.rangefeed.scheduler.normal.queue_sizeNumber of entries in the KV RangeFeed normal scheduler queuePending RangesGAUGECOUNTAVGNONE +STORAGEkv.rangefeed.scheduler.system.latencyKV RangeFeed system scheduler latencyLatencyHISTOGRAMNANOSECONDSAVGNONE +STORAGEkv.rangefeed.scheduler.system.queue_sizeNumber of entries in the KV RangeFeed system scheduler queuePending RangesGAUGECOUNTAVGNONE STORAGEkv.replica_circuit_breaker.num_tripped_eventsNumber of times the per-Replica circuit breakers tripped since process start.EventsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEkv.replica_circuit_breaker.num_tripped_replicasNumber of Replicas for which the per-Replica circuit breaker is currently tripped.

A nonzero value indicates range or replica unavailability, and should be investigated.
Replicas in this state will fail-fast all inbound requests.
ReplicasGAUGECOUNTAVGNONE STORAGEkv.replica_read_batch_evaluate.dropped_latches_before_evalNumber of times read-only batches dropped latches before evaluation.BatchesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE diff --git a/pkg/kv/kvserver/rangefeed/metrics.go b/pkg/kv/kvserver/rangefeed/metrics.go index c0a28c9f0c1e..837bcfe6fd7c 100644 --- a/pkg/kv/kvserver/rangefeed/metrics.go +++ b/pkg/kv/kvserver/rangefeed/metrics.go @@ -11,6 +11,7 @@ package rangefeed import ( + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -38,7 +39,7 @@ var ( } metaRangeFeedRegistrations = metric.Metadata{ Name: "kv.rangefeed.registrations", - Help: "Number of active rangefeed registrations", + Help: "Number of active RangeFeed registrations", Measurement: "Registrations", Unit: metric.Unit_COUNT, } @@ -54,6 +55,18 @@ var ( Measurement: "Processors", Unit: metric.Unit_COUNT, } + metaQueueTimeHistogramsTemplate = metric.Metadata{ + Name: "kv.rangefeed.scheduler.%s.latency", + Help: "KV RangeFeed %s scheduler latency", + Measurement: "Latency", + Unit: metric.Unit_NANOSECONDS, + } + metaQueueSizeTemplate = metric.Metadata{ + Name: "kv.rangefeed.scheduler.%s.queue_size", + Help: "Number of entries in the KV RangeFeed %s scheduler queue", + Measurement: "Pending Ranges", + Unit: metric.Unit_COUNT, + } ) // Metrics are for production monitoring of RangeFeeds. @@ -120,3 +133,49 @@ func NewFeedBudgetMetrics(histogramWindow time.Duration) *FeedBudgetPoolMetrics "Memory usage by rangefeeds")), } } + +// ShardMetrics metrics for individual scheduler shard. +type ShardMetrics struct { + // QueueTime is time spent by range in scheduler queue. + QueueTime metric.IHistogram + // QueueSize is number of elements in the queue recently observed by reader. + QueueSize *metric.Gauge +} + +// MetricStruct implements metrics.Struct interface. +func (*ShardMetrics) MetricStruct() {} + +// SchedulerMetrics for production monitoring of rangefeed Scheduler. +type SchedulerMetrics struct { + SystemPriority *ShardMetrics + NormalPriority *ShardMetrics +} + +// MetricStruct implements metrics.Struct interface. +func (*SchedulerMetrics) MetricStruct() {} + +// NewSchedulerMetrics creates metric struct for Scheduler. 
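+// The system (priority) shard gets its own ShardMetrics, while all normal
+// shards share the NormalPriority ShardMetrics.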
+func NewSchedulerMetrics(histogramWindow time.Duration) *SchedulerMetrics { + return &SchedulerMetrics{ + SystemPriority: newSchedulerShardMetrics("system", histogramWindow), + NormalPriority: newSchedulerShardMetrics("normal", histogramWindow), + } +} + +func newSchedulerShardMetrics(name string, histogramWindow time.Duration) *ShardMetrics { + expandTemplate := func(template metric.Metadata) metric.Metadata { + result := template + result.Name = fmt.Sprintf(template.Name, name) + result.Help = fmt.Sprintf(template.Help, name) + return result + } + return &ShardMetrics{ + QueueTime: metric.NewHistogram(metric.HistogramOptions{ + Mode: metric.HistogramModePreferHdrLatency, + Metadata: expandTemplate(metaQueueTimeHistogramsTemplate), + Duration: histogramWindow, + BucketConfig: metric.IOLatencyBuckets, + }), + QueueSize: metric.NewGauge(expandTemplate(metaQueueSizeTemplate)), + } +} diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index dbf1ccf8cba7..2561f0acfa79 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -297,7 +297,11 @@ func newTestProcessor( o(&cfg) } if cfg.useScheduler { - sch := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1}) + sch := NewScheduler(SchedulerConfig{ + Workers: 1, + PriorityWorkers: 1, + Metrics: NewSchedulerMetrics(time.Second), + }) require.NoError(t, sch.Start(context.Background(), stopper)) cfg.Scheduler = sch // Also create a dummy priority processor to populate priorityIDs for diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index ff4f9059d4a8..70eda73ea7e8 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -159,8 +159,6 @@ func (p *ScheduledProcessor) processRequests(ctx context.Context) { // Transform and route pending events. func (p *ScheduledProcessor) processEvents(ctx context.Context) { - // TODO(oleg): maybe limit max count and allow returning some data for - // further processing on next iteration. // Only process as much data as was present at the start of the processing // run to avoid starving other processors. for max := len(p.eventC); max > 0; max-- { diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go index 315d637f8f7c..c173ac06daa0 100644 --- a/pkg/kv/kvserver/rangefeed/scheduler.go +++ b/pkg/kv/kvserver/rangefeed/scheduler.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -88,6 +89,11 @@ func (e processorEventType) String() string { // scheduler lock. const enqueueBulkMaxChunk = 100 +// schedulerLatencyHistogramCollectionFrequency defines how frequently scheduler +// will update histogram. we choose a prime number to reduce changes of periodic +// events skewing sampling. +const schedulerLatencyHistogramCollectionFrequency = 13 + // Callback is a callback to perform work set by processor. Event is a // combination of all event types scheduled since last callback invocation. // @@ -113,6 +119,12 @@ type SchedulerConfig struct { // enqueue operation. Chunking is done to avoid holding locks for too long // as it will interfere with enqueue operations. 
BulkChunkSize int + + // Metrics for scheduler performance monitoring + Metrics *SchedulerMetrics + // histogramFrequency sets how frequently wait latency is recorded for metrics + // purposes. + HistogramFrequency int64 } // priorityIDsValue is a placeholder value for Scheduler.priorityIDs. IntMap @@ -166,6 +178,11 @@ type schedulerShard struct { queue *idQueue // No more new registrations allowed. Workers are winding down. quiescing bool + + metrics *ShardMetrics + // histogramFrequency determines frequency of histogram collection + histogramFrequency int64 + nextLatencyCheck int64 } // NewScheduler will instantiate an idle scheduler based on provided config. @@ -175,6 +192,10 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler { if bulkChunkSize == 0 { bulkChunkSize = enqueueBulkMaxChunk } + histogramFrequency := cfg.HistogramFrequency + if histogramFrequency == 0 { + histogramFrequency = schedulerLatencyHistogramCollectionFrequency + } s := &Scheduler{} @@ -183,7 +204,8 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler { if cfg.PriorityWorkers > 0 { priorityWorkers = cfg.PriorityWorkers } - s.shards = append(s.shards, newSchedulerShard(priorityWorkers, bulkChunkSize)) + s.shards = append(s.shards, + newSchedulerShard(priorityWorkers, bulkChunkSize, cfg.Metrics.SystemPriority, histogramFrequency)) // Regular shards, excluding priority shard. numShards := 1 @@ -198,20 +220,25 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler { if shardWorkers <= 0 { shardWorkers = 1 // ensure we always have a worker } - s.shards = append(s.shards, newSchedulerShard(shardWorkers, bulkChunkSize)) + s.shards = append(s.shards, + newSchedulerShard(shardWorkers, bulkChunkSize, cfg.Metrics.NormalPriority, histogramFrequency)) } return s } // newSchedulerShard creates a new shard with the given number of workers. -func newSchedulerShard(numWorkers, bulkChunkSize int) *schedulerShard { +func newSchedulerShard( + numWorkers, bulkChunkSize int, metrics *ShardMetrics, histogramFrequency int64, +) *schedulerShard { ss := &schedulerShard{ - numWorkers: numWorkers, - bulkChunkSize: bulkChunkSize, - procs: map[int64]Callback{}, - status: map[int64]processorEventType{}, - queue: newIDQueue(), + numWorkers: numWorkers, + bulkChunkSize: bulkChunkSize, + procs: map[int64]Callback{}, + status: map[int64]processorEventType{}, + queue: newIDQueue(), + metrics: metrics, + histogramFrequency: histogramFrequency, } ss.cond = sync.NewCond(&ss.Mutex) return ss @@ -359,33 +386,49 @@ func (ss *schedulerShard) unregister(id int64) { // enqueue enqueues a single event for a given processor in this shard, and wakes // up a worker to process it. The caller must not hold the shard lock. func (ss *schedulerShard) enqueue(id int64, evt processorEventType) { + // We get time outside of lock to get more realistic delay in case there's a + // scheduler contention. + now := ss.maybeEnqueueStartTime() ss.Lock() defer ss.Unlock() - if ss.enqueueLocked(id, evt) { + if ss.enqueueLocked(queueEntry{id: id, startTime: now}, evt) { // Wake up potential waiting worker. // We are allowed to do this under cond lock. ss.cond.Signal() + ss.metrics.QueueSize.Inc(1) + } +} + +// maybeEnqueueStartTime returns now in nanos or 0. 0 means event will have no start +// time and hence won't be included in histogram. This is done to throttle +// histogram collection by only recording every n-th event to reduce CPU waste. 
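+// Each call atomically decrements the shared nextLatencyCheck counter and only
+// samples the clock when the count crosses a multiple of histogramFrequency,
+// so non-sampled calls avoid the timeutil.Now call entirely.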
+func (ss *schedulerShard) maybeEnqueueStartTime() int64 { + var now int64 + if v := atomic.AddInt64(&ss.nextLatencyCheck, -1); v < 1 && (-v%ss.histogramFrequency) == 0 { + now = timeutil.Now().UnixNano() + atomic.AddInt64(&ss.nextLatencyCheck, ss.histogramFrequency) } + return now } // enqueueLocked enqueues a single event for a given processor in this shard. // Does not wake up a worker to process it. -func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool { - if _, ok := ss.procs[id]; !ok { +func (ss *schedulerShard) enqueueLocked(entry queueEntry, evt processorEventType) bool { + if _, ok := ss.procs[entry.id]; !ok { return false } - pending := ss.status[id] + pending := ss.status[entry.id] if pending&Stopped != 0 { return false } if pending == 0 { // Enqueue if processor was idle. - ss.queue.pushBack(id) + ss.queue.pushBack(entry) } update := pending | evt | Queued if update != pending { // Only update if event actually changed. - ss.status[id] = update + ss.status[entry.id] = update } return pending == 0 } @@ -398,10 +441,17 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int { return 0 } + // For bulk insertions we just apply sampling frequency within the batch + // to reduce contention. + now := timeutil.Now().UnixNano() ss.Lock() var count int for i, id := range ids { - if ss.enqueueLocked(id, evt) { + time := int64(0) + if int64(i)%ss.histogramFrequency == 0 { + time = now + } + if ss.enqueueLocked(queueEntry{id: id, startTime: time}, evt) { count++ } if (i+1)%ss.bulkChunkSize == 0 { @@ -410,6 +460,7 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int { } } ss.Unlock() + ss.metrics.QueueSize.Inc(int64(count)) if count >= ss.numWorkers { ss.cond.Broadcast() @@ -425,7 +476,7 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int { // be launched in separate goroutine and will loop until scheduler is stopped. func (ss *schedulerShard) processEvents(ctx context.Context) { for { - var id int64 + var entry queueEntry ss.Lock() for { if ss.quiescing { @@ -433,18 +484,24 @@ func (ss *schedulerShard) processEvents(ctx context.Context) { return } var ok bool - if id, ok = ss.queue.popFront(); ok { + if entry, ok = ss.queue.popFront(); ok { break } ss.cond.Wait() } - cb := ss.procs[id] - e := ss.status[id] + cb := ss.procs[entry.id] + e := ss.status[entry.id] // Keep Queued status and preserve Stopped to block any more events. - ss.status[id] = Queued | (e & Stopped) + ss.status[entry.id] = Queued | (e & Stopped) ss.Unlock() + if entry.startTime != 0 { + delay := timeutil.Now().UnixNano() - entry.startTime + ss.metrics.QueueTime.RecordValue(delay) + } + ss.metrics.QueueSize.Dec(1) + procEventType := Queued ^ e remaining := cb(procEventType) @@ -459,18 +516,18 @@ func (ss *schedulerShard) processEvents(ctx context.Context) { if e&Stopped != 0 { if remaining != 0 { log.VWarningf(ctx, 5, - "rangefeed processor %d didn't process all events on close", id) + "rangefeed processor %d didn't process all events on close", entry.id) } // We'll keep Stopped state to avoid calling stopped processor again // on scheduler shutdown. 
ss.Lock() - ss.status[id] = Stopped + ss.status[entry.id] = Stopped ss.Unlock() continue } ss.Lock() - pendingStatus, ok := ss.status[id] + pendingStatus, ok := ss.status[entry.id] if !ok { ss.Unlock() continue @@ -478,15 +535,16 @@ func (ss *schedulerShard) processEvents(ctx context.Context) { newStatus := pendingStatus | remaining if newStatus == Queued { // If no events arrived, get rid of id. - delete(ss.status, id) + delete(ss.status, entry.id) } else { // Since more events arrived during processing, reschedule. - ss.queue.pushBack(id) + ss.queue.pushBack(queueEntry{id: entry.id, startTime: ss.maybeEnqueueStartTime()}) // If remaining work was returned and not already planned, then update // pending status to reflect that. if newStatus != pendingStatus { - ss.status[id] = newStatus + ss.status[entry.id] = newStatus } + ss.metrics.QueueSize.Inc(1) } ss.Unlock() } @@ -621,10 +679,15 @@ func (cs *ClientScheduler) Unregister() { // Number of queue elements allocated at once to amortize queue allocations. const idQueueChunkSize = 8000 +type queueEntry struct { + id int64 + startTime int64 +} + // idQueueChunk is a queue chunk of a fixed size which idQueue uses to extend // its storage. Chunks are kept in the pool to reduce allocations. type idQueueChunk struct { - data [idQueueChunkSize]int64 + data [idQueueChunkSize]queueEntry nextChunk *idQueueChunk } @@ -665,7 +728,7 @@ func newIDQueue() *idQueue { } } -func (q *idQueue) pushBack(id int64) { +func (q *idQueue) pushBack(id queueEntry) { if q.write == idQueueChunkSize { nexChunk := getPooledIDQueueChunk() q.last.nextChunk = nexChunk @@ -677,9 +740,9 @@ func (q *idQueue) pushBack(id int64) { q.size++ } -func (q *idQueue) popFront() (int64, bool) { +func (q *idQueue) popFront() (queueEntry, bool) { if q.size == 0 { - return 0, false + return queueEntry{}, false } if q.read == idQueueChunkSize { removed := q.first diff --git a/pkg/kv/kvserver/rangefeed/scheduler_test.go b/pkg/kv/kvserver/rangefeed/scheduler_test.go index 44932fee3743..a81945d5e9ff 100644 --- a/pkg/kv/kvserver/rangefeed/scheduler_test.go +++ b/pkg/kv/kvserver/rangefeed/scheduler_test.go @@ -13,6 +13,7 @@ package rangefeed import ( "context" "fmt" + "sync" "testing" "time" @@ -29,7 +30,7 @@ func TestStopEmpty(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1}) + s := newTestScheduler(1) require.NoError(t, s.Start(ctx, stopper), "failed to start") s.Stop() @@ -42,7 +43,7 @@ func TestStopNonEmpty(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1}) + s := newTestScheduler(1) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) s.stopProcessor(c.id) @@ -214,7 +215,7 @@ func TestDeliverEvents(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1}) + s := newTestScheduler(1) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) s.enqueue(c.id, te1) @@ -228,7 +229,7 @@ func TestNoParallel(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 2}) + s := newTestScheduler(2) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c.pause() @@ -246,7 +247,7 @@ func TestProcessOtherWhilePaused(t *testing.T) { stopper := 
stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 2}) + s := newTestScheduler(2) require.NoError(t, s.Start(ctx, stopper), "failed to start") c1 := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c2 := createAndRegisterConsumerOrFail(t, s, 2, false /* priority */) @@ -268,7 +269,7 @@ func TestEventsCombined(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 2}) + s := newTestScheduler(2) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c.pause() @@ -287,7 +288,7 @@ func TestRescheduleEvent(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 2}) + s := newTestScheduler(2) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c.pause() @@ -305,7 +306,7 @@ func TestClientScheduler(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 2}) + s := newTestScheduler(2) require.NoError(t, s.Start(ctx, stopper), "failed to start") cs := s.NewClientScheduler() // Manually create consumer as we don't want it to start, but want to use it @@ -335,7 +336,12 @@ func TestScheduleBatch(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 8, ShardSize: 2, BulkChunkSize: 2}) + s := NewScheduler(SchedulerConfig{ + Workers: 2, + BulkChunkSize: 2, + ShardSize: 2, + Metrics: NewSchedulerMetrics(time.Minute), + }) require.NoError(t, s.Start(ctx, stopper), "failed to start") const consumerNumber = 100 consumers := make([]*schedulerConsumer, consumerNumber) @@ -358,7 +364,7 @@ func TestPartialProcessing(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1}) + s := newTestScheduler(1) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) // Set process response to trigger process once again. 
@@ -387,7 +393,7 @@ func TestUnregisterWithoutStop(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1}) + s := newTestScheduler(1) require.NoError(t, s.Start(ctx, stopper), "failed to start") c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) s.enqueue(c.id, te1) @@ -404,7 +410,7 @@ func TestStartupFailure(t *testing.T) { stopper := stop.NewStopper() stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1}) + s := newTestScheduler(1) require.Error(t, s.Start(ctx, stopper), "started despite stopper stopped") } @@ -414,7 +420,9 @@ func TestSchedulerShutdown(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 2, ShardSize: 1}) + s := NewScheduler(SchedulerConfig{ + Workers: 2, ShardSize: 1, Metrics: NewSchedulerMetrics(time.Minute), + }) require.NoError(t, s.Start(ctx, stopper), "failed to start") c1 := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c2 := createAndRegisterConsumerOrFail(t, s, 2, false /* priority */) @@ -429,11 +437,11 @@ func TestQueueReadWrite1By1(t *testing.T) { q := newIDQueue() val := int64(7) for i := 0; i < idQueueChunkSize*3; i++ { - q.pushBack(val) + q.pushBack(queueEntry{id: val}) require.Equal(t, 1, q.Len(), "queue size") v, ok := q.popFront() require.True(t, ok, "value not found after writing") - require.Equal(t, val, v, "read different from write") + require.Equal(t, val, v.id, "read different from write") val = val*3 + 7 } _, ok := q.popFront() @@ -445,7 +453,7 @@ func TestQueueReadWriteFull(t *testing.T) { val := int64(7) for i := 0; i < idQueueChunkSize*3; i++ { require.Equal(t, i, q.Len(), "queue size") - q.pushBack(val) + q.pushBack(queueEntry{id: val}) val = val*3 + 7 } val = int64(7) @@ -453,7 +461,7 @@ func TestQueueReadWriteFull(t *testing.T) { require.Equal(t, idQueueChunkSize*3-i, q.Len(), "queue size") v, ok := q.popFront() require.True(t, ok, "value not found after writing") - require.Equal(t, val, v, "read different from write") + require.Equal(t, val, v.id, "read different from write") val = val*3 + 7 } require.Equal(t, 0, q.Len(), "queue size") @@ -467,6 +475,10 @@ func TestQueueReadEmpty(t *testing.T) { require.False(t, ok, "unexpected value in empty queue") } +func newTestScheduler(workers int) *Scheduler { + return NewScheduler(SchedulerConfig{Workers: workers, Metrics: NewSchedulerMetrics(time.Minute)}) +} + func TestNewSchedulerShards(t *testing.T) { defer leaktest.AfterTest(t)() @@ -524,6 +536,7 @@ func TestNewSchedulerShards(t *testing.T) { Workers: tc.workers, PriorityWorkers: tc.priorityWorkers, ShardSize: tc.shardSize, + Metrics: NewSchedulerMetrics(time.Minute), }) var shardWorkers []int @@ -543,7 +556,13 @@ func TestSchedulerPriority(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - s := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1, ShardSize: 1, BulkChunkSize: 1}) + s := NewScheduler(SchedulerConfig{ + Workers: 1, + PriorityWorkers: 1, + ShardSize: 1, + BulkChunkSize: 1, + Metrics: NewSchedulerMetrics(time.Minute), + }) require.NoError(t, s.Start(ctx, stopper)) defer s.Stop() @@ -567,3 +586,30 @@ func TestSchedulerPriority(t *testing.T) { c.requireStopped(t, 5*time.Second) cPri.requireStopped(t, 5*time.Second) } + +func TestMetricThrottling(t *testing.T) { + const workers = 100 + const iterations = 1000 + const freq = 3 + s := schedulerShard{histogramFrequency: freq} + var wg sync.WaitGroup + ticks := make([]int, workers) + for i := 
0; i < workers; i++ { + worker := i + wg.Add(1) + go func() { + for j := 0; j < iterations; j++ { + if s.maybeEnqueueStartTime() != 0 { + ticks[worker]++ + } + } + wg.Done() + }() + } + wg.Wait() + var sum int + for _, v := range ticks { + sum += v + } + require.Equal(t, workers*iterations/freq, sum, "total number of ticks") +} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b058e73f04b7..6241538fd7dc 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2012,15 +2012,20 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { return err } - rfs := rangefeed.NewScheduler(rangefeed.SchedulerConfig{ - Workers: s.cfg.RangeFeedSchedulerConcurrency, - PriorityWorkers: s.cfg.RangeFeedSchedulerConcurrencyPriority, - ShardSize: s.cfg.RangeFeedSchedulerShardSize, - }) - if err = rfs.Start(ctx, s.stopper); err != nil { - return err + { + m := rangefeed.NewSchedulerMetrics(s.cfg.HistogramWindowInterval) + rfs := rangefeed.NewScheduler(rangefeed.SchedulerConfig{ + Workers: s.cfg.RangeFeedSchedulerConcurrency, + PriorityWorkers: s.cfg.RangeFeedSchedulerConcurrencyPriority, + ShardSize: s.cfg.RangeFeedSchedulerShardSize, + Metrics: m, + }) + s.Registry().AddMetricStruct(m) + if err = rfs.Start(ctx, s.stopper); err != nil { + return err + } + s.rangefeedScheduler = rfs } - s.rangefeedScheduler = rfs s.rangefeedRestarter.start(ctx, stopper) From 499bbe0780274263cc9ea659f506b032d6c5c4ff Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 13 Sep 2023 17:15:57 -0400 Subject: [PATCH 3/4] server: replace admin checks with VIEWCLUSTERMETADATA and REPAIRCLUSTERMETADATA Release note (security update): Endpoints in the admin and status server that previously required the admin role now can be used by users with the VIEWCLUSTERMETADATA or REPAIRCLUSTERMETADATA system privilege, depending on whether the endpoint is read-only or can modify state. --- .../serverccl/statusccl/tenant_status_test.go | 14 +++++ pkg/server/admin.go | 25 ++++++--- pkg/server/index_usage_stats.go | 2 +- pkg/server/job_profiler.go | 8 ++- pkg/server/privchecker/api.go | 6 +-- pkg/server/privchecker/privchecker.go | 37 ++++++++----- pkg/server/sql_stats.go | 2 +- pkg/server/status.go | 54 +++++++++---------- 8 files changed, 89 insertions(+), 59 deletions(-) diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index 9a7ef513bb55..21c55c50fe3f 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -159,6 +159,17 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten require.Error(t, err) require.Contains(t, err.Error(), "Forbidden") + // VIEWCLUSTERMETADATA should allow the user to see the span stats. 
+ grantStmt := `GRANT SYSTEM VIEWCLUSTERMETADATA TO authentic_user_noadmin;` + helper.TestCluster().TenantConn(0).Exec(t, grantStmt) + + err = client.PostJSONChecked("/_status/span", &req, &resp) + require.NoError(t, err) + require.NotEmpty(t, resp.SpanToStats) + + revokeStmt := `REVOKE SYSTEM VIEWCLUSTERMETADATA FROM authentic_user_noadmin;` + helper.TestCluster().TenantConn(0).Exec(t, revokeStmt) + adminClient := helper.TestCluster().TenantHTTPClient(t, 1, true) adminClient.PostJSON("/_status/span", &req, &resp) require.Greaterf(t, resp.SpanToStats[aSpan.String()].RangeCount, int32(0), "positive range count") @@ -1508,6 +1519,9 @@ func testTenantHotRanges(_ context.Context, t *testing.T, helper serverccl.Tenan client.PostJSON("/_status/v2/hotranges", &req, &resp) require.NotEmpty(t, resp.Ranges) + + revokeStmt := `REVOKE SYSTEM VIEWCLUSTERMETADATA FROM authentic_user_noadmin;` + helper.TestCluster().TenantConn(0).Exec(t, revokeStmt) }) t.Run("test tenant hot ranges respects tenant isolation", func(t *testing.T) { diff --git a/pkg/server/admin.go b/pkg/server/admin.go index ca92fd2dde68..a67fc7463a90 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1547,7 +1547,7 @@ func (s *adminServer) Events( ) (_ *serverpb.EventsResponse, retErr error) { ctx = s.AnnotateCtx(ctx) - userName, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. @@ -1560,6 +1560,10 @@ func (s *adminServer) Events( limit = apiconstants.DefaultAPIEventLimit } + userName, err := authserver.UserFromIncomingRPCContext(ctx) + if err != nil { + return nil, srverrors.ServerError(ctx, err) + } r, err := s.eventsHelper(ctx, req, userName, int(limit), 0, redactEvents) if err != nil { return nil, srverrors.ServerError(ctx, err) @@ -3188,7 +3192,7 @@ func (s *systemAdminServer) EnqueueRange( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireRepairClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -3336,7 +3340,7 @@ func (s *systemAdminServer) SendKVBatch( ctx = s.AnnotateCtx(ctx) // Note: the root user will bypass SQL auth checks, which is useful in case of // a cluster outage. - user, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireRepairClusterMetadataPermission(ctx) if err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. @@ -3346,6 +3350,11 @@ func (s *systemAdminServer) SendKVBatch( return nil, grpcstatus.Errorf(codes.InvalidArgument, "BatchRequest cannot be nil") } + user, err := authserver.UserFromIncomingRPCContext(ctx) + if err != nil { + return nil, srverrors.ServerError(ctx, err) + } + // Emit a structured log event for the call. 
jsonpb := protoutil.JSONPb{} baJSON, err := jsonpb.Marshal(ba) @@ -3394,7 +3403,7 @@ func (s *systemAdminServer) RecoveryCollectReplicaInfo( ) error { ctx := stream.Context() ctx = s.server.AnnotateCtx(ctx) - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return err } @@ -3409,7 +3418,7 @@ func (s *systemAdminServer) RecoveryCollectLocalReplicaInfo( ) error { ctx := stream.Context() ctx = s.server.AnnotateCtx(ctx) - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return err } @@ -3422,7 +3431,7 @@ func (s *systemAdminServer) RecoveryStagePlan( ctx context.Context, request *serverpb.RecoveryStagePlanRequest, ) (*serverpb.RecoveryStagePlanResponse, error) { ctx = s.server.AnnotateCtx(ctx) - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireRepairClusterMetadataPermission(ctx) if err != nil { return nil, err } @@ -3435,7 +3444,7 @@ func (s *systemAdminServer) RecoveryNodeStatus( ctx context.Context, request *serverpb.RecoveryNodeStatusRequest, ) (*serverpb.RecoveryNodeStatusResponse, error) { ctx = s.server.AnnotateCtx(ctx) - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return nil, err } @@ -3447,7 +3456,7 @@ func (s *systemAdminServer) RecoveryVerify( ctx context.Context, request *serverpb.RecoveryVerifyRequest, ) (*serverpb.RecoveryVerifyResponse, error) { ctx = s.server.AnnotateCtx(ctx) - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return nil, err } diff --git a/pkg/server/index_usage_stats.go b/pkg/server/index_usage_stats.go index 6bf8bef46d75..e5d1a4c4554c 100644 --- a/pkg/server/index_usage_stats.go +++ b/pkg/server/index_usage_stats.go @@ -136,7 +136,7 @@ func (s *statusServer) ResetIndexUsageStats( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireRepairClusterMetadataPermission(ctx); err != nil { return nil, err } diff --git a/pkg/server/job_profiler.go b/pkg/server/job_profiler.go index d4f381011686..fb9eefef67eb 100644 --- a/pkg/server/job_profiler.go +++ b/pkg/server/job_profiler.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/server/authserver" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/srverrors" "github.com/cockroachdb/cockroach/pkg/sql" @@ -40,8 +41,7 @@ func (s *statusServer) RequestJobProfilerExecutionDetails( ctx context.Context, req *serverpb.RequestJobProfilerExecutionDetailsRequest, ) (*serverpb.RequestJobProfilerExecutionDetailsResponse, error) { ctx = s.AnnotateCtx(ctx) - // TODO(adityamaru): Figure out the correct privileges required to request execution details. 
- user, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return nil, err } @@ -79,6 +79,10 @@ func (s *statusServer) RequestJobProfilerExecutionDetails( e.addLabelledGoroutines(ctx) e.addClusterWideTraces(ctx) + user, err := authserver.UserFromIncomingRPCContext(ctx) + if err != nil { + return nil, srverrors.ServerError(ctx, err) + } r, err := execCfg.JobRegistry.GetResumerForClaimedJob(jobID) if err != nil { return nil, err diff --git a/pkg/server/privchecker/api.go b/pkg/server/privchecker/api.go index bc6637058aef..7e22a673e909 100644 --- a/pkg/server/privchecker/api.go +++ b/pkg/server/privchecker/api.go @@ -38,11 +38,6 @@ type CheckerForRPCHandlers interface { // responsibility to convert them through srverrors.ServerError. GetUserAndRole(ctx context.Context) (userName username.SQLUsername, isAdmin bool, err error) - // RequireAdminUser validates the current user is - // an admin user. It returns the current user's name. - // Its error return is a gRPC error. - RequireAdminUser(ctx context.Context) (userName username.SQLUsername, err error) - // RequireViewActivityPermission validates the current user has the VIEWACTIVITY // privilege or role option. // Its error return is a gRPC error. @@ -52,6 +47,7 @@ type CheckerForRPCHandlers interface { RequireViewClusterSettingOrModifyClusterSettingPermission(ctx context.Context) error RequireViewActivityAndNoViewActivityRedactedPermission(ctx context.Context) error RequireViewClusterMetadataPermission(ctx context.Context) error + RequireRepairClusterMetadataPermission(ctx context.Context) error RequireViewDebugPermission(ctx context.Context) error } diff --git a/pkg/server/privchecker/privchecker.go b/pkg/server/privchecker/privchecker.go index b7761150c1b7..3d876b6f5ae9 100644 --- a/pkg/server/privchecker/privchecker.go +++ b/pkg/server/privchecker/privchecker.go @@ -44,20 +44,6 @@ type adminPrivilegeChecker struct { makeAuthzAccessor func(opName string) (sql.AuthorizationAccessor, func()) } -// RequireAdminUser is part of the CheckerForRPCHandlers interface. -func (c *adminPrivilegeChecker) RequireAdminUser( - ctx context.Context, -) (userName username.SQLUsername, err error) { - userName, isAdmin, err := c.GetUserAndRole(ctx) - if err != nil { - return userName, srverrors.ServerError(ctx, err) - } - if !isAdmin { - return userName, ErrRequiresAdmin - } - return userName, nil -} - // RequireViewActivityPermission is part of the CheckerForRPCHandlers interface. func (c *adminPrivilegeChecker) RequireViewActivityPermission(ctx context.Context) (err error) { userName, isAdmin, err := c.GetUserAndRole(ctx) @@ -199,6 +185,29 @@ func (c *adminPrivilegeChecker) RequireViewClusterMetadataPermission( privilege.VIEWCLUSTERMETADATA) } +// RequireRepairClusterMetadataPermission requires the user have admin +// or the VIEWCLUSTERMETADATA system privilege and returns an error if +// the user does not have it. 
+func (c *adminPrivilegeChecker) RequireRepairClusterMetadataPermission( + ctx context.Context, +) (err error) { + userName, isAdmin, err := c.GetUserAndRole(ctx) + if err != nil { + return srverrors.ServerError(ctx, err) + } + if isAdmin { + return nil + } + if hasRepairClusterMetadata, err := c.HasGlobalPrivilege(ctx, userName, privilege.REPAIRCLUSTERMETADATA); err != nil { + return srverrors.ServerError(ctx, err) + } else if hasRepairClusterMetadata { + return nil + } + return grpcstatus.Errorf( + codes.PermissionDenied, "this operation requires the %s system privilege", + privilege.REPAIRCLUSTERMETADATA) +} + // RequireViewDebugPermission requires the user have admin or the // VIEWDEBUG system privilege and returns an error if the user does // not have it. diff --git a/pkg/server/sql_stats.go b/pkg/server/sql_stats.go index 304ea26d3c0c..adc8947b1be8 100644 --- a/pkg/server/sql_stats.go +++ b/pkg/server/sql_stats.go @@ -27,7 +27,7 @@ func (s *statusServer) ResetSQLStats( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireRepairClusterMetadataPermission(ctx); err != nil { return nil, err } diff --git a/pkg/server/status.go b/pkg/server/status.go index 6215e791f638..d2e9773df840 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -713,7 +713,7 @@ func (s *systemStatusServer) Gossip( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -741,7 +741,7 @@ func (s *systemStatusServer) EngineStats( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -880,7 +880,7 @@ func (s *systemStatusServer) CriticalNodes( ctx context.Context, req *serverpb.CriticalNodesRequest, ) (*serverpb.CriticalNodesResponse, error) { ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { return nil, err } conformance, err := s.node.SpanConfigConformance( @@ -1013,7 +1013,7 @@ func (s *statusServer) Certificates( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1134,7 +1134,7 @@ func (s *statusServer) Details( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. 
return nil, err @@ -1175,7 +1175,7 @@ func (s *statusServer) GetFiles( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1231,7 +1231,7 @@ func (s *statusServer) LogFilesList( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1266,7 +1266,7 @@ func (s *statusServer) LogFile( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1364,7 +1364,7 @@ func (s *statusServer) Logs( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1452,7 +1452,7 @@ func (s *statusServer) Stacks( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1680,7 +1680,7 @@ func (s *statusServer) Profile( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1736,8 +1736,8 @@ func (s *statusServer) NodesList( ctx = s.AnnotateCtx(ctx) // The node status contains details about the command line, network - // addresses, env vars etc which are admin-only. - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + // addresses, env vars etc which are privileged information. + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -1920,8 +1920,8 @@ func (s *statusServer) Node( ctx = s.AnnotateCtx(ctx) // The node status contains details about the command line, network - // addresses, env vars etc which are admin-only. - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + // addresses, env vars etc which are privileged information.. 
+ if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -2445,8 +2445,8 @@ func (t *statusServer) TenantRanges( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = t.AnnotateCtx(ctx) - // The tenant range report contains replica metadata which is admin-only. - if _, err := t.privilegeChecker.RequireAdminUser(ctx); err != nil { + // The tenant range report contains replica metadata which is privileged. + if err := t.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { return nil, err } @@ -2458,7 +2458,7 @@ func (s *systemStatusServer) TenantRanges( ) (*serverpb.TenantRangesResponse, error) { authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { return nil, err } @@ -2923,7 +2923,7 @@ func (s *statusServer) KeyVisSamples( ctx context.Context, req *serverpb.KeyVisSamplesRequest, ) (*serverpb.KeyVisSamplesResponse, error) { - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { return nil, err } @@ -3628,7 +3628,7 @@ func (s *statusServer) SpanStats( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -3647,7 +3647,7 @@ func (s *systemStatusServer) SpanStats( ) (*roachpb.SpanStatsResponse, error) { ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -3818,7 +3818,7 @@ func (s *statusServer) JobRegistryStatus( ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx) ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. return nil, err @@ -3856,7 +3856,7 @@ func (s *statusServer) JobStatus( ) (*serverpb.JobStatusResponse, error) { ctx = s.AnnotateCtx(authserver.ForwardSQLIdentityThroughRPCCalls(ctx)) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { // NB: not using srverrors.ServerError() here since the priv checker // already returns a proper gRPC error status. 
return nil, err @@ -3888,7 +3888,7 @@ func (s *statusServer) TxnIDResolution( ctx context.Context, req *serverpb.TxnIDResolutionRequest, ) (*serverpb.TxnIDResolutionResponse, error) { ctx = s.AnnotateCtx(authserver.ForwardSQLIdentityThroughRPCCalls(ctx)) - if _, err := s.privilegeChecker.RequireAdminUser(ctx); err != nil { + if err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx); err != nil { return nil, err } @@ -3994,8 +3994,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails( ctx context.Context, req *serverpb.GetJobProfilerExecutionDetailRequest, ) (*serverpb.GetJobProfilerExecutionDetailResponse, error) { ctx = s.AnnotateCtx(ctx) - // TODO(adityamaru): Figure out the correct privileges required to get execution details. - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return nil, err } @@ -4018,8 +4017,7 @@ func (s *statusServer) ListJobProfilerExecutionDetails( ctx context.Context, req *serverpb.ListJobProfilerExecutionDetailsRequest, ) (*serverpb.ListJobProfilerExecutionDetailsResponse, error) { ctx = s.AnnotateCtx(ctx) - // TODO(adityamaru): Figure out the correct privileges required to get execution details. - _, err := s.privilegeChecker.RequireAdminUser(ctx) + err := s.privilegeChecker.RequireViewClusterMetadataPermission(ctx) if err != nil { return nil, err } From 2325016a7ebb8b16dc3aa11623a217b78415af56 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 13 Sep 2023 17:35:20 -0400 Subject: [PATCH 4/4] sql: remove usages of UserHasAdminRole This commit covers a few cases that were missed by an earlier commit: https://github.com/cockroachdb/cockroach/pull/110084. No release note is included since it would be redundant with the release note from that PR. Release note: None --- pkg/ccl/backupccl/alter_backup_schedule.go | 12 +++++++----- .../testdata/backup-restore/schedule-privileges | 10 +++++----- .../multi-tenant/show_create_external_connections | 6 +++--- .../testdata/show_create_external_connections | 6 +++--- pkg/sql/control_schedules.go | 13 +++++++++---- pkg/sql/repair.go | 6 +----- pkg/sql/show_create_external_connection.go | 12 +++++++----- pkg/sql/show_create_schedule.go | 11 ++++------- 8 files changed, 39 insertions(+), 37 deletions(-) diff --git a/pkg/ccl/backupccl/alter_backup_schedule.go b/pkg/ccl/backupccl/alter_backup_schedule.go index bcd236ea0a6c..ec153e15a084 100644 --- a/pkg/ccl/backupccl/alter_backup_schedule.go +++ b/pkg/ccl/backupccl/alter_backup_schedule.go @@ -24,7 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/errors" pbtypes "github.com/gogo/protobuf/types" ) @@ -127,16 +129,16 @@ func doAlterBackupSchedules( s.incJob.ScheduleID()) } - // Check that the user is admin or the owner of the schedules being altered. - isAdmin, err := p.UserHasAdminRole(ctx, p.User()) + // Check that the user has privileges or is the owner of the schedules being altered. 
+ hasPriv, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPAIRCLUSTERMETADATA, p.User()) if err != nil { return err } isOwnerOfFullJob := s.fullJob == nil || s.fullJob.Owner() == p.User() isOwnerOfIncJob := s.incJob == nil || s.incJob.Owner() == p.User() - if !isAdmin && !(isOwnerOfFullJob && isOwnerOfIncJob) { - return pgerror.New(pgcode.InsufficientPrivilege, "must be admin or owner of the "+ - "schedules being altered.") + if !hasPriv && !(isOwnerOfFullJob && isOwnerOfIncJob) { + return pgerror.Newf(pgcode.InsufficientPrivilege, "must be admin or the owner of the "+ + "schedules being altered, or have %s privilege", privilege.REPAIRCLUSTERMETADATA) } if s, err = processFullBackupRecurrence( diff --git a/pkg/ccl/backupccl/testdata/backup-restore/schedule-privileges b/pkg/ccl/backupccl/testdata/backup-restore/schedule-privileges index fa9ab4fdbc47..28f98eb2f68f 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/schedule-privileges +++ b/pkg/ccl/backupccl/testdata/backup-restore/schedule-privileges @@ -108,7 +108,7 @@ foocluster_admin BACKUP INTO LATEST IN 'external://foo/cluster' WITH OPTIONS (de foocluster_admin BACKUP INTO 'external://foo/cluster' WITH OPTIONS (detached) # nonadmin testuser is not allowed to drop a schedule they do not own. -exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to DROP it) user=testuser +exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to DROP it) user=testuser DROP SCHEDULE $fullID ---- regex matches error @@ -141,7 +141,7 @@ let $otherFullID $otherIncID with schedules as (show schedules) select id from schedules where label='foocluster_admin_revoke' order by command->>'backup_type' asc; ---- -exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to DROP it) user=testuser +exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to DROP it) user=testuser DROP SCHEDULE $otherFullID ---- regex matches error @@ -180,17 +180,17 @@ DROP SCHEDULE $testuserIncID; ---- # But testuser can't drop, alter, resume or pause the root owned schedules. 
-exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to PAUSE it) user=testuser +exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to PAUSE it) user=testuser PAUSE SCHEDULE $otherFullID ---- regex matches error -exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to RESUME it) user=testuser +exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to RESUME it) user=testuser RESUME SCHEDULE $otherFullID ---- regex matches error -exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to DROP it) user=testuser +exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to DROP it) user=testuser DROP SCHEDULE $otherFullID; ---- regex matches error diff --git a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/show_create_external_connections b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/show_create_external_connections index 840901baab39..f2a01adfa423 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/show_create_external_connections +++ b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/show_create_external_connections @@ -80,12 +80,12 @@ GRANT SYSTEM EXTERNALCONNECTION TO testuser query-sql user=testuser SHOW CREATE ALL EXTERNAL CONNECTIONS ---- -pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS +pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS` query-sql user=testuser SHOW CREATE EXTERNAL CONNECTION foo ---- -pq: must be admin or owner of the External Connection "foo" +pq: must have VIEWCLUSTERMETADATA privilege or be owner of the External Connection "foo" # Create External Connection where testuser is the owner, they should be able to SHOW this object. exec-sql user=testuser @@ -95,7 +95,7 @@ CREATE EXTERNAL CONNECTION bar AS 'nodelocal://1/foo' query-sql user=testuser SHOW CREATE ALL EXTERNAL CONNECTIONS ---- -pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS +pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS` # TODO(aditymaru): Synthetic privileges do not have a concept of owners. Once they do, testuser will # be able to run this query successfully since they are the owner of the External Connection object. diff --git a/pkg/ccl/cloudccl/externalconn/testdata/show_create_external_connections b/pkg/ccl/cloudccl/externalconn/testdata/show_create_external_connections index bb19510eab91..a25dd532b232 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/show_create_external_connections +++ b/pkg/ccl/cloudccl/externalconn/testdata/show_create_external_connections @@ -76,12 +76,12 @@ GRANT SYSTEM EXTERNALCONNECTION TO testuser query-sql user=testuser SHOW CREATE ALL EXTERNAL CONNECTIONS ---- -pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS +pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS` query-sql user=testuser SHOW CREATE EXTERNAL CONNECTION foo ---- -pq: must be admin or owner of the External Connection "foo" +pq: must have VIEWCLUSTERMETADATA privilege or be owner of the External Connection "foo" # Create External Connection where testuser is the owner, they should be able to SHOW this object. 
exec-sql user=testuser @@ -91,7 +91,7 @@ CREATE EXTERNAL CONNECTION bar AS 'nodelocal://1/foo' query-sql user=testuser SHOW CREATE ALL EXTERNAL CONNECTIONS ---- -pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS +pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS` # TODO(aditymaru): Synthetic privileges do not have a concept of owners. Once they do, testuser will # be able to run this query successfully since they are the owner of the External Connection object. diff --git a/pkg/sql/control_schedules.go b/pkg/sql/control_schedules.go index 24240f5aff56..85e3a53902c9 100644 --- a/pkg/sql/control_schedules.go +++ b/pkg/sql/control_schedules.go @@ -20,9 +20,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/errors" ) @@ -129,14 +131,17 @@ func (n *controlSchedulesNode) startExec(params runParams) error { continue // not an error if schedule does not exist } - isAdmin, err := params.p.UserHasAdminRole(params.ctx, params.p.User()) + // Check that the user has privileges or is the owner of the schedules being altered. + hasPriv, err := params.p.HasPrivilege( + params.ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPAIRCLUSTERMETADATA, params.p.User(), + ) if err != nil { return err } isOwner := schedule.Owner() == params.p.User() - if !isAdmin && !isOwner { - return pgerror.Newf(pgcode.InsufficientPrivilege, "must be admin or owner of the "+ - "schedule %d to %s it", schedule.ScheduleID(), n.command.String()) + if !hasPriv && !isOwner { + return pgerror.Newf(pgcode.InsufficientPrivilege, "must have %s privilege or be owner of the "+ + "schedule %d to %s it", privilege.REPAIRCLUSTERMETADATA, schedule.ScheduleID(), n.command.String()) } switch n.command { diff --git a/pkg/sql/repair.go b/pkg/sql/repair.go index 00272d8aeaca..f8811293ae5c 100644 --- a/pkg/sql/repair.go +++ b/pkg/sql/repair.go @@ -712,13 +712,9 @@ func checkPlannerStateForRepairFunctions(ctx context.Context, p *planner, method if p.extendedEvalCtx.TxnReadOnly { return readOnlyError(method) } - hasAdmin, err := p.UserHasAdminRole(ctx, p.User()) - if err != nil { + if err := p.CheckPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPAIRCLUSTERMETADATA); err != nil { return err } - if !hasAdmin { - return pgerror.Newf(pgcode.InsufficientPrivilege, "admin role required for %s", method) - } return nil } diff --git a/pkg/sql/show_create_external_connection.go b/pkg/sql/show_create_external_connection.go index ec75d1b70a4f..31ee3caf921f 100644 --- a/pkg/sql/show_create_external_connection.go +++ b/pkg/sql/show_create_external_connection.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" @@ -74,12 +75,13 @@ func (p *planner) ShowCreateExternalConnection( ) (planNode, error) 
{ var hasPrivileges bool var err error - if hasPrivileges, err = p.UserHasAdminRole(ctx, p.User()); err != nil { + if hasPrivileges, err = p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA, p.User()); err != nil { return nil, err } - // If the user is not admin, and is running a `SHOW CREATE EXTERNAL CONNECTION foo` - // check if the user is the owner of the object. + // If the user does not have VIEWCLUSTERMETADATA, and is running a `SHOW + // CREATE EXTERNAL CONNECTION foo`, check if the user is the owner of the + // object. exprEval := p.ExprEvaluator(externalConnectionOp) if !hasPrivileges && n.ConnectionLabel != nil { name, err := exprEval.String(ctx, n.ConnectionLabel) @@ -94,10 +96,10 @@ func (p *planner) ShowCreateExternalConnection( return nil, err } if !isOwner { - return nil, pgerror.Newf(pgcode.InsufficientPrivilege, "must be admin or owner of the External Connection %q", name) + return nil, pgerror.Newf(pgcode.InsufficientPrivilege, "must have %s privilege or be owner of the External Connection %q", privilege.VIEWCLUSTERMETADATA, name) } } else if !hasPrivileges { - return nil, pgerror.New(pgcode.InsufficientPrivilege, "must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS") + return nil, pgerror.Newf(pgcode.InsufficientPrivilege, "must have %s privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS`", privilege.VIEWCLUSTERMETADATA) } sqltelemetry.IncrementShowCounter(sqltelemetry.CreateExternalConnection) diff --git a/pkg/sql/show_create_schedule.go b/pkg/sql/show_create_schedule.go index 55ddc7c4309d..8cf1eb46a6e2 100644 --- a/pkg/sql/show_create_schedule.go +++ b/pkg/sql/show_create_schedule.go @@ -18,11 +18,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" ) @@ -85,12 +85,9 @@ func loadSchedules(params runParams, n *tree.ShowCreateSchedules) ([]*jobs.Sched func (p *planner) ShowCreateSchedule( ctx context.Context, n *tree.ShowCreateSchedules, ) (planNode, error) { - // Only admin users can execute SHOW CREATE SCHEDULE - if userIsAdmin, err := p.UserHasAdminRole(ctx, p.User()); err != nil { + // Only privileged users can execute SHOW CREATE SCHEDULE + if err := p.CheckPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA); err != nil { return nil, err - } else if !userIsAdmin { - return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "user %s does not have admin role", p.User()) } sqltelemetry.IncrementShowCounter(sqltelemetry.CreateSchedule)
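Note: the RequireViewClusterMetadataPermission checker that the status.go hunks above now call is not defined anywhere in this excerpt. For orientation only, below is a minimal sketch of what it presumably looks like, mirroring the RequireRepairClusterMetadataPermission checker added at the top of this patch; the body is an inference, not the actual definition from the tree.

// Sketch (assumed): mirrors RequireRepairClusterMetadataPermission above,
// substituting the VIEWCLUSTERMETADATA system privilege.
func (c *adminPrivilegeChecker) RequireViewClusterMetadataPermission(
	ctx context.Context,
) (err error) {
	userName, isAdmin, err := c.GetUserAndRole(ctx)
	if err != nil {
		return srverrors.ServerError(ctx, err)
	}
	// Admins pass immediately, so existing admin-based access is unchanged.
	if isAdmin {
		return nil
	}
	// Otherwise the caller needs the global VIEWCLUSTERMETADATA privilege.
	if hasViewClusterMetadata, err := c.HasGlobalPrivilege(ctx, userName, privilege.VIEWCLUSTERMETADATA); err != nil {
		return srverrors.ServerError(ctx, err)
	} else if hasViewClusterMetadata {
		return nil
	}
	return grpcstatus.Errorf(
		codes.PermissionDenied, "this operation requires the %s system privilege",
		privilege.VIEWCLUSTERMETADATA)
}

Because the admin check short-circuits before the privilege lookup, admin users keep access to every endpoint touched above; granting the VIEWCLUSTERMETADATA (or REPAIRCLUSTERMETADATA) system privilege simply extends that access to non-admin roles.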