rangefeed: restructure scheduler/shard code
Epic: none
Release note: None
erikgrinaker committed Sep 18, 2023
1 parent 6670e8e commit 9284e30
Showing 1 changed file with 101 additions and 101 deletions.
202 changes: 101 additions & 101 deletions pkg/kv/kvserver/rangefeed/scheduler.go
@@ -134,6 +134,22 @@ type Scheduler struct {
wg sync.WaitGroup
}

// schedulerShard is a mutex shard, which reduces contention: workers in a shard
// share a mutex for scheduling bookkeeping, and this mutex becomes highly
// contended without sharding. Processors are assigned round-robin to a shard
// when registered, see shardIndex().
type schedulerShard struct {
syncutil.Mutex
numWorkers int
bulkChunkSize int
cond *sync.Cond
procs map[int64]Callback
status map[int64]processorEventType
queue *idQueue
// No more new registrations allowed. Workers are winding down.
quiescing bool
}
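
The comment above points at shardIndex() for the round-robin assignment, but that function's body lies outside this hunk. A minimal sketch of the idea, assuming processor ids are handed out sequentially so a plain modulo rotates registrations across shards (not necessarily the exact scheduler.go implementation):

// Sketch only: maps a processor id to a shard slot. Sequential ids land on
// shards 0, 1, ..., numShards-1, 0, 1, ..., which gives the round-robin
// assignment described in the struct comment above.
func shardIndex(id int64, numShards int) int {
	return int(id % int64(numShards))
}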

// NewScheduler will instantiate an idle scheduler based on the provided
// config. The scheduler needs to be started to become operational.
func NewScheduler(cfg SchedulerConfig) *Scheduler {
@@ -163,22 +179,6 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler {
return s
}

// schedulerShard is a mutex shard, which reduces contention: workers in a shard
// share a mutex for scheduling bookkeeping, and this mutex becomes highly
// contended without sharding. Processors are assigned round-robin to a shard
// when registered, see shardIndex().
type schedulerShard struct {
syncutil.Mutex
numWorkers int
bulkChunkSize int
cond *sync.Cond
procs map[int64]Callback
status map[int64]processorEventType
queue *idQueue
// No more new registrations allowed. Workers are winding down.
quiescing bool
}

// newSchedulerShard creates a new shard with the given number of workers.
func newSchedulerShard(numWorkers, bulkChunkSize int) *schedulerShard {
ss := &schedulerShard{
@@ -248,6 +248,64 @@ func (s *Scheduler) Register(f Callback) (int64, error) {
return id, nil
}

// Unregister a processor. This removes the processor's callback and status
// from the scheduler. If the processor is currently processing an event, it
// will finish processing.
// The processor won't receive a Stopped event unless one was explicitly sent.
// To make sure the processor performs cleanup, it is easier to send it a
// Stopped event first and let it remove itself from the registration during
// event handling.
// Any attempt to enqueue events for the processor after this call will return
// an error.
func (s *Scheduler) Unregister(id int64) {
shard := s.shards[shardIndex(id, len(s.shards))]
shard.Lock()
defer shard.Unlock()

delete(shard.procs, id)
delete(shard.status, id)
}

func (s *Scheduler) Stop() {
// Stop all processors across all shards.
for _, shard := range s.shards {
shard.Lock()
if !shard.quiescing {
// On the first close attempt, trigger termination of all unfinished
// callbacks. We only need to do this once to avoid closing the s.drained
// channel multiple times.
shard.quiescing = true
}
shard.Unlock()
shard.cond.Broadcast()
}
s.wg.Wait()

// Synchronously notify all non-stopped processors about stop.
for _, shard := range s.shards {
shard.Lock()
for id, p := range shard.procs {
pending := shard.status[id]
// Ignore processors that already processed their stopped event.
if pending == Stopped {
continue
}
// Add the Stopped event on top of what was pending and clear the Queued bit.
pending = (^Queued & pending) | Stopped
shard.Unlock()
p(pending)
shard.Lock()
}
shard.Unlock()
}
}
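
The bit manipulation in the loop above is compact: processorEventType is used as a bit set, so a single expression clears Queued and sets Stopped while preserving any other pending bits. A self-contained sketch with illustrative constant values (the real constants are defined elsewhere in scheduler.go):

package main

import "fmt"

// processorEventType mirrors the bitmask usage above; the concrete values
// here are illustrative, not the ones in scheduler.go.
type processorEventType int

const (
	Queued  processorEventType = 1 << iota // event is already queued
	Stopped                                // processor must wind down
	other                                  // stand-in for any other pending bit
)

func main() {
	pending := Queued | other
	// Same expression as in Stop(): drop the Queued bit, add Stopped, and keep
	// the rest of the pending work.
	pending = (^Queued & pending) | Stopped
	fmt.Printf("%03b\n", pending) // prints 110: Stopped and other set, Queued cleared
}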

// StopProcessor instructs a processor to stop gracefully by sending it a
// Stopped event. Once stop is called, all subsequent Schedule calls for this
// id will return an error.
func (s *Scheduler) StopProcessor(id int64) {
s.Enqueue(id, Stopped)
}

// Enqueue an event for an existing callback. Returns an error if the callback
// was not registered for the id or if the processor is stopping. An error
// doesn't guarantee that the processor actually handled the stopped event; it
// may either be pending or
@@ -268,6 +326,33 @@ func (s *Scheduler) Enqueue(id int64, evt processorEventType) {
}
}

// EnqueueBatch enqueues an event for a set of processors across all shards.
// Using a batch allows efficient enqueueing with minimal lock contention.
func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) {
for shardIdx, ids := range batch.ids {
if len(ids) == 0 {
continue // skip shards with no new work
}
shard := s.shards[shardIdx]
count := shard.enqueueN(ids, evt)

if count >= shard.numWorkers {
shard.cond.Broadcast()
} else {
for i := 0; i < count; i++ {
shard.cond.Signal()
}
}
}
}

// NewEnqueueBatch creates a new batch that can be used to efficiently enqueue
// events for multiple processors via EnqueueBatch(). The batch should be closed
// when done by calling Close().
func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
return newSchedulerBatch(len(s.shards))
}
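
A hedged usage sketch tying the two batch functions together. Only NewEnqueueBatch, EnqueueBatch, and Close are visible in this diff; the Add method used to populate the batch is an assumption about the SchedulerBatch API.

// stopAll is a hypothetical helper, not part of this commit: it collects ids
// into a pooled batch and enqueues a Stopped event for all of them, taking
// each shard's lock only once.
func stopAll(s *Scheduler, ids []int64) {
	batch := s.NewEnqueueBatch()
	defer batch.Close() // returns the batch to its pool when done
	for _, id := range ids {
		batch.Add(id) // assumed method: buckets the id under its shard index
	}
	s.EnqueueBatch(batch, Stopped)
}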

// enqueueLocked enqueues a single event for a given processor in this shard.
func (ss *schedulerShard) enqueueLocked(id int64, evt processorEventType) bool {
pending := ss.status[id]
@@ -309,40 +394,6 @@ func (ss *schedulerShard) enqueueN(ids []int64, evt processorEventType) int {
return count
}

// EnqueueBatch enqueues an event for a set of processors across all shards.
// Using a batch allows efficient enqueueing with minimal lock contention.
func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType) {
for shardIdx, ids := range batch.ids {
if len(ids) == 0 {
continue // skip shards with no new work
}
shard := s.shards[shardIdx]
count := shard.enqueueN(ids, evt)

if count >= shard.numWorkers {
shard.cond.Broadcast()
} else {
for i := 0; i < count; i++ {
shard.cond.Signal()
}
}
}
}

// NewEnqueueBatch creates a new batch that can be used to efficiently enqueue
// events for multiple processors via EnqueueBatch(). The batch should be closed
// when done by calling Close().
func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
return newSchedulerBatch(len(s.shards))
}

// StopProcessor instructs a processor to stop gracefully by sending it a
// Stopped event. Once stop is called, all subsequent Schedule calls for this
// id will return an error.
func (s *Scheduler) StopProcessor(id int64) {
s.Enqueue(id, Stopped)
}

// processEvents is the main worker method of a scheduler pool. Each worker
// should be launched in a separate goroutine and will loop until the
// scheduler is stopped.
func (ss *schedulerShard) processEvents(ctx context.Context) {
@@ -414,57 +465,6 @@ func (ss *schedulerShard) processEvents(ctx context.Context) {
}
}

// Unregister a processor. This removes the processor's callback and status
// from the scheduler. If the processor is currently processing an event, it
// will finish processing.
// The processor won't receive a Stopped event unless one was explicitly sent.
// To make sure the processor performs cleanup, it is easier to send it a
// Stopped event first and let it remove itself from the registration during
// event handling.
// Any attempt to enqueue events for the processor after this call will return
// an error.
func (s *Scheduler) Unregister(id int64) {
shard := s.shards[shardIndex(id, len(s.shards))]
shard.Lock()
defer shard.Unlock()

delete(shard.procs, id)
delete(shard.status, id)
}

func (s *Scheduler) Stop() {
// Stop all processors across all shards.
for _, shard := range s.shards {
shard.Lock()
if !shard.quiescing {
// On the first close attempt, trigger termination of all unfinished
// callbacks. We only need to do this once to avoid closing the s.drained
// channel multiple times.
shard.quiescing = true
}
shard.Unlock()
shard.cond.Broadcast()
}
s.wg.Wait()

// Synchronously notify all non-stopped processors about stop.
for _, shard := range s.shards {
shard.Lock()
for id, p := range shard.procs {
pending := shard.status[id]
// Ignore processors that already processed their stopped event.
if pending == Stopped {
continue
}
// Add the Stopped event on top of what was pending and clear the Queued bit.
pending = (^Queued & pending) | Stopped
shard.Unlock()
p(pending)
shard.Lock()
}
shard.Unlock()
}
}

var schedulerBatchPool = sync.Pool{
New: func() interface{} {
return new(SchedulerBatch)
