From 9284e309d767adeb8a512293294c5e1ae24459bf Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Tue, 12 Sep 2023 10:27:10 +0000
Subject: [PATCH] rangefeed: restructure scheduler/shard code

Epic: none

Release note: None
---
 pkg/kv/kvserver/rangefeed/scheduler.go | 202 ++++++++++++-------------
 1 file changed, 101 insertions(+), 101 deletions(-)

diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go
index 01196215d847..654fc46a2ce0 100644
--- a/pkg/kv/kvserver/rangefeed/scheduler.go
+++ b/pkg/kv/kvserver/rangefeed/scheduler.go
@@ -134,6 +134,22 @@ type Scheduler struct {
 	wg sync.WaitGroup
 }
 
+// schedulerShard is a mutex shard, which reduces contention: workers in a shard
+// share a mutex for scheduling bookkeeping, and this mutex becomes highly
+// contended without sharding. Processors are assigned round-robin to a shard
+// when registered, see shardIndex().
+type schedulerShard struct {
+	syncutil.Mutex
+	numWorkers    int
+	bulkChunkSize int
+	cond          *sync.Cond
+	procs         map[int64]Callback
+	status        map[int64]processorEventType
+	queue         *idQueue
+	// No more new registrations allowed. Workers are winding down.
+	quiescing bool
+}
+
 // NewScheduler will instantiate an idle scheduler based on provided config.
 // Scheduler needs to be started to become operational.
 func NewScheduler(cfg SchedulerConfig) *Scheduler {
@@ -163,22 +179,6 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler {
 	return s
 }
 
-// schedulerShard is a mutex shard, which reduces contention: workers in a shard
-// share a mutex for scheduling bookkeeping, and this mutex becomes highly
-// contended without sharding. Processors are assigned round-robin to a shard
-// when registered, see shardIndex().
-type schedulerShard struct {
-	syncutil.Mutex
-	numWorkers    int
-	bulkChunkSize int
-	cond          *sync.Cond
-	procs         map[int64]Callback
-	status        map[int64]processorEventType
-	queue         *idQueue
-	// No more new registrations allowed. Workers are winding down.
-	quiescing bool
-}
-
 // newSchedulerShard creates a new shard with the given number of workers.
 func newSchedulerShard(numWorkers, bulkChunkSize int) *schedulerShard {
 	ss := &schedulerShard{
@@ -248,6 +248,64 @@ func (s *Scheduler) Register(f Callback) (int64, error) {
 	return id, nil
 }
 
+// Unregister a processor. This function is removing processor callback and
+// status from scheduler. If processor is currently processing event it will
+// finish processing.
+// Processor won't receive Stopped event if it wasn't explicitly sent.
+// To make sure processor performs cleanup, it is easier to send it Stopped
+// event first and let it remove itself from registration during event handling.
+// Any attempts to enqueue events for processor after this call will return an
+// error.
+func (s *Scheduler) Unregister(id int64) {
+	shard := s.shards[shardIndex(id, len(s.shards))]
+	shard.Lock()
+	defer shard.Unlock()
+
+	delete(shard.procs, id)
+	delete(shard.status, id)
+}
+
+func (s *Scheduler) Stop() {
+	// Stop all processors across all shards.
+	for _, shard := range s.shards {
+		shard.Lock()
+		if !shard.quiescing {
+			// On first close attempt trigger termination of all unfinished callbacks,
+			// we only need to do that once to avoid closing s.drained channel multiple
+			// times.
+			shard.quiescing = true
+		}
+		shard.Unlock()
+		shard.cond.Broadcast()
+	}
+	s.wg.Wait()
+
+	// Synchronously notify all non-stopped processors about stop.
+	for _, shard := range s.shards {
+		shard.Lock()
+		for id, p := range shard.procs {
+			pending := shard.status[id]
+			// Ignore processors that already processed their stopped event.
+			if pending == Stopped {
+				continue
+			}
+			// Add stopped event on top of what was pending and remove queued.
+			pending = (^Queued & pending) | Stopped
+			shard.Unlock()
+			p(pending)
+			shard.Lock()
+		}
+		shard.Unlock()
+	}
+}
+
+// StopProcessor instructs processor to stop gracefully by sending it Stopped event.
+// Once stop is called all subsequent Schedule calls for this id will return
+// error.
+func (s *Scheduler) StopProcessor(id int64) {
+	s.Enqueue(id, Stopped)
+}
+
 // Enqueue event for existing callback. Returns error if callback was not
 // registered for the id or if processor is stopping. Error doesn't guarantee
 // that processor actually handled stopped event it may either be pending or
@@ -268,6 +326,33 @@ func (s *Scheduler) Enqueue(id int64, evt processorEventType) {
 	}
 }
 
+// EnqueueBatch enqueues an event for a set of processors across all shards.
+// Using a batch allows efficient enqueueing with minimal lock contention.
+func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) {
+	for shardIdx, ids := range batch.ids {
+		if len(ids) == 0 {
+			continue // skip shards with no new work
+		}
+		shard := s.shards[shardIdx]
+		count := shard.enqueueN(ids, evt)
+
+		if count >= shard.numWorkers {
+			shard.cond.Broadcast()
+		} else {
+			for i := 0; i < count; i++ {
+				shard.cond.Signal()
+			}
+		}
+	}
+}
+
+// NewEnqueueBatch creates a new batch that can be used to efficiently enqueue
+// events for multiple processors via EnqueueBatch(). The batch should be closed
+// when done by calling Close().
+func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
+	return newSchedulerBatch(len(s.shards))
+}
+
 // enqueueLocked enqueues a single event for a given processor in this shard.
 func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool {
 	pending := ss.status[id]
@@ -309,40 +394,6 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int {
 	return count
 }
 
-// EnqueueBatch enqueues an event for a set of processors across all shards.
-// Using a batch allows efficient enqueueing with minimal lock contention.
-func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) {
-	for shardIdx, ids := range batch.ids {
-		if len(ids) == 0 {
-			continue // skip shards with no new work
-		}
-		shard := s.shards[shardIdx]
-		count := shard.enqueueN(ids, evt)
-
-		if count >= shard.numWorkers {
-			shard.cond.Broadcast()
-		} else {
-			for i := 0; i < count; i++ {
-				shard.cond.Signal()
-			}
-		}
-	}
-}
-
-// NewEnqueueBatch creates a new batch that can be used to efficiently enqueue
-// events for multiple processors via EnqueueBatch(). The batch should be closed
-// when done by calling Close().
-func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
-	return newSchedulerBatch(len(s.shards))
-}
-
-// StopProcessor instructs processor to stop gracefully by sending it Stopped event.
-// Once stop is called all subsequent Schedule calls for this id will return
-// error.
-func (s *Scheduler) StopProcessor(id int64) {
-	s.Enqueue(id, Stopped)
-}
-
 // processEvents is a main worker method of a scheduler pool. each one should
 // be launched in separate goroutine and will loop until scheduler is stopped.
 func (ss *schedulerShard) processEvents(ctx context.Context) {
@@ -414,57 +465,6 @@ func (ss *schedulerShard) processEvents(ctx context.Context) {
 	}
 }
 
-// Unregister a processor. This function is removing processor callback and
-// status from scheduler. If processor is currently processing event it will
-// finish processing.
-// Processor won't receive Stopped event if it wasn't explicitly sent.
-// To make sure processor performs cleanup, it is easier to send it Stopped
-// event first and let it remove itself from registration during event handling.
-// Any attempts to enqueue events for processor after this call will return an
-// error.
-func (s *Scheduler) Unregister(id int64) {
-	shard := s.shards[shardIndex(id, len(s.shards))]
-	shard.Lock()
-	defer shard.Unlock()
-
-	delete(shard.procs, id)
-	delete(shard.status, id)
-}
-
-func (s *Scheduler) Stop() {
-	// Stop all processors across all shards.
-	for _, shard := range s.shards {
-		shard.Lock()
-		if !shard.quiescing {
-			// On first close attempt trigger termination of all unfinished callbacks,
-			// we only need to do that once to avoid closing s.drained channel multiple
-			// times.
-			shard.quiescing = true
-		}
-		shard.Unlock()
-		shard.cond.Broadcast()
-	}
-	s.wg.Wait()
-
-	// Synchronously notify all non-stopped processors about stop.
-	for _, shard := range s.shards {
-		shard.Lock()
-		for id, p := range shard.procs {
-			pending := shard.status[id]
-			// Ignore processors that already processed their stopped event.
-			if pending == Stopped {
-				continue
-			}
-			// Add stopped event on top of what was pending and remove queued.
-			pending = (^Queued & pending) | Stopped
-			shard.Unlock()
-			p(pending)
-			shard.Lock()
-		}
-		shard.Unlock()
-	}
-}
-
 var schedulerBatchPool = sync.Pool{
 	New: func() interface{} {
 		return new(SchedulerBatch)
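
For readers unfamiliar with the mutex-sharding pattern this patch relocates: the sketch below is a standalone, simplified illustration of sharded bookkeeping with round-robin ID assignment, analogous to schedulerShard and shardIndex() in the diff. It is not CockroachDB code; the names (shardedScheduler, shard, enqueue) and the modulo-based index are illustrative assumptions, not the actual implementation.

// Illustration only, not CockroachDB code: a minimal mutex-sharded scheduler
// map with round-robin ID assignment. Each shard pairs a mutex with the state
// it protects, so only IDs mapped to the same shard contend on the same lock.
package main

import (
	"fmt"
	"sync"
)

type shard struct {
	sync.Mutex
	status map[int64]int // pending event bits per processor ID
}

type shardedScheduler struct {
	shards []*shard
}

func newShardedScheduler(numShards int) *shardedScheduler {
	s := &shardedScheduler{shards: make([]*shard, numShards)}
	for i := range s.shards {
		s.shards[i] = &shard{status: make(map[int64]int)}
	}
	return s
}

// shardIndex spreads sequential IDs round-robin across shards, so their
// bookkeeping lands on independent mutexes.
func (s *shardedScheduler) shardIndex(id int64) int {
	return int(id % int64(len(s.shards)))
}

// enqueue records an event bit for one processor, taking only its shard's lock.
func (s *shardedScheduler) enqueue(id int64, evt int) {
	sh := s.shards[s.shardIndex(id)]
	sh.Lock()
	defer sh.Unlock()
	sh.status[id] |= evt
}

func main() {
	s := newShardedScheduler(4)
	var wg sync.WaitGroup
	for id := int64(0); id < 8; id++ {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			s.enqueue(id, 1) // concurrent enqueues mostly hit different shards
		}(id)
	}
	wg.Wait()
	for i, sh := range s.shards {
		fmt.Printf("shard %d tracks %d processors\n", i, len(sh.status))
	}
}

The design point the sketch mirrors is that a single global mutex around the scheduler's status map would serialize every Enqueue; splitting the state per shard keeps each enqueue under one shard-local lock, which is why the patch groups the mutex, queue, and maps together in schedulerShard.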