From c5cedc80d15882450f4cdf6e8800b3c545fa8c63 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 18 Sep 2023 09:40:12 +0000 Subject: [PATCH] rangefeed: move scheduler shard logic into shards Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/scheduler.go | 159 ++++++++++++++----------- 1 file changed, 89 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go index e2016e1ca1f3..af294265912c 100644 --- a/pkg/kv/kvserver/rangefeed/scheduler.go +++ b/pkg/kv/kvserver/rangefeed/scheduler.go @@ -236,66 +236,35 @@ func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error { // Scheduler is stopped. func (s *Scheduler) Register(f Callback) (int64, error) { id := s.nextID.Add(1) - shard := s.shards[shardIndex(id, len(s.shards))] - - shard.Lock() - defer shard.Unlock() - if shard.quiescing { - // Don't accept new registrations if quiesced. - return 0, errors.New("server stopping") + if err := s.shards[shardIndex(id, len(s.shards))].register(id, f); err != nil { + return 0, err } - shard.procs[id] = f return id, nil } // Unregister a processor. This function is removing processor callback and // status from scheduler. If processor is currently processing event it will // finish processing. +// // Processor won't receive Stopped event if it wasn't explicitly sent. // To make sure processor performs cleanup, it is easier to send it Stopped // event first and let it remove itself from registration during event handling. // Any attempts to enqueue events for processor after this call will return an // error. func (s *Scheduler) Unregister(id int64) { - shard := s.shards[shardIndex(id, len(s.shards))] - shard.Lock() - defer shard.Unlock() - - delete(shard.procs, id) - delete(shard.status, id) + s.shards[shardIndex(id, len(s.shards))].unregister(id) } func (s *Scheduler) Stop() { - // Stop all processors across all shards. + // Stop all shard workers. 
for _, shard := range s.shards { - shard.Lock() - if !shard.quiescing { - // On first close attempt trigger termination of all unfinished callbacks, - // we only need to do that once to avoid closing s.drained channel multiple - // times. - shard.quiescing = true - } - shard.Unlock() - shard.cond.Broadcast() + shard.quiesce() } s.wg.Wait() - // Synchronously notify all non-stopped processors about stop. + // Synchronously notify processors about stop. for _, shard := range s.shards { - shard.Lock() - for id, p := range shard.procs { - pending := shard.status[id] - // Ignore processors that already processed their stopped event. - if pending == Stopped { - continue - } - // Add stopped event on top of what was pending and remove queued. - pending = (^Queued & pending) | Stopped - shard.Unlock() - p(pending) - shard.Lock() - } - shard.Unlock() + shard.stop() } } @@ -306,42 +275,18 @@ func (s *Scheduler) StopProcessor(id int64) { s.Enqueue(id, Stopped) } -// Enqueue event for existing callback. Returns error if callback was not -// registered for the id or if processor is stopping. Error doesn't guarantee -// that processor actually handled stopped event it may either be pending or -// processed. +// Enqueue event for existing callback. The event is ignored if the processor +// does not exist. func (s *Scheduler) Enqueue(id int64, evt processorEventType) { - shard := s.shards[shardIndex(id, len(s.shards))] - - shard.Lock() - defer shard.Unlock() - if _, ok := shard.procs[id]; !ok { - return - } - newWork := shard.enqueueLocked(id, evt) - if newWork { - // Wake up potential waiting worker. - // We are allowed to do this under cond lock. - shard.cond.Signal() - } + s.shards[shardIndex(id, len(s.shards))].enqueue(id, evt) } // EnqueueBatch enqueues an event for a set of processors across all shards. // Using a batch allows efficient enqueueing with minimal lock contention. 
 func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) { for shardIdx, ids := range batch.ids { - if len(ids) == 0 { - continue // skip shards with no new work - } - shard := s.shards[shardIdx] - count := shard.enqueueN(ids, evt) - - if count >= shard.numWorkers { - shard.cond.Broadcast() - } else { - for i := 0; i < count; i++ { - shard.cond.Signal() - } + if len(ids) > 0 { + s.shards[shardIdx].enqueueN(ids, evt) } } } @@ -353,8 +298,46 @@ func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch { return newSchedulerBatch(len(s.shards)) } +// register registers a callback with the shard. The caller must not hold +// the shard lock. +func (ss *schedulerShard) register(id int64, f Callback) error { + ss.Lock() + defer ss.Unlock() + if ss.quiescing { + // Don't accept new registrations if quiesced. + return errors.New("server stopping") + } + ss.procs[id] = f + return nil +} + +// unregister unregisters a callback with the shard. The caller must not +// hold the shard lock. +func (ss *schedulerShard) unregister(id int64) { + ss.Lock() + defer ss.Unlock() + delete(ss.procs, id) + delete(ss.status, id) +} + +// enqueue enqueues a single event for a given processor in this shard, and wakes +// up a worker to process it. The caller must not hold the shard lock. +func (ss *schedulerShard) enqueue(id int64, evt processorEventType) { + ss.Lock() + defer ss.Unlock() + if ss.enqueueLocked(id, evt) { + // Wake up potential waiting worker. + // We are allowed to do this under cond lock. + ss.cond.Signal() + } +} + // enqueueLocked enqueues a single event for a given processor in this shard. +// Does not wake up a worker to process it. 
func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool { + if _, ok := ss.procs[id]; !ok { + return false + } pending := ss.status[id] if pending&Stopped != 0 { return false @@ -371,8 +354,8 @@ func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool { return pending == 0 } -// enqueueN enqueues an event for multiple processors on this shard. The -// caller must not hold the shard lock. +// enqueueN enqueues an event for multiple processors on this shard, and wakes +// up workers to process them. The caller must not hold the shard lock. func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int { // Avoid locking for 0 new processors. if len(ids) == 0 { @@ -391,6 +374,14 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int { } } ss.Unlock() + + if count >= ss.numWorkers { + ss.cond.Broadcast() + } else { + for i := 0; i < count; i++ { + ss.cond.Signal() + } + } return count } @@ -465,6 +456,34 @@ func (ss *schedulerShard) processEvents(ctx context.Context) { } } +// quiesce asks shard workers to terminate and stops accepting new work. +func (ss *schedulerShard) quiesce() { + ss.Lock() + ss.quiescing = true + ss.Unlock() + ss.cond.Broadcast() +} + +// stop synchronously stops processors by submitting and processing a stopped +// event and any other pending work. quiesce() must be called first to stop +// shard workers. +func (ss *schedulerShard) stop() { + ss.Lock() + defer ss.Unlock() + for id, p := range ss.procs { + pending := ss.status[id] + // Ignore processors that already processed their stopped event. + if pending == Stopped { + continue + } + // Add stopped event on top of what was pending and remove queued. + pending = (^Queued & pending) | Stopped + ss.Unlock() + p(pending) + ss.Lock() + } +} + var schedulerBatchPool = sync.Pool{ New: func() interface{} { return new(SchedulerBatch)