diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go
index 41895bd59ef2..af294265912c 100644
--- a/pkg/kv/kvserver/rangefeed/scheduler.go
+++ b/pkg/kv/kvserver/rangefeed/scheduler.go
@@ -15,6 +15,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"sync/atomic"
 
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -67,7 +68,7 @@ const (
 var eventNames = map[processorEventType]string{
 	Queued:        "Queued",
 	Stopped:       "Stopped",
-	EventQueued:   "Data",
+	EventQueued:   "Event",
 	RequestQueued: "Request",
 	PushTxnQueued: "PushTxn",
 }
@@ -101,12 +102,22 @@ type Callback func(event processorEventType) (remaining processorEventType)
 type SchedulerConfig struct {
 	// Workers is the number of pool workers for scheduler to use.
 	Workers int
+	// ShardSize is the maximum number of workers per scheduler shard. Once a
+	// shard is full, new shards are split off, and workers are evenly distributed
+	// across all shards.
+	ShardSize int
 	// BulkChunkSize is number of ids that would be enqueued in a single bulk
 	// enqueue operation. Chunking is done to avoid holding locks for too long
 	// as it will interfere with enqueue operations.
 	BulkChunkSize int
 }
 
+// shardIndex returns the shard index of the given processor ID.
+// gcassert:inline
+func shardIndex(id int64, numShards int) int {
+	return int(id % int64(numShards))
+}
+
 // Scheduler is a simple scheduler that allows work to be scheduler
 // against number of processors. Each processor is represented by unique id and
 // a callback.
@@ -118,55 +129,96 @@ type SchedulerConfig struct {
 // Each event is represented as a bit mask and multiple pending events could be
 // ORed together before being delivered to processor.
 type Scheduler struct {
-	SchedulerConfig
-
-	mu struct {
-		syncutil.Mutex
-		nextID    int64
-		procs     map[int64]Callback
-		status    map[int64]processorEventType
-		queue     *idQueue
-		// No more new registrations allowed. Workers are winding down.
-		quiescing bool
-	}
-	cond *sync.Cond
-	wg   sync.WaitGroup
+	nextID atomic.Int64
+	shards []*schedulerShard // id % len(shards)
+	wg     sync.WaitGroup
+}
+
+// schedulerShard is a mutex shard, which reduces contention: workers in a shard
+// share a mutex for scheduling bookkeeping, and this mutex becomes highly
+// contended without sharding. Processors are assigned round-robin to a shard
+// when registered, see shardIndex().
+type schedulerShard struct {
+	syncutil.Mutex
+	numWorkers    int
+	bulkChunkSize int
+	cond          *sync.Cond
+	procs         map[int64]Callback
+	status        map[int64]processorEventType
+	queue         *idQueue
+	// No more new registrations allowed. Workers are winding down.
+	quiescing bool
 }
 
 // NewScheduler will instantiate an idle scheduler based on provided config.
 // Scheduler needs to be started to become operational.
 func NewScheduler(cfg SchedulerConfig) *Scheduler {
-	if cfg.BulkChunkSize == 0 {
-		cfg.BulkChunkSize = enqueueBulkMaxChunk
+	bulkChunkSize := cfg.BulkChunkSize
+	if bulkChunkSize == 0 {
+		bulkChunkSize = enqueueBulkMaxChunk
 	}
-	s := &Scheduler{
-		SchedulerConfig: cfg,
-		wg:              sync.WaitGroup{},
+
+	s := &Scheduler{}
+
+	// Create shards.
+	numShards := 1
+	if cfg.ShardSize > 0 && cfg.Workers > cfg.ShardSize {
+		numShards = (cfg.Workers-1)/cfg.ShardSize + 1 // ceiling division
 	}
-	s.mu.procs = make(map[int64]Callback)
-	s.mu.status = make(map[int64]processorEventType)
-	s.mu.queue = newIDQueue()
-	s.cond = sync.NewCond(&s.mu)
+	for i := 0; i < numShards; i++ {
+		shardWorkers := cfg.Workers / numShards
+		if i < cfg.Workers%numShards { // distribute remainder
+			shardWorkers++
+		}
+		if shardWorkers <= 0 {
+			shardWorkers = 1 // ensure we always have a worker
+		}
+		s.shards = append(s.shards, newSchedulerShard(shardWorkers, bulkChunkSize))
+	}
+
 	return s
 }
 
+// newSchedulerShard creates a new shard with the given number of workers.
+func newSchedulerShard(numWorkers, bulkChunkSize int) *schedulerShard {
+	ss := &schedulerShard{
+		numWorkers:    numWorkers,
+		bulkChunkSize: bulkChunkSize,
+		procs:         map[int64]Callback{},
+		status:        map[int64]processorEventType{},
+		queue:         newIDQueue(),
+	}
+	ss.cond = sync.NewCond(&ss.Mutex)
+	return ss
+}
+
 // Start scheduler workers.
 func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error {
-	for i := 0; i < s.Workers; i++ {
-		s.wg.Add(1)
-		workerID := i
-		if err := stopper.RunAsyncTask(ctx, fmt.Sprintf("rangefeed-scheduler-worker-%d", workerID),
-			func(ctx context.Context) {
-				log.VEventf(ctx, 3, "%d scheduler worker started", workerID)
-				defer s.wg.Done()
-				s.processEvents(ctx)
-				log.VEventf(ctx, 3, "%d scheduler worker finished", workerID)
-			}); err != nil {
-			s.wg.Done()
-			s.Stop()
-			return err
+	// Start each shard.
+	for shardID, shard := range s.shards {
+		shardID, shard := shardID, shard // pin loop variables
+
+		// Start the shard's workers.
+		for workerID := 0; workerID < shard.numWorkers; workerID++ {
+			workerID := workerID // pin loop variable
+			s.wg.Add(1)
+
+			if err := stopper.RunAsyncTask(ctx,
+				fmt.Sprintf("rangefeed-scheduler-worker-shard%d-%d", shardID, workerID),
+				func(ctx context.Context) {
+					defer s.wg.Done()
+					log.VEventf(ctx, 3, "scheduler worker %d:%d started", shardID, workerID)
+					shard.processEvents(ctx)
+					log.VEventf(ctx, 3, "scheduler worker %d:%d finished", shardID, workerID)
+				},
+			); err != nil {
+				s.wg.Done()
+				s.Stop()
+				return err
+			}
 		}
 	}
+
 	if err := stopper.RunAsyncTask(ctx, "terminate scheduler",
 		func(ctx context.Context) {
 			<-stopper.ShouldQuiesce()
@@ -183,123 +235,179 @@ func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error {
 // which should be used to send notifications to the callback. Returns error if
 // Scheduler is stopped.
 func (s *Scheduler) Register(f Callback) (int64, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	if s.mu.quiescing {
-		// Don't accept new registrations if quiesced.
-		return 0, errors.New("server stopping")
+	id := s.nextID.Add(1)
+	if err := s.shards[shardIndex(id, len(s.shards))].register(id, f); err != nil {
+		return 0, err
 	}
-	s.mu.nextID++
-	id := s.mu.nextID
-	s.mu.procs[id] = f
 	return id, nil
 }
 
-// Enqueue event for existing callback. Returns error if callback was not
-// registered for the id or if processor is stopping. Error doesn't guarantee
-// that processor actually handled stopped event it may either be pending or
-// processed.
+// Unregister a processor. This function is removing processor callback and
+// status from scheduler. If processor is currently processing event it will
+// finish processing.
+//
+// Processor won't receive Stopped event if it wasn't explicitly sent.
+// To make sure processor performs cleanup, it is easier to send it Stopped
+// event first and let it remove itself from registration during event handling.
+// Any attempts to enqueue events for processor after this call will return an
+// error.
+func (s *Scheduler) Unregister(id int64) {
+	s.shards[shardIndex(id, len(s.shards))].unregister(id)
+}
+
+func (s *Scheduler) Stop() {
+	// Stop all shard workers.
+	for _, shard := range s.shards {
+		shard.quiesce()
+	}
+	s.wg.Wait()
+
+	// Synchronously notify processors about stop.
+	for _, shard := range s.shards {
+		shard.stop()
+	}
+}
+
+// StopProcessor instructs processor to stop gracefully by sending it Stopped event.
+// Once stop is called all subsequent Schedule calls for this id will return
+// error.
+func (s *Scheduler) StopProcessor(id int64) {
+	s.Enqueue(id, Stopped)
+}
+
+// Enqueue event for existing callback. The event is ignored if the processor
+// does not exist.
 func (s *Scheduler) Enqueue(id int64, evt processorEventType) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	if _, ok := s.mu.procs[id]; !ok {
-		return
+	s.shards[shardIndex(id, len(s.shards))].enqueue(id, evt)
+}
+
+// EnqueueBatch enqueues an event for a set of processors across all shards.
+// Using a batch allows efficient enqueueing with minimal lock contention.
+func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) {
+	for shardIdx, ids := range batch.ids {
+		if len(ids) > 0 {
+			s.shards[shardIdx].enqueueN(ids, evt)
+		}
+	}
+}
+
+// NewEnqueueBatch creates a new batch that can be used to efficiently enqueue
+// events for multiple processors via EnqueueBatch(). The batch should be closed
+// when done by calling Close().
+func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
+	return newSchedulerBatch(len(s.shards))
+}
+
+// register registers a callback with the shard. The caller must not hold
+// the shard lock.
+func (ss *schedulerShard) register(id int64, f Callback) error {
+	ss.Lock()
+	defer ss.Unlock()
+	if ss.quiescing {
+		// Don't accept new registrations if quiesced.
+		return errors.New("server stopping")
 	}
-	newWork := s.enqueueInternalLocked(id, evt)
-	if newWork {
+	ss.procs[id] = f
+	return nil
+}
+
+// unregister unregisters a callback with the shard. The caller must not
+// hold the shard lock.
+func (ss *schedulerShard) unregister(id int64) {
+	ss.Lock()
+	defer ss.Unlock()
+	delete(ss.procs, id)
+	delete(ss.status, id)
+}
+
+// enqueue enqueues a single event for a given processor in this shard, and wakes
+// up a worker to process it. The caller must not hold the shard lock.
+func (ss *schedulerShard) enqueue(id int64, evt processorEventType) {
+	ss.Lock()
+	defer ss.Unlock()
+	if ss.enqueueLocked(id, evt) {
 		// Wake up potential waiting worker.
 		// We are allowed to do this under cond lock.
-		s.cond.Signal()
+		ss.cond.Signal()
 	}
 }
 
-func (s *Scheduler) enqueueInternalLocked(id int64, evt processorEventType) bool {
-	pending := s.mu.status[id]
+// enqueueLocked enqueues a single event for a given processor in this shard.
+// Does not wake up a worker to process it.
+func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool {
+	if _, ok := ss.procs[id]; !ok {
+		return false
+	}
+	pending := ss.status[id]
 	if pending&Stopped != 0 {
 		return false
 	}
 	if pending == 0 {
 		// Enqueue if processor was idle.
-		s.mu.queue.pushBack(id)
+		ss.queue.pushBack(id)
 	}
 	update := pending | evt | Queued
 	if update != pending {
 		// Only update if event actually changed.
-		s.mu.status[id] = update
+		ss.status[id] = update
 	}
 	return pending == 0
 }
 
-// EnqueueAll enqueues event for all existing non-stopped id's. Enqueueing is
-// done in chunks to avoid holding lock for too long and interfering with other
-// enqueue operations.
-//
-// If id is not known or already stopped it is ignored.
-func (s *Scheduler) EnqueueAll(ids []int64, evt processorEventType) {
-	scheduleChunk := func(chunk []int64) int {
-		s.mu.Lock()
-		defer s.mu.Unlock()
-		wake := 0
-		for _, id := range chunk {
-			if _, ok := s.mu.procs[id]; ok {
-				if newWork := s.enqueueInternalLocked(id, evt); newWork {
-					wake++
-				}
-			}
-		}
-		return wake
+// enqueueN enqueues an event for multiple processors on this shard, and wakes
+// up workers to process them. The caller must not hold the shard lock.
+func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int {
+	// Avoid locking for 0 new processors.
+	if len(ids) == 0 {
+		return 0
 	}
-	wake := 0
-	total := len(ids)
-	for first := 0; first < total; first += s.BulkChunkSize {
-		last := first + s.BulkChunkSize
-		if last > total {
-			last = total
+
+	ss.Lock()
+	var count int
+	for i, id := range ids {
+		if ss.enqueueLocked(id, evt) {
+			count++
+		}
+		if (i+1)%ss.bulkChunkSize == 0 {
+			ss.Unlock()
+			ss.Lock()
 		}
-		added := scheduleChunk(ids[first:last])
-		wake += added
 	}
-	// Wake up potential waiting workers. We wake all of them as we expect more
-	// than total number of workers.
-	if wake >= s.Workers {
-		s.cond.Broadcast()
+	ss.Unlock()
+
+	if count >= ss.numWorkers {
+		ss.cond.Broadcast()
 	} else {
-		for ; wake > 0; wake-- {
-			s.cond.Signal()
+		for i := 0; i < count; i++ {
+			ss.cond.Signal()
 		}
 	}
-}
-
-// StopProcessor instructs processor to stop gracefully by sending it Stopped event.
-// Once stop is called all subsequent Schedule calls for this id will return
-// error.
-func (s *Scheduler) StopProcessor(id int64) {
-	s.Enqueue(id, Stopped)
+	return count
 }
 
 // processEvents is a main worker method of a scheduler pool. each one should
 // be launched in separate goroutine and will loop until scheduler is stopped.
-func (s *Scheduler) processEvents(ctx context.Context) {
+func (ss *schedulerShard) processEvents(ctx context.Context) {
 	for {
 		var id int64
-		s.mu.Lock()
+		ss.Lock()
 		for {
-			if s.mu.quiescing {
-				s.mu.Unlock()
+			if ss.quiescing {
+				ss.Unlock()
 				return
 			}
 			var ok bool
-			if id, ok = s.mu.queue.popFront(); ok {
+			if id, ok = ss.queue.popFront(); ok {
 				break
 			}
-			s.cond.Wait()
+			ss.cond.Wait()
 		}
-		cb := s.mu.procs[id]
-		e := s.mu.status[id]
+		cb := ss.procs[id]
+		e := ss.status[id]
 		// Keep Queued status and preserve Stopped to block any more events.
-		s.mu.status[id] = Queued | (e & Stopped)
-		s.mu.Unlock()
+		ss.status[id] = Queued | (e & Stopped)
+		ss.Unlock()
 
 		procEventType := Queued ^ e
 		remaining := cb(procEventType)
@@ -319,79 +427,98 @@
 			}
 			// We'll keep Stopped state to avoid calling stopped processor again
 			// on scheduler shutdown.
-			s.mu.Lock()
-			s.mu.status[id] = Stopped
-			s.mu.Unlock()
+			ss.Lock()
+			ss.status[id] = Stopped
+			ss.Unlock()
 			continue
 		}
 
-		s.mu.Lock()
-		pendingStatus, ok := s.mu.status[id]
+		ss.Lock()
+		pendingStatus, ok := ss.status[id]
 		if !ok {
-			s.mu.Unlock()
+			ss.Unlock()
 			continue
 		}
 		newStatus := pendingStatus | remaining
 		if newStatus == Queued {
 			// If no events arrived, get rid of id.
-			delete(s.mu.status, id)
+			delete(ss.status, id)
 		} else {
 			// Since more events arrived during processing, reschedule.
-			s.mu.queue.pushBack(id)
+			ss.queue.pushBack(id)
 			// If remaining work was returned and not already planned, then update
 			// pending status to reflect that.
 			if newStatus != pendingStatus {
-				s.mu.status[id] = newStatus
+				ss.status[id] = newStatus
 			}
 		}
-		s.mu.Unlock()
+		ss.Unlock()
 	}
 }
 
-// Unregister a processor. This function is removing processor callback and
-// status from scheduler. If processor is currently processing event it will
-// finish processing.
-// Processor won't receive Stopped event if it wasn't explicitly sent.
-// To make sure processor performs cleanup, it is easier to send it Stopped
-// event first and let it remove itself from registration during event handling.
-// Any attempts to enqueue events for processor after this call will return an
-// error.
-func (s *Scheduler) Unregister(id int64) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	delete(s.mu.procs, id)
-	delete(s.mu.status, id)
+// quiesce asks shard workers to terminate and stops accepting new work.
+func (ss *schedulerShard) quiesce() {
+	ss.Lock()
+	ss.quiescing = true
+	ss.Unlock()
+	ss.cond.Broadcast()
 }
 
-func (s *Scheduler) Stop() {
-	// Stop all processors.
-	s.mu.Lock()
-	if !s.mu.quiescing {
-		// On first close attempt trigger termination of all unfinished callbacks,
-		// we only need to do that once to avoid closing s.drained channel multiple
-		// times.
-		s.mu.quiescing = true
-	}
-	s.mu.Unlock()
-	s.cond.Broadcast()
-	s.wg.Wait()
-
-	// Synchronously notify all non-stopped processors about stop.
-	s.mu.Lock()
-	for id, p := range s.mu.procs {
-		pending := s.mu.status[id]
+// stop synchronously stops processors by submitting and processing a stopped
+// event and any other pending work. quiesce() must be called first to stop
+// shard workers.
+func (ss *schedulerShard) stop() {
+	ss.Lock()
+	defer ss.Unlock()
+	for id, p := range ss.procs {
+		pending := ss.status[id]
 		// Ignore processors that already processed their stopped event.
 		if pending == Stopped {
 			continue
 		}
 		// Add stopped event on top of what was pending and remove queued.
 		pending = (^Queued & pending) | Stopped
-		s.mu.Unlock()
+		ss.Unlock()
 		p(pending)
-		s.mu.Lock()
+		ss.Lock()
+	}
+}
+
+var schedulerBatchPool = sync.Pool{
+	New: func() interface{} {
+		return new(SchedulerBatch)
+	},
+}
+
+// SchedulerBatch is a batch of IDs to enqueue. It enables efficient per-shard
+// enqueueing, by pre-sharding the IDs and only locking a single shard at a time
+// while bulk-enqueueing.
+type SchedulerBatch struct {
+	ids [][]int64 // by shard
+}
+
+func newSchedulerBatch(numShards int) *SchedulerBatch {
+	b := schedulerBatchPool.Get().(*SchedulerBatch)
+	if cap(b.ids) >= numShards {
+		b.ids = b.ids[:numShards]
+	} else {
+		b.ids = make([][]int64, numShards)
+	}
+	return b
+}
+
+// Add adds a processor ID to the batch.
+func (b *SchedulerBatch) Add(id int64) {
+	shardIdx := shardIndex(id, len(b.ids))
+	b.ids[shardIdx] = append(b.ids[shardIdx], id)
+}
+
+// Close returns the batch to the pool for reuse.
+func (b *SchedulerBatch) Close() {
+	for i := range b.ids {
+		b.ids[i] = b.ids[i][:0]
 	}
-	s.mu.Unlock()
+	schedulerBatchPool.Put(b)
 }
 
 // ClientScheduler is a wrapper on top of scheduler that could be passed to a
diff --git a/pkg/kv/kvserver/rangefeed/scheduler_test.go b/pkg/kv/kvserver/rangefeed/scheduler_test.go
index b89024101c9e..7b22df5aaffe 100644
--- a/pkg/kv/kvserver/rangefeed/scheduler_test.go
+++ b/pkg/kv/kvserver/rangefeed/scheduler_test.go
@@ -12,6 +12,7 @@ package rangefeed
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -326,22 +327,23 @@ func TestClientScheduler(t *testing.T) {
 	assertStopsWithinTimeout(t, s)
 }
 
-func TestScheduleMultiple(t *testing.T) {
+func TestScheduleBatch(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	ctx := context.Background()
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
 
-	s := NewScheduler(SchedulerConfig{Workers: 2, BulkChunkSize: 2})
+	s := NewScheduler(SchedulerConfig{Workers: 8, ShardSize: 2, BulkChunkSize: 2})
 	require.NoError(t, s.Start(ctx, stopper), "failed to start")
-	const consumerNumber = 10
+	const consumerNumber = 100
 	consumers := make([]*schedulerConsumer, consumerNumber)
-	ids := make([]int64, consumerNumber)
+	batch := s.NewEnqueueBatch()
+	defer batch.Close()
 	for i := 0; i < consumerNumber; i++ {
 		consumers[i] = createAndRegisterConsumerOrFail(t, s)
-		ids[i] = consumers[i].id
+		batch.Add(consumers[i].id)
 	}
-	s.EnqueueAll(ids, te1)
+	s.EnqueueBatch(batch, te1)
 	for _, c := range consumers {
 		c.requireEvent(t, time.Second*30000, te1, 1)
 	}
@@ -410,7 +412,7 @@ func TestSchedulerShutdown(t *testing.T) {
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
 
-	s := NewScheduler(SchedulerConfig{Workers: 1})
+	s := NewScheduler(SchedulerConfig{Workers: 2, ShardSize: 1})
 	require.NoError(t, s.Start(ctx, stopper), "failed to start")
 	c1 := createAndRegisterConsumerOrFail(t, s)
 	c2 := createAndRegisterConsumerOrFail(t, s)
@@ -462,3 +464,59 @@ func TestQueueReadEmpty(t *testing.T) {
 	_, ok := q.popFront()
 	require.False(t, ok, "unexpected value in empty queue")
 }
+
+func TestNewSchedulerShards(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testcases := []struct {
+		workers      int
+		shardSize    int
+		expectShards []int
+	}{
+		// We balance workers across shards instead of filling up shards. We assume
+		// ranges are evenly distributed across shards, and want ranges to have
+		// about the same number of workers available on average.
+		{-1, -1, []int{1}},
+		{0, 0, []int{1}},
+		{1, -1, []int{1}},
+		{1, 0, []int{1}},
+		{1, 1, []int{1}},
+		{1, 2, []int{1}},
+		{2, 2, []int{2}},
+		{3, 2, []int{2, 1}},
+		{1, 3, []int{1}},
+		{2, 3, []int{2}},
+		{3, 3, []int{3}},
+		{4, 3, []int{2, 2}},
+		{5, 3, []int{3, 2}},
+		{6, 3, []int{3, 3}},
+		{7, 3, []int{3, 2, 2}},
+		{8, 3, []int{3, 3, 2}},
+		{9, 3, []int{3, 3, 3}},
+		{10, 3, []int{3, 3, 2, 2}},
+		{11, 3, []int{3, 3, 3, 2}},
+		{12, 3, []int{3, 3, 3, 3}},
+
+		// Typical examples, using 4 workers per CPU core and 8 workers per shard.
+		// Note that we cap workers at 64 by default.
+		{1 * 4, 8, []int{4}},
+		{2 * 4, 8, []int{8}},
+		{3 * 4, 8, []int{6, 6}},
+		{4 * 4, 8, []int{8, 8}},
+		{6 * 4, 8, []int{8, 8, 8}},
+		{8 * 4, 8, []int{8, 8, 8, 8}},
+		{12 * 4, 8, []int{8, 8, 8, 8, 8, 8}},
+		{16 * 4, 8, []int{8, 8, 8, 8, 8, 8, 8, 8}}, // 64 workers
+	}
+	for _, tc := range testcases {
+		t.Run(fmt.Sprintf("workers=%d/shardSize=%d", tc.workers, tc.shardSize), func(t *testing.T) {
+			s := NewScheduler(SchedulerConfig{Workers: tc.workers, ShardSize: tc.shardSize})
+
+			var shardWorkers []int
+			for _, shard := range s.shards {
+				shardWorkers = append(shardWorkers, shard.numWorkers)
+			}
+			require.Equal(t, tc.expectShards, shardWorkers)
+		})
+	}
+}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 81a6118fcf70..b9cb1f36ae70 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -179,12 +179,21 @@ var logStoreTelemetryTicks = envutil.EnvOrDefaultInt(
 	6*60,
 )
 
-// defaultRangefeedSchedulerConcurency specifies how many workers rangefeed
+// defaultRangefeedSchedulerConcurrency specifies how many workers rangefeed
 // scheduler will use to perform rangefeed work. This number will be divided
 // between stores of the node.
-var defaultRangefeedSchedulerConcurency = envutil.EnvOrDefaultInt(
+var defaultRangefeedSchedulerConcurrency = envutil.EnvOrDefaultInt(
 	"COCKROACH_RANGEFEED_SCHEDULER_WORKERS", min(4*runtime.GOMAXPROCS(0), 64))
 
+// defaultRangefeedSchedulerShardSize specifies the default maximum number of
+// scheduler worker goroutines per mutex shard. By default, we spin up 4 workers
+// per CPU core, capped at 64, so 8 is equivalent to 2 CPUs per shard, or a
+// maximum of 8 shards. Since rangefeed processing is generally cheap, this
+// significantly relieves contention, while also avoiding starvation by
+// excessive sharding.
+var defaultRangefeedSchedulerShardSize = envutil.EnvOrDefaultInt(
+	"COCKROACH_RANGEFEED_SCHEDULER_SHARD_SIZE", 8)
+
 // bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter.
 var bulkIOWriteLimit = settings.RegisterByteSizeSetting(
 	settings.SystemOnly,
@@ -1236,6 +1245,10 @@ type StoreConfig struct {
 	// RangeFeedSchedulerConcurrency specifies number of rangefeed scheduler
 	// workers for the store.
 	RangeFeedSchedulerConcurrency int
+
+	// RangeFeedSchedulerShardSize specifies the maximum number of workers per
+	// scheduler shard.
+	RangeFeedSchedulerShardSize int
 }
 
 // logRangeAndNodeEventsEnabled is used to enable or disable logging range events
@@ -1270,7 +1283,7 @@ func (sc *StoreConfig) Valid() bool {
 		sc.RaftElectionTimeoutTicks > 0 && sc.RaftReproposalTimeoutTicks > 0 &&
 		sc.RaftSchedulerConcurrency > 0 && sc.RaftSchedulerConcurrencyPriority > 0 &&
 		sc.RaftSchedulerShardSize > 0 && sc.ScanInterval >= 0 && sc.AmbientCtx.Tracer != nil &&
-		sc.RangeFeedSchedulerConcurrency > 0
+		sc.RangeFeedSchedulerConcurrency > 0 && sc.RangeFeedSchedulerShardSize > 0
 }
 
 // SetDefaults initializes unset fields in StoreConfig to values
@@ -1311,7 +1324,7 @@ func (sc *StoreConfig) SetDefaults(numStores int) {
 		sc.TestingKnobs.DisableQuiescence = true
 	}
 	if sc.RangeFeedSchedulerConcurrency == 0 {
-		sc.RangeFeedSchedulerConcurrency = defaultRangefeedSchedulerConcurency
+		sc.RangeFeedSchedulerConcurrency = defaultRangefeedSchedulerConcurrency
 		if numStores > 1 && sc.RangeFeedSchedulerConcurrency > 1 {
 			// We want at least two workers per store to avoid any blocking.
 			sc.RangeFeedSchedulerConcurrency = min(
@@ -1319,6 +1332,9 @@ func (sc *StoreConfig) SetDefaults(numStores int) {
 				2)
 		}
 	}
+	if sc.RangeFeedSchedulerShardSize == 0 {
+		sc.RangeFeedSchedulerShardSize = defaultRangefeedSchedulerShardSize
+	}
 }
 
 // GetStoreConfig exposes the config used for this store.
@@ -1978,7 +1994,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 	}
 
 	rfs := rangefeed.NewScheduler(rangefeed.SchedulerConfig{
-		Workers: s.cfg.RangeFeedSchedulerConcurrency,
+		Workers:   s.cfg.RangeFeedSchedulerConcurrency,
+		ShardSize: s.cfg.RangeFeedSchedulerShardSize,
 	})
 	if err = rfs.Start(ctx, s.stopper); err != nil {
 		return err
@@ -2400,26 +2417,26 @@ func (s *Store) startRangefeedTxnPushNotifier(ctx context.Context) {
 	ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
 	defer cancel()
 
-	var schedulerIDs []int64
-	updateSchedulerIDs := func() []int64 {
-		schedulerIDs = schedulerIDs[:0]
+	makeSchedulerBatch := func() *rangefeed.SchedulerBatch {
+		batch := s.rangefeedScheduler.NewEnqueueBatch()
 		s.rangefeedReplicas.Lock()
 		for _, id := range s.rangefeedReplicas.m {
 			if id != 0 {
 				// Only process ranges that use scheduler.
-				schedulerIDs = append(schedulerIDs, id)
+				batch.Add(id)
 			}
 		}
 		s.rangefeedReplicas.Unlock()
-		return schedulerIDs
+		return batch
 	}
 
 	ticker := time.NewTicker(rangefeed.DefaultPushTxnsInterval)
 	for {
 		select {
 		case <-ticker.C:
-			activeIDs := updateSchedulerIDs()
-			s.rangefeedScheduler.EnqueueAll(activeIDs, rangefeed.PushTxnQueued)
+			batch := makeSchedulerBatch()
+			s.rangefeedScheduler.EnqueueBatch(batch, rangefeed.PushTxnQueued)
+			batch.Close()
 		case <-ctx.Done():
 			ticker.Stop()
 			return
diff --git a/pkg/testutils/lint/gcassert_paths.txt b/pkg/testutils/lint/gcassert_paths.txt
index 94288e2cf952..f3f772052ee3 100644
--- a/pkg/testutils/lint/gcassert_paths.txt
+++ b/pkg/testutils/lint/gcassert_paths.txt
@@ -5,6 +5,7 @@ kv/kvclient/kvcoord
 kv/kvclient/rangecache
 kv/kvpb
 kv/kvserver/intentresolver
+kv/kvserver/rangefeed
 roachpb
 sql/catalog/descs
 sql/colcontainer
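As a quick reference for reviewers, below is a minimal usage sketch of the batched enqueue API added in this change (NewEnqueueBatch, Add, EnqueueBatch, Close), mirroring what TestScheduleBatch and startRangefeedTxnPushNotifier do. It assumes the snippet lives in the rangefeed package next to the scheduler (so the unexported types are in scope) with "context" and the util/stop package imported; the worker counts and the no-op callback are made up for illustration and are not part of the patch.

// Illustrative sketch only, not part of the change: register a few processors
// with the sharded scheduler and enqueue one event for all of them via a
// pre-sharded batch.
func exampleBatchedEnqueue(ctx context.Context, stopper *stop.Stopper) error {
	s := NewScheduler(SchedulerConfig{Workers: 16, ShardSize: 8})
	if err := s.Start(ctx, stopper); err != nil {
		return err
	}

	// Register a few processors; each ID maps to a shard via shardIndex().
	var ids []int64
	for i := 0; i < 3; i++ {
		id, err := s.Register(func(ev processorEventType) processorEventType {
			return 0 // handle the pending event bitmask; 0 means nothing remains
		})
		if err != nil {
			return err
		}
		ids = append(ids, id)
	}

	// Pre-shard the IDs into a batch, then enqueue with one lock acquisition
	// per shard rather than one per processor.
	batch := s.NewEnqueueBatch()
	defer batch.Close() // returns the batch to schedulerBatchPool
	for _, id := range ids {
		batch.Add(id)
	}
	s.EnqueueBatch(batch, PushTxnQueued)
	return nil
}

Pre-sharding the IDs in the batch is what lets EnqueueBatch take each shard mutex only once (in bulkChunkSize chunks) instead of once per processor, which is the lock contention this change targets.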