rangefeed: move scheduler shard logic into shards
Epic: none
Release note: None
erikgrinaker committed Sep 18, 2023
1 parent 1170601 commit c5cedc8
Showing 1 changed file with 89 additions and 70 deletions.
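
Every per-processor operation in this change is routed through shardIndex(id, len(s.shards)) to find the shard that owns the processor. The shardIndex implementation itself is not part of this diff; the sketch below shows the general sharding pattern under the assumption of a simple modulo mapping, with a hypothetical shardFor helper that is not in the actual code (the diff inlines the lookup at each call site):

// Sketch only: the sharding pattern assumed by this diff, not the actual
// rangefeed scheduler implementation.
package schedsketch

import "sync"

// schedulerShard keeps per-shard state behind its own mutex, so operations
// on different shards do not contend on a single scheduler-wide lock.
type schedulerShard struct {
    sync.Mutex
    procs map[int64]func() // simplified stand-in for the real Callback type
}

// Scheduler fans processors out across a fixed set of shards.
type Scheduler struct {
    shards []*schedulerShard
}

// shardIndex maps a processor ID to a shard; a plain modulo mapping is
// assumed here for illustration.
func shardIndex(id int64, numShards int) int {
    return int(id % int64(numShards))
}

// shardFor returns the shard owning the given processor ID (hypothetical
// helper for illustration only).
func (s *Scheduler) shardFor(id int64) *schedulerShard {
    return s.shards[shardIndex(id, len(s.shards))]
}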
159 changes: 89 additions & 70 deletions pkg/kv/kvserver/rangefeed/scheduler.go
@@ -236,66 +236,35 @@ func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error {
// Scheduler is stopped.
func (s *Scheduler) Register(f Callback) (int64, error) {
id := s.nextID.Add(1)
shard := s.shards[shardIndex(id, len(s.shards))]

shard.Lock()
defer shard.Unlock()
if shard.quiescing {
// Don't accept new registrations if quiesced.
return 0, errors.New("server stopping")
if err := s.shards[shardIndex(id, len(s.shards))].register(id, f); err != nil {
return 0, err
}
shard.procs[id] = f
return id, nil
}

// Unregister a processor. This removes the processor's callback and status
// from the scheduler. If the processor is currently processing an event, it
// will finish processing.
//
// The processor won't receive a Stopped event if it wasn't explicitly sent.
// To make sure the processor performs cleanup, it is easier to send it a
// Stopped event first and let it remove itself from registration during event
// handling. Any attempt to enqueue events for the processor after this call
// will return an error.
func (s *Scheduler) Unregister(id int64) {
shard := s.shards[shardIndex(id, len(s.shards))]
shard.Lock()
defer shard.Unlock()

delete(shard.procs, id)
delete(shard.status, id)
s.shards[shardIndex(id, len(s.shards))].unregister(id)
}

func (s *Scheduler) Stop() {
// Stop all processors across all shards.
// Stop all shard workers.
for _, shard := range s.shards {
shard.Lock()
if !shard.quiescing {
// On first close attempt trigger termination of all unfinished callbacks,
// we only need to do that once to avoid closing s.drained channel multiple
// times.
shard.quiescing = true
}
shard.Unlock()
shard.cond.Broadcast()
shard.quiesce()
}
s.wg.Wait()

// Synchronously notify all non-stopped processors about stop.
// Synchronously notify processors about stop.
for _, shard := range s.shards {
shard.Lock()
for id, p := range shard.procs {
pending := shard.status[id]
// Ignore processors that already processed their stopped event.
if pending == Stopped {
continue
}
// Add stopped event on top of what was pending and remove queued.
pending = (^Queued & pending) | Stopped
shard.Unlock()
p(pending)
shard.Lock()
}
shard.Unlock()
shard.stop()
}
}

@@ -306,42 +275,18 @@ func (s *Scheduler) StopProcessor(id int64) {
s.Enqueue(id, Stopped)
}

// Enqueue event for existing callback. Returns error if callback was not
// registered for the id or if processor is stopping. Error doesn't guarantee
// that processor actually handled stopped event it may either be pending or
// processed.
// Enqueue event for existing callback. The event is ignored if the processor
// does not exist.
func (s *Scheduler) Enqueue(id int64, evt processorEventType) {
shard := s.shards[shardIndex(id, len(s.shards))]

shard.Lock()
defer shard.Unlock()
if _, ok := shard.procs[id]; !ok {
return
}
newWork := shard.enqueueLocked(id, evt)
if newWork {
// Wake up potential waiting worker.
// We are allowed to do this under cond lock.
shard.cond.Signal()
}
s.shards[shardIndex(id, len(s.shards))].enqueue(id, evt)
}

// EnqueueBatch enqueues an event for a set of processors across all shards.
// Using a batch allows efficient enqueueing with minimal lock contention.
func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) {
for shardIdx, ids := range batch.ids {
if len(ids) == 0 {
continue // skip shards with no new work
}
shard := s.shards[shardIdx]
count := shard.enqueueN(ids, evt)

if count >= shard.numWorkers {
shard.cond.Broadcast()
} else {
for i := 0; i < count; i++ {
shard.cond.Signal()
}
if len(ids) > 0 {
s.shards[shardIdx].enqueueN(ids, evt)
}
}
}
@@ -353,8 +298,46 @@ func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
return newSchedulerBatch(len(s.shards))
}
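
EnqueueBatch above walks the batch's per-shard ID lists so that each shard lock is taken at most once for the whole batch. How a SchedulerBatch is populated is not visible in this hunk; continuing the sketch package above, the following illustrates the per-shard grouping idea with a hypothetical Add helper and the same assumed modulo shardIndex mapping:

// Sketch only: per-shard ID batching. Add is a hypothetical helper; the
// real SchedulerBatch API is not shown in this diff.
type SchedulerBatch struct {
    ids [][]int64 // one slice of processor IDs per shard
}

// newSchedulerBatch is simplified here; the real constructor draws from
// schedulerBatchPool (visible further down in the diff).
func newSchedulerBatch(numShards int) *SchedulerBatch {
    return &SchedulerBatch{ids: make([][]int64, numShards)}
}

// Add records a processor ID under the shard that owns it, so a batched
// enqueue visits each shard exactly once.
func (b *SchedulerBatch) Add(id int64) {
    idx := shardIndex(id, len(b.ids)) // assumed modulo mapping, as above
    b.ids[idx] = append(b.ids[idx], id)
}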

// register registers a callback with the shard. The caller must not hold
// the shard lock.
func (ss *schedulerShard) register(id int64, f Callback) error {
ss.Lock()
defer ss.Unlock()
if ss.quiescing {
// Don't accept new registrations if quiesced.
return errors.New("server stopping")
}
ss.procs[id] = f
return nil
}

// unregister unregisters a callback from the shard. The caller must not
// hold the shard lock.
func (ss *schedulerShard) unregister(id int64) {
ss.Lock()
defer ss.Unlock()
delete(ss.procs, id)
delete(ss.status, id)
}

// enqueue enqueues a single event for a given processor in this shard, and wakes
// up a worker to process it. The caller must not hold the shard lock.
func (ss *schedulerShard) enqueue(id int64, evt processorEventType) {
ss.Lock()
defer ss.Unlock()
if ss.enqueueLocked(id, evt) {
// Wake up potential waiting worker.
// We are allowed to do this under cond lock.
ss.cond.Signal()
}
}

// enqueueLocked enqueues a single event for a given processor in this shard.
// Does not wake up a worker to process it.
func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool {
if _, ok := ss.procs[id]; !ok {
return false
}
pending := ss.status[id]
if pending&Stopped != 0 {
return false
@@ -371,8 +354,8 @@ func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool {
return pending == 0
}

// enqueueN enqueues an event for multiple processors on this shard. The
// caller must not hold the shard lock.
// enqueueN enqueues an event for multiple processors on this shard, and wakes
// up workers to process them. The caller must not hold the shard lock.
func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int {
// Avoid locking for 0 new processors.
if len(ids) == 0 {
@@ -391,6 +374,14 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int {
}
}
ss.Unlock()

if count >= ss.numWorkers {
ss.cond.Broadcast()
} else {
for i := 0; i < count; i++ {
ss.cond.Signal()
}
}
return count
}
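
The wakeup policy at the end of enqueueN — a single Broadcast when the batch is at least as large as the worker pool, otherwise one Signal per newly scheduled processor — is a standard sync.Cond pattern. Continuing the sketch package above, a self-contained illustration of the idea with hypothetical names rather than the rangefeed types:

// Sketch only: signal-vs-broadcast wakeups for workers blocked on a
// sync.Cond. Names are hypothetical.
type workQueue struct {
    mu         sync.Mutex
    cond       *sync.Cond
    items      []int64
    numWorkers int
}

func newWorkQueue(numWorkers int) *workQueue {
    q := &workQueue{numWorkers: numWorkers}
    q.cond = sync.NewCond(&q.mu) // workers call q.cond.Wait() while idle
    return q
}

// enqueueN appends a batch of items and wakes workers. Once the batch is
// large enough to occupy the whole pool, one Broadcast is cheaper than
// repeated Signal calls; otherwise wake exactly one worker per new item.
func (q *workQueue) enqueueN(items []int64) {
    q.mu.Lock()
    q.items = append(q.items, items...)
    count := len(items)
    q.mu.Unlock()

    if count >= q.numWorkers {
        q.cond.Broadcast()
    } else {
        for i := 0; i < count; i++ {
            q.cond.Signal()
        }
    }
}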

@@ -465,6 +456,34 @@ func (ss *schedulerShard) processEvents(ctx context.Context) {
}
}

// quiesce asks shard workers to terminate and stops accepting new work.
func (ss *schedulerShard) quiesce() {
ss.Lock()
ss.quiescing = true
ss.Unlock()
ss.cond.Broadcast()
}

// stop synchronously stops processors by submitting and processing a stopped
// event and any other pending work. quiesce() must be called first to stop
// shard workers.
func (ss *schedulerShard) stop() {
ss.Lock()
defer ss.Unlock()
for id, p := range ss.procs {
pending := ss.status[id]
// Ignore processors that already processed their stopped event.
if pending == Stopped {
continue
}
// Add stopped event on top of what was pending and remove queued.
pending = (^Queued & pending) | Stopped
ss.Unlock()
p(pending)
ss.Lock()
}
}
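
The bit manipulation in stop() — pending = (^Queued & pending) | Stopped — clears the Queued flag while setting Stopped and preserving any other pending event bits. A small standalone illustration with made-up flag values (the real constants live in the rangefeed package):

// Sketch only: how clearing Queued and setting Stopped preserves other
// pending event bits. Flag values here are illustrative.
package main

import "fmt"

type processorEventType int

const (
    Queued processorEventType = 1 << iota
    Stopped
    otherEvent // stand-in for the package's remaining event bits
)

func main() {
    // A processor with a queued wakeup and some other event pending.
    pending := Queued | otherEvent

    // Clear Queued and add Stopped, keeping the other pending bits intact.
    pending = (^Queued & pending) | Stopped

    fmt.Printf("queued=%t stopped=%t other=%t\n",
        pending&Queued != 0, pending&Stopped != 0, pending&otherEvent != 0)
    // Prints: queued=false stopped=true other=true
}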

var schedulerBatchPool = sync.Pool{
New: func() interface{} {
return new(SchedulerBatch)
