From b066349b42ef9e9ccbc6f794e238b57f270bb919 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 18 Sep 2023 12:29:56 +0000 Subject: [PATCH 1/2] rangefeed: add support for priority processors in scheduler This adds a separate shard with a dedicated worker pool that can be used for priority processors, to avoid head-of-line blocking. This is intended for use with system ranges, which are low-volume but need to be low-latency. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 6 + pkg/kv/kvserver/rangefeed/processor_test.go | 13 +- .../kvserver/rangefeed/scheduled_processor.go | 2 +- pkg/kv/kvserver/rangefeed/scheduler.go | 86 ++++++++--- pkg/kv/kvserver/rangefeed/scheduler_test.go | 141 ++++++++++++------ pkg/kv/kvserver/store.go | 21 ++- 6 files changed, 199 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 5c440d427359..9bc9c5db2c50 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -86,6 +86,12 @@ type Config struct { // Rangefeed scheduler to use for processor. If set, SchedulerProcessor would // be instantiated. Scheduler *Scheduler + + // Priority marks this rangefeed as a priority rangefeed, which will run in a + // separate scheduler shard with a dedicated worker pool. Should only be used + // for low-volume system ranges, since the worker pool is small (default 2). + // Only has an effect when Scheduler is used. + Priority bool } // SetDefaults initializes unset fields in Config to values diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 6baedd826254..dbf1ccf8cba7 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -297,9 +297,18 @@ func newTestProcessor( o(&cfg) } if cfg.useScheduler { - sch := NewScheduler(SchedulerConfig{Workers: 1}) - _ = sch.Start(context.Background(), stopper) + sch := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1}) + require.NoError(t, sch.Start(context.Background(), stopper)) cfg.Scheduler = sch + // Also create a dummy priority processor to populate priorityIDs for + // BenchmarkRangefeed. It should never be called. + noop := func(e processorEventType) processorEventType { + if e != Stopped { + t.Errorf("unexpected event %s for noop priority processor", e) + } + return 0 + } + require.NoError(t, sch.register(9, noop, true /* priority */)) } s := NewProcessor(cfg.Config) h := processorTestHelper{} diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 8674cee3b5ef..ff4f9059d4a8 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -99,7 +99,7 @@ func (p *ScheduledProcessor) Start( // Note that callback registration must be performed before starting resolved // timestamp init because resolution posts resolvedTS event when it is done. - if err := p.scheduler.Register(p.process); err != nil { + if err := p.scheduler.Register(p.process, p.Priority); err != nil { p.cleanup() return err } diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go index cad5ae6913c9..315d637f8f7c 100644 --- a/pkg/kv/kvserver/rangefeed/scheduler.go +++ b/pkg/kv/kvserver/rangefeed/scheduler.go @@ -16,6 +16,7 @@ import ( "strings" "sync" "sync/atomic" + "unsafe" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -102,6 +103,8 @@ type Callback func(event processorEventType) (remaining processorEventType) type SchedulerConfig struct { // Workers is the number of pool workers for scheduler to use. Workers int + // PriorityWorkers is the number of workers to use for the priority shard. + PriorityWorkers int // ShardSize is the maximum number of workers per scheduler shard. Once a // shard is full, new shards are split off, and workers are evently distribued // across all shards. @@ -112,10 +115,21 @@ type SchedulerConfig struct { BulkChunkSize int } -// shardIndex returns the shard index of the given processor ID. +// priorityIDsValue is a placeholder value for Scheduler.priorityIDs. IntMap +// requires an unsafe.Pointer value, but we don't care about the value (only the +// key), so we can reuse the same allocation. +var priorityIDsValue = unsafe.Pointer(new(bool)) + +// shardIndex returns the shard index of the given processor ID based on the +// shard count and processor priority. Priority processors are assigned to the +// reserved shard 0, other ranges are modulo ID (ignoring shard 0). numShards +// will always be 2 or more (1 priority, 1 regular). // gcassert:inline -func shardIndex(id int64, numShards int) int { - return int(id % int64(numShards)) +func shardIndex(id int64, numShards int, priority bool) int { + if priority { + return 0 + } + return 1 + int(id%int64(numShards-1)) } // Scheduler is a simple scheduler that allows work to be scheduler @@ -130,8 +144,12 @@ func shardIndex(id int64, numShards int) int { // ORed together before being delivered to processor. type Scheduler struct { nextID atomic.Int64 - shards []*schedulerShard // id % len(shards) - wg sync.WaitGroup + // shards contains scheduler shards. Processors and workers are allocated to + // separate shards to reduce mutex contention. Allocation is modulo + // processors, with shard 0 reserved for priority processors. + shards []*schedulerShard // 1 + id%(len(shards)-1) + priorityIDs syncutil.IntMap + wg sync.WaitGroup } // schedulerShard is a mutex shard, which reduces contention: workers in a shard @@ -160,7 +178,14 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler { s := &Scheduler{} - // Create shards. + // Priority shard at index 0. + priorityWorkers := 1 + if cfg.PriorityWorkers > 0 { + priorityWorkers = cfg.PriorityWorkers + } + s.shards = append(s.shards, newSchedulerShard(priorityWorkers, bulkChunkSize)) + + // Regular shards, excluding priority shard. numShards := 1 if cfg.ShardSize > 0 && cfg.Workers > cfg.ShardSize { numShards = (cfg.Workers-1)/cfg.ShardSize + 1 // ceiling division @@ -232,9 +257,20 @@ func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error { } // register callback to be able to schedule work. Returns error if id is already -// registered or if Scheduler is stopped. -func (s *Scheduler) register(id int64, f Callback) error { - return s.shards[shardIndex(id, len(s.shards))].register(id, f) +// registered or if Scheduler is stopped. If priority is true, the range is +// allocated to a separate priority shard with dedicated workers (intended for a +// small number of system ranges). Returns error if Scheduler is stopped. +func (s *Scheduler) register(id int64, f Callback, priority bool) error { + // Make sure we register the priority ID before registering the callback, + // since we can otherwise race with enqueues, using the wrong shard. + if priority { + s.priorityIDs.Store(id, priorityIDsValue) + } + if err := s.shards[shardIndex(id, len(s.shards), priority)].register(id, f); err != nil { + s.priorityIDs.Delete(id) + return err + } + return nil } // unregister removed the processor callback from scheduler. If processor is @@ -246,7 +282,9 @@ func (s *Scheduler) register(id int64, f Callback) error { // Any attempts to enqueue events for processor after this call will return an // error. func (s *Scheduler) unregister(id int64) { - s.shards[shardIndex(id, len(s.shards))].unregister(id) + _, priority := s.priorityIDs.Load(id) + s.shards[shardIndex(id, len(s.shards), priority)].unregister(id) + s.priorityIDs.Delete(id) } func (s *Scheduler) Stop() { @@ -272,7 +310,8 @@ func (s *Scheduler) stopProcessor(id int64) { // Enqueue event for existing callback. The event is ignored if the processor // does not exist. func (s *Scheduler) enqueue(id int64, evt processorEventType) { - s.shards[shardIndex(id, len(s.shards))].enqueue(id, evt) + _, priority := s.priorityIDs.Load(id) + s.shards[shardIndex(id, len(s.shards), priority)].enqueue(id, evt) } // EnqueueBatch enqueues an event for a set of processors across all shards. @@ -289,7 +328,7 @@ func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) // events for multiple processors via EnqueueBatch(). The batch should be closed // when done by calling Close(). func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch { - return newSchedulerBatch(len(s.shards)) + return newSchedulerBatch(len(s.shards), &s.priorityIDs) } // register registers a callback with the shard. The caller must not hold @@ -491,22 +530,32 @@ var schedulerBatchPool = sync.Pool{ // enqueueing, by pre-sharding the IDs and only locking a single shard at a time // while bulk-enqueueing. type SchedulerBatch struct { - ids [][]int64 // by shard + ids [][]int64 // by shard + priorityIDs map[int64]bool } -func newSchedulerBatch(numShards int) *SchedulerBatch { +func newSchedulerBatch(numShards int, priorityIDs *syncutil.IntMap) *SchedulerBatch { b := schedulerBatchPool.Get().(*SchedulerBatch) if cap(b.ids) >= numShards { b.ids = b.ids[:numShards] } else { b.ids = make([][]int64, numShards) } + if b.priorityIDs == nil { + b.priorityIDs = make(map[int64]bool, 8) // expect few ranges, if any + } + // Cache the priority range IDs in an owned map, since we expect this to be + // very small or empty and we do a lookup for every Add() call. + priorityIDs.Range(func(id int64, _ unsafe.Pointer) bool { + b.priorityIDs[id] = true + return true + }) return b } // Add adds a processor ID to the batch. func (b *SchedulerBatch) Add(id int64) { - shardIdx := shardIndex(id, len(b.ids)) + shardIdx := shardIndex(id, len(b.ids), b.priorityIDs[id]) b.ids[shardIdx] = append(b.ids[shardIdx], id) } @@ -515,6 +564,9 @@ func (b *SchedulerBatch) Close() { for i := range b.ids { b.ids[i] = b.ids[i][:0] } + for id := range b.priorityIDs { + delete(b.priorityIDs, id) + } schedulerBatchPool.Put(b) } @@ -545,8 +597,8 @@ func (cs *ClientScheduler) ID() int64 { // Register registers processing callback in scheduler. Error is returned if // callback was already registered for this ClientScheduler or if scheduler is // already quiescing. -func (cs *ClientScheduler) Register(cb Callback) error { - return cs.s.register(cs.id, cb) +func (cs *ClientScheduler) Register(cb Callback, priority bool) error { + return cs.s.register(cs.id, cb, priority) } // Enqueue schedules callback execution for event. diff --git a/pkg/kv/kvserver/rangefeed/scheduler_test.go b/pkg/kv/kvserver/rangefeed/scheduler_test.go index ff48356ab499..44932fee3743 100644 --- a/pkg/kv/kvserver/rangefeed/scheduler_test.go +++ b/pkg/kv/kvserver/rangefeed/scheduler_test.go @@ -44,7 +44,7 @@ func TestStopNonEmpty(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 1}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) s.stopProcessor(c.id) assertStopsWithinTimeout(t, s) c.requireStopped(t, time.Second*30) @@ -64,7 +64,7 @@ type schedulerConsumer struct { } func createAndRegisterConsumerOrFail( - t *testing.T, scheduler *Scheduler, id int64, + t *testing.T, scheduler *Scheduler, id int64, priority bool, ) *schedulerConsumer { t.Helper() c := &schedulerConsumer{ @@ -73,7 +73,7 @@ func createAndRegisterConsumerOrFail( sched: scheduler, id: id, } - err := c.sched.register(id, c.process) + err := c.sched.register(id, c.process, priority) require.NoError(t, err, "failed to register processor") return c } @@ -216,7 +216,7 @@ func TestDeliverEvents(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 1}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) s.enqueue(c.id, te1) c.requireEvent(t, time.Second*30000, te1, 1) assertStopsWithinTimeout(t, s) @@ -230,7 +230,7 @@ func TestNoParallel(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 2}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c.pause() s.enqueue(c.id, te1) c.waitPaused() @@ -248,8 +248,8 @@ func TestProcessOtherWhilePaused(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 2}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c1 := createAndRegisterConsumerOrFail(t, s, 1) - c2 := createAndRegisterConsumerOrFail(t, s, 2) + c1 := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) + c2 := createAndRegisterConsumerOrFail(t, s, 2, false /* priority */) c1.pause() s.enqueue(c1.id, te1) c1.waitPaused() @@ -270,7 +270,7 @@ func TestEventsCombined(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 2}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c.pause() s.enqueue(c.id, te1) c.waitPaused() @@ -289,7 +289,7 @@ func TestRescheduleEvent(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 2}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) c.pause() s.enqueue(c.id, te1) c.waitPaused() @@ -316,9 +316,9 @@ func TestClientScheduler(t *testing.T) { sched: s, id: 1, } - require.NoError(t, cs.Register(c.process), "failed to register consumer") + require.NoError(t, cs.Register(c.process, false), "failed to register consumer") require.Error(t, - cs.Register(func(event processorEventType) (remaining processorEventType) { return 0 }), + cs.Register(func(event processorEventType) (remaining processorEventType) { return 0 }, false), "reregistration must fail") c.pause() cs.Enqueue(te2) @@ -342,7 +342,7 @@ func TestScheduleBatch(t *testing.T) { batch := s.NewEnqueueBatch() defer batch.Close() for i := 0; i < consumerNumber; i++ { - consumers[i] = createAndRegisterConsumerOrFail(t, s, int64(i+1)) + consumers[i] = createAndRegisterConsumerOrFail(t, s, int64(i+1), false /* priority */) batch.Add(consumers[i].id) } s.EnqueueBatch(batch, te1) @@ -360,7 +360,7 @@ func TestPartialProcessing(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 1}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) // Set process response to trigger process once again. c.rescheduleNext(te1) s.enqueue(c.id, te1) @@ -389,7 +389,7 @@ func TestUnregisterWithoutStop(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 1}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c := createAndRegisterConsumerOrFail(t, s, 1) + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) s.enqueue(c.id, te1) c.requireHistory(t, time.Second*30, []processorEventType{te1}) s.unregister(c.id) @@ -416,8 +416,8 @@ func TestSchedulerShutdown(t *testing.T) { s := NewScheduler(SchedulerConfig{Workers: 2, ShardSize: 1}) require.NoError(t, s.Start(ctx, stopper), "failed to start") - c1 := createAndRegisterConsumerOrFail(t, s, 1) - c2 := createAndRegisterConsumerOrFail(t, s, 2) + c1 := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) + c2 := createAndRegisterConsumerOrFail(t, s, 2, false /* priority */) s.stopProcessor(c2.id) s.Stop() // Ensure that we are not stopped twice. @@ -470,49 +470,61 @@ func TestQueueReadEmpty(t *testing.T) { func TestNewSchedulerShards(t *testing.T) { defer leaktest.AfterTest(t)() + pri := 2 // priority workers + testcases := []struct { - workers int - shardSize int - expectShards []int + priorityWorkers int + workers int + shardSize int + expectShards []int }{ + // We always assign at least 1 priority worker to the priority shard. + {-1, 1, 1, []int{1, 1}}, + {0, 1, 1, []int{1, 1}}, + {2, 1, 1, []int{2, 1}}, + // We balance workers across shards instead of filling up shards. We assume // ranges are evenly distributed across shards, and want ranges to have // about the same number of workers available on average. - {-1, -1, []int{1}}, - {0, 0, []int{1}}, - {1, -1, []int{1}}, - {1, 0, []int{1}}, - {1, 1, []int{1}}, - {1, 2, []int{1}}, - {2, 2, []int{2}}, - {3, 2, []int{2, 1}}, - {1, 3, []int{1}}, - {2, 3, []int{2}}, - {3, 3, []int{3}}, - {4, 3, []int{2, 2}}, - {5, 3, []int{3, 2}}, - {6, 3, []int{3, 3}}, - {7, 3, []int{3, 2, 2}}, - {8, 3, []int{3, 3, 2}}, - {9, 3, []int{3, 3, 3}}, - {10, 3, []int{3, 3, 2, 2}}, - {11, 3, []int{3, 3, 3, 2}}, - {12, 3, []int{3, 3, 3, 3}}, + {pri, -1, -1, []int{pri, 1}}, + {pri, 0, 0, []int{pri, 1}}, + {pri, 1, -1, []int{pri, 1}}, + {pri, 1, 0, []int{pri, 1}}, + {pri, 1, 1, []int{pri, 1}}, + {pri, 1, 2, []int{pri, 1}}, + {pri, 2, 2, []int{pri, 2}}, + {pri, 3, 2, []int{pri, 2, 1}}, + {pri, 1, 3, []int{pri, 1}}, + {pri, 2, 3, []int{pri, 2}}, + {pri, 3, 3, []int{pri, 3}}, + {pri, 4, 3, []int{pri, 2, 2}}, + {pri, 5, 3, []int{pri, 3, 2}}, + {pri, 6, 3, []int{pri, 3, 3}}, + {pri, 7, 3, []int{pri, 3, 2, 2}}, + {pri, 8, 3, []int{pri, 3, 3, 2}}, + {pri, 9, 3, []int{pri, 3, 3, 3}}, + {pri, 10, 3, []int{pri, 3, 3, 2, 2}}, + {pri, 11, 3, []int{pri, 3, 3, 3, 2}}, + {pri, 12, 3, []int{pri, 3, 3, 3, 3}}, // Typical examples, using 4 workers per CPU core and 8 workers per shard. // Note that we cap workers at 64 by default. - {1 * 4, 8, []int{4}}, - {2 * 4, 8, []int{8}}, - {3 * 4, 8, []int{6, 6}}, - {4 * 4, 8, []int{8, 8}}, - {6 * 4, 8, []int{8, 8, 8}}, - {8 * 4, 8, []int{8, 8, 8, 8}}, - {12 * 4, 8, []int{8, 8, 8, 8, 8, 8}}, - {16 * 4, 8, []int{8, 8, 8, 8, 8, 8, 8, 8}}, // 64 workers + {pri, 1 * 4, 8, []int{pri, 4}}, + {pri, 2 * 4, 8, []int{pri, 8}}, + {pri, 3 * 4, 8, []int{pri, 6, 6}}, + {pri, 4 * 4, 8, []int{pri, 8, 8}}, + {pri, 6 * 4, 8, []int{pri, 8, 8, 8}}, + {pri, 8 * 4, 8, []int{pri, 8, 8, 8, 8}}, + {pri, 12 * 4, 8, []int{pri, 8, 8, 8, 8, 8, 8}}, + {pri, 16 * 4, 8, []int{pri, 8, 8, 8, 8, 8, 8, 8, 8}}, // 64 workers } for _, tc := range testcases { t.Run(fmt.Sprintf("workers=%d/shardSize=%d", tc.workers, tc.shardSize), func(t *testing.T) { - s := NewScheduler(SchedulerConfig{Workers: tc.workers, ShardSize: tc.shardSize}) + s := NewScheduler(SchedulerConfig{ + Workers: tc.workers, + PriorityWorkers: tc.priorityWorkers, + ShardSize: tc.shardSize, + }) var shardWorkers []int for _, shard := range s.shards { @@ -522,3 +534,36 @@ func TestNewSchedulerShards(t *testing.T) { }) } } + +// TestSchedulerPriority tests that the scheduler correctly registers +// and enqueues events for priority processors. +func TestSchedulerPriority(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1, ShardSize: 1, BulkChunkSize: 1}) + require.NoError(t, s.Start(ctx, stopper)) + defer s.Stop() + + // Create one regular and one priority consumer. + c := createAndRegisterConsumerOrFail(t, s, 1, false /* priority */) + cPri := createAndRegisterConsumerOrFail(t, s, 2, true /* priority */) + + // Block the regular consumer. + c.pause() + s.enqueue(c.id, te1) + c.waitPaused() + + // The priority consumer should be able to process events. + s.enqueue(cPri.id, te1) + cPri.requireHistory(t, 5*time.Second, []processorEventType{te1}) + + // Resuming the regular consumer should process its queued event. + c.resume() + c.requireHistory(t, 5*time.Second, []processorEventType{te1}) + assertStopsWithinTimeout(t, s) + c.requireStopped(t, 5*time.Second) + cPri.requireStopped(t, 5*time.Second) +} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index f98a743fe95f..b058e73f04b7 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -194,6 +194,14 @@ var defaultRangefeedSchedulerConcurrency = envutil.EnvOrDefaultInt( var defaultRangefeedSchedulerShardSize = envutil.EnvOrDefaultInt( "COCKROACH_RANGEFEED_SCHEDULER_SHARD_SIZE", 8) +// defaultRangefeedSchedulerPriorityShardSize specifies the default size of the +// rangefeed scheduler priority shard, used for certain system ranges. This +// shard is always fully populated with workers that don't count towards the +// concurrency limit, and is thus effectively the number of priority workers per +// store. +var defaultRangefeedSchedulerPriorityShardSize = envutil.EnvOrDefaultInt( + "COCKROACH_RANGEFEED_SCHEDULER_PRIORITY_SHARD_SIZE", 2) + // bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter. var bulkIOWriteLimit = settings.RegisterByteSizeSetting( settings.SystemOnly, @@ -1248,6 +1256,11 @@ type StoreConfig struct { // workers for the store. RangeFeedSchedulerConcurrency int + // RangeFeedSchedulerConcurrentPriority specifies the number of rangefeed + // scheduler workers for this store's dedicated priority shard. Values < 1 + // imply 1. + RangeFeedSchedulerConcurrencyPriority int + // RangeFeedSchedulerShardSize specifies the maximum number of workers per // scheduler shard. RangeFeedSchedulerShardSize int @@ -1337,6 +1350,9 @@ func (sc *StoreConfig) SetDefaults(numStores int) { if sc.RangeFeedSchedulerShardSize == 0 { sc.RangeFeedSchedulerShardSize = defaultRangefeedSchedulerShardSize } + if sc.RangeFeedSchedulerConcurrencyPriority == 0 { + sc.RangeFeedSchedulerConcurrencyPriority = defaultRangefeedSchedulerPriorityShardSize + } } // GetStoreConfig exposes the config used for this store. @@ -1997,8 +2013,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { } rfs := rangefeed.NewScheduler(rangefeed.SchedulerConfig{ - Workers: s.cfg.RangeFeedSchedulerConcurrency, - ShardSize: s.cfg.RangeFeedSchedulerShardSize, + Workers: s.cfg.RangeFeedSchedulerConcurrency, + PriorityWorkers: s.cfg.RangeFeedSchedulerConcurrencyPriority, + ShardSize: s.cfg.RangeFeedSchedulerShardSize, }) if err = rfs.Start(ctx, s.stopper); err != nil { return err From 0f0fb8c210440900a3797c43ca5fdfbca57ca892 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 19 Sep 2023 08:54:18 +0000 Subject: [PATCH 2/2] kvserver: prioritize system spans in rangefeed scheduler This patch marks system spans as having priority in the rangefeed scheduler. We currently do this based on the descriptor ID, although this no longer strictly holds with dynamic system table IDs. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/budget.go | 6 ++---- pkg/kv/kvserver/rangefeed/budget_test.go | 15 ++++++--------- pkg/kv/kvserver/replica_rangefeed.go | 12 +++++++++++- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go index 3d2d9f7b1294..26ea58bbe8fe 100644 --- a/pkg/kv/kvserver/rangefeed/budget.go +++ b/pkg/kv/kvserver/rangefeed/budget.go @@ -16,8 +16,6 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -364,7 +362,7 @@ func (f *BudgetFactory) Stop(ctx context.Context) { // CreateBudget creates feed budget using memory pools configured in the // factory. It is safe to call on nil factory as it will produce nil budget // which in turn disables memory accounting on range feed. -func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget { +func (f *BudgetFactory) CreateBudget(isSystem bool) *FeedBudget { if f == nil { return nil } @@ -373,7 +371,7 @@ func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget { return nil } // We use any table with reserved ID in system tenant as system case. - if key.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) { + if isSystem { acc := f.systemFeedBytesMon.MakeBoundAccount() return NewFeedBudget(&acc, 0, f.settings) } diff --git a/pkg/kv/kvserver/rangefeed/budget_test.go b/pkg/kv/kvserver/rangefeed/budget_test.go index f48761866716..d227744927c0 100644 --- a/pkg/kv/kvserver/rangefeed/budget_test.go +++ b/pkg/kv/kvserver/rangefeed/budget_test.go @@ -16,8 +16,6 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/stretchr/testify/require" @@ -195,14 +193,14 @@ func TestBudgetFactory(t *testing.T) { CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV)) // Verify system ranges use own budget. - bSys := bf.CreateBudget(keys.MustAddr(keys.Meta1Prefix)) + bSys := bf.CreateBudget(true) _, e := bSys.TryGet(context.Background(), 199) require.NoError(t, e, "failed to obtain system range budget") require.Equal(t, int64(0), rootMon.AllocBytes(), "System feeds should borrow from own budget") require.Equal(t, int64(199), bf.Metrics().SystemBytesCount.Value(), "Metric was not updated") // Verify user feeds use shared root budget. - bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) + bUsr := bf.CreateBudget(false) _, e = bUsr.TryGet(context.Background(), 99) require.NoError(t, e, "failed to obtain non-system budget") require.Equal(t, int64(99), rootMon.AllocBytes(), @@ -220,7 +218,7 @@ func TestDisableBudget(t *testing.T) { return 0 }, &s.SV)) - bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) + bUsr := bf.CreateBudget(false) require.Nil(t, bUsr, "Range budget when budgets are disabled.") } @@ -239,7 +237,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) { }, &s.SV)) - f := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) + f := bf.CreateBudget(false) objectSize := int64(1000) alloc, err := f.TryGet(context.Background(), objectSize) @@ -302,8 +300,7 @@ func TestBudgetLimits(t *testing.T) { settings: &s.SV, }) - userKey := roachpb.RKey(keys.ScratchRangeMin) - b := bf.CreateBudget(userKey) + b := bf.CreateBudget(false) require.NotNil(t, b, "budget is disabled") require.Equal(t, b.limit, adjustedSize, "budget limit is not adjusted") @@ -318,6 +315,6 @@ func TestBudgetLimits(t *testing.T) { histogramWindowInterval: time.Second * 5, settings: &s.SV, }) - b = bf.CreateBudget(userKey) + b = bf.CreateBudget(false) require.Nil(t, b, "budget is disabled") } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e671ea47940d..78e04896d07d 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -409,8 +409,17 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( } r.rangefeedMu.Unlock() + // Determine if this is a system span, which should get priority. + // + // TODO(erikgrinaker): With dynamic system tables, this should really check + // catalog.IsSystemDescriptor() for the table descriptor, but we don't have + // easy access to it here. Consider plumbing this down from the client + // instead. See: https://github.com/cockroachdb/cockroach/issues/110883 + isSystemSpan := span.EndKey.Compare( + roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID+1))) <= 0 + // Create a new rangefeed. - feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(r.startKey) + feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(isSystemSpan) var sched *rangefeed.Scheduler if shouldUseRangefeedScheduler(&r.ClusterSettings().SV) { @@ -433,6 +442,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( Metrics: r.store.metrics.RangeFeedMetrics, MemBudget: feedBudget, Scheduler: sched, + Priority: isSystemSpan, // only takes effect when Scheduler != nil } p = rangefeed.NewProcessor(cfg)