rangefeed: add support for priority processors in scheduler
This adds a separate shard with a dedicated worker pool that can be used
for priority processors, to avoid head-of-line blocking. This is
intended for use with system ranges, which are low-volume but need to be
low-latency.

Epic: none
Release note: None
erikgrinaker committed Sep 22, 2023
1 parent 989af00 commit b066349
Showing 6 changed files with 199 additions and 70 deletions.
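The core idea is a reserved shard with its own workers, so a backlog of regular rangefeed events cannot delay system ranges. The toy sketch below illustrates just that idea with two dedicated worker pools; it is not the patch's implementation, and every name in it is invented for illustration:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyScheduler mirrors the commit's idea in spirit only: priority work gets a
// dedicated worker pool, so a backlog on the regular pool cannot delay it
// (no head-of-line blocking).
type toyScheduler struct {
	regular  chan func()
	priority chan func()
	wg       sync.WaitGroup
}

func newToyScheduler(workers, priorityWorkers int) *toyScheduler {
	s := &toyScheduler{
		regular:  make(chan func(), 1024),
		priority: make(chan func(), 1024),
	}
	start := func(ch chan func(), n int) {
		for i := 0; i < n; i++ {
			s.wg.Add(1)
			go func() {
				defer s.wg.Done()
				for f := range ch {
					f()
				}
			}()
		}
	}
	start(s.regular, workers)
	start(s.priority, priorityWorkers)
	return s
}

func main() {
	s := newToyScheduler(2, 1)
	for i := 0; i < 4; i++ { // saturate the regular pool with slow events
		s.regular <- func() { time.Sleep(100 * time.Millisecond) }
	}
	done := make(chan struct{})
	s.priority <- func() {
		fmt.Println("priority event ran without queuing behind slow work")
		close(done)
	}
	<-done // completes immediately, despite the regular backlog
	close(s.regular)
	close(s.priority)
	s.wg.Wait()
}
```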
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -86,6 +86,12 @@ type Config struct {
// Rangefeed scheduler to use for processor. If set, SchedulerProcessor would
// be instantiated.
Scheduler *Scheduler
+
+// Priority marks this rangefeed as a priority rangefeed, which will run in a
+// separate scheduler shard with a dedicated worker pool. Should only be used
+// for low-volume system ranges, since the worker pool is small (default 2).
+// Only has an effect when Scheduler is used.
+Priority bool
}

// SetDefaults initializes unset fields in Config to values
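On the caller side, opting a system range into the priority pool is just a flag on the processor config. A hypothetical sketch (the scheduler and the remaining required Config fields are assumed; wiring this up from replica code is not part of this commit):

```go
// Hypothetical caller-side sketch; most Config fields are elided.
cfg := rangefeed.Config{
	Scheduler: sch,  // shared Scheduler, started elsewhere
	Priority:  true, // low-volume system range: use the dedicated priority shard
	// ... other required Config fields ...
}
p := rangefeed.NewProcessor(cfg)
```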
13 changes: 11 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor_test.go
@@ -297,9 +297,18 @@ func newTestProcessor(
o(&cfg)
}
if cfg.useScheduler {
-sch := NewScheduler(SchedulerConfig{Workers: 1})
-_ = sch.Start(context.Background(), stopper)
+sch := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1})
+require.NoError(t, sch.Start(context.Background(), stopper))
cfg.Scheduler = sch
+// Also create a dummy priority processor to populate priorityIDs for
+// BenchmarkRangefeed. It should never be called.
+noop := func(e processorEventType) processorEventType {
+if e != Stopped {
+t.Errorf("unexpected event %s for noop priority processor", e)
+}
+return 0
+}
+require.NoError(t, sch.register(9, noop, true /* priority */))
}
s := NewProcessor(cfg.Config)
h := processorTestHelper{}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -99,7 +99,7 @@ func (p *ScheduledProcessor) Start(

// Note that callback registration must be performed before starting resolved
// timestamp init because resolution posts resolvedTS event when it is done.
-if err := p.scheduler.Register(p.process); err != nil {
+if err := p.scheduler.Register(p.process, p.Priority); err != nil {
p.cleanup()
return err
}
86 changes: 69 additions & 17 deletions pkg/kv/kvserver/rangefeed/scheduler.go
@@ -16,6 +16,7 @@ import (
"strings"
"sync"
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -102,6 +103,8 @@ type Callback func(event processorEventType) (remaining processorEventType)
type SchedulerConfig struct {
// Workers is the number of pool workers for scheduler to use.
Workers int
+// PriorityWorkers is the number of workers to use for the priority shard.
+PriorityWorkers int
// ShardSize is the maximum number of workers per scheduler shard. Once a
// shard is full, new shards are split off, and workers are evenly distributed
// across all shards.
@@ -112,10 +115,21 @@ type SchedulerConfig struct {
BulkChunkSize int
}

-// shardIndex returns the shard index of the given processor ID.
+// priorityIDsValue is a placeholder value for Scheduler.priorityIDs. IntMap
+// requires an unsafe.Pointer value, but we don't care about the value (only the
+// key), so we can reuse the same allocation.
+var priorityIDsValue = unsafe.Pointer(new(bool))
+
+// shardIndex returns the shard index of the given processor ID based on the
+// shard count and processor priority. Priority processors are assigned to the
+// reserved shard 0, other ranges are modulo ID (ignoring shard 0). numShards
+// will always be 2 or more (1 priority, 1 regular).
// gcassert:inline
-func shardIndex(id int64, numShards int) int {
-return int(id % int64(numShards))
+func shardIndex(id int64, numShards int, priority bool) int {
+if priority {
+return 0
+}
+return 1 + int(id%int64(numShards-1))
}

// Scheduler is a simple scheduler that allows work to be scheduled
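The new mapping is easy to sanity-check in isolation. A standalone copy of the shardIndex logic above, exercised with a few arbitrary IDs:

```go
package main

import "fmt"

// shardIndex as introduced by this commit: shard 0 is reserved for priority
// processors; regular IDs are hashed modulo the remaining shards.
func shardIndex(id int64, numShards int, priority bool) int {
	if priority {
		return 0
	}
	return 1 + int(id%int64(numShards-1))
}

func main() {
	const numShards = 3 // 1 priority + 2 regular
	for _, id := range []int64{7, 8, 9} {
		fmt.Println(id, "->", shardIndex(id, numShards, false))
	}
	// Output:
	// 7 -> 2   (1 + 7%2)
	// 8 -> 1   (1 + 8%2)
	// 9 -> 2   (1 + 9%2)
	fmt.Println(9, "->", shardIndex(9, numShards, true)) // 9 -> 0: priority shard
}
```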
@@ -130,8 +144,12 @@ func shardIndex(id int64, numShards int) int {
// ORed together before being delivered to processor.
type Scheduler struct {
nextID atomic.Int64
-shards []*schedulerShard // id % len(shards)
-wg sync.WaitGroup
+// shards contains scheduler shards. Processors and workers are allocated to
+// separate shards to reduce mutex contention. Allocation is modulo
+// processors, with shard 0 reserved for priority processors.
+shards []*schedulerShard // 1 + id%(len(shards)-1)
+priorityIDs syncutil.IntMap
+wg sync.WaitGroup
}

// schedulerShard is a mutex shard, which reduces contention: workers in a shard
@@ -160,7 +178,14 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler {

s := &Scheduler{}

-// Create shards.
+// Priority shard at index 0.
+priorityWorkers := 1
+if cfg.PriorityWorkers > 0 {
+priorityWorkers = cfg.PriorityWorkers
+}
+s.shards = append(s.shards, newSchedulerShard(priorityWorkers, bulkChunkSize))
+
+// Regular shards, excluding priority shard.
numShards := 1
if cfg.ShardSize > 0 && cfg.Workers > cfg.ShardSize {
numShards = (cfg.Workers-1)/cfg.ShardSize + 1 // ceiling division
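The regular shard count still follows the ceiling-division rule above, with the priority shard carved out first. A worked example with assumed numbers:

```go
package main

import "fmt"

func main() {
	// Assumed values, not from the commit: Workers=8, ShardSize=4.
	workers, shardSize := 8, 4
	numShards := 1
	if shardSize > 0 && workers > shardSize {
		numShards = (workers-1)/shardSize + 1 // ceiling division: (8-1)/4 + 1 = 2
	}
	fmt.Println("regular shards:", numShards) // 2 shards of 4 workers each
	fmt.Println("total shards:", 1+numShards) // 3: priority at index 0, regular at 1 and 2
}
```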
@@ -232,9 +257,20 @@ func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error {
}

// register callback to be able to schedule work. Returns error if id is already
-// registered or if Scheduler is stopped.
-func (s *Scheduler) register(id int64, f Callback) error {
-return s.shards[shardIndex(id, len(s.shards))].register(id, f)
+// registered or if Scheduler is stopped. If priority is true, the range is
+// allocated to a separate priority shard with dedicated workers (intended for a
+// small number of system ranges). Returns error if Scheduler is stopped.
+func (s *Scheduler) register(id int64, f Callback, priority bool) error {
+// Make sure we register the priority ID before registering the callback,
+// since we can otherwise race with enqueues, using the wrong shard.
+if priority {
+s.priorityIDs.Store(id, priorityIDsValue)
+}
+if err := s.shards[shardIndex(id, len(s.shards), priority)].register(id, f); err != nil {
+s.priorityIDs.Delete(id)
+return err
+}
+return nil
}

// unregister removes the processor callback from the scheduler. If processor is
@@ -246,7 +282,9 @@ func (s *Scheduler) register(id int64, f Callback) error {
// Any attempts to enqueue events for processor after this call will return an
// error.
func (s *Scheduler) unregister(id int64) {
-s.shards[shardIndex(id, len(s.shards))].unregister(id)
+_, priority := s.priorityIDs.Load(id)
+s.shards[shardIndex(id, len(s.shards), priority)].unregister(id)
+s.priorityIDs.Delete(id)
}

func (s *Scheduler) Stop() {
@@ -272,7 +310,8 @@ func (s *Scheduler) stopProcessor(id int64) {
// Enqueue event for existing callback. The event is ignored if the processor
// does not exist.
func (s *Scheduler) enqueue(id int64, evt processorEventType) {
-s.shards[shardIndex(id, len(s.shards))].enqueue(id, evt)
+_, priority := s.priorityIDs.Load(id)
+s.shards[shardIndex(id, len(s.shards), priority)].enqueue(id, evt)
}

// EnqueueBatch enqueues an event for a set of processors across all shards.
@@ -289,7 +328,7 @@ func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType)
// events for multiple processors via EnqueueBatch(). The batch should be closed
// when done by calling Close().
func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
-return newSchedulerBatch(len(s.shards))
+return newSchedulerBatch(len(s.shards), &s.priorityIDs)
}

// register registers a callback with the shard. The caller must not hold
@@ -491,22 +530,32 @@ var schedulerBatchPool = sync.Pool{
// enqueueing, by pre-sharding the IDs and only locking a single shard at a time
// while bulk-enqueueing.
type SchedulerBatch struct {
-ids [][]int64 // by shard
+ids [][]int64 // by shard
+priorityIDs map[int64]bool
}

-func newSchedulerBatch(numShards int) *SchedulerBatch {
+func newSchedulerBatch(numShards int, priorityIDs *syncutil.IntMap) *SchedulerBatch {
b := schedulerBatchPool.Get().(*SchedulerBatch)
if cap(b.ids) >= numShards {
b.ids = b.ids[:numShards]
} else {
b.ids = make([][]int64, numShards)
}
+if b.priorityIDs == nil {
+b.priorityIDs = make(map[int64]bool, 8) // expect few ranges, if any
+}
+// Cache the priority range IDs in an owned map, since we expect this to be
+// very small or empty and we do a lookup for every Add() call.
+priorityIDs.Range(func(id int64, _ unsafe.Pointer) bool {
+b.priorityIDs[id] = true
+return true
+})
return b
}

// Add adds a processor ID to the batch.
func (b *SchedulerBatch) Add(id int64) {
-shardIdx := shardIndex(id, len(b.ids))
+shardIdx := shardIndex(id, len(b.ids), b.priorityIDs[id])
b.ids[shardIdx] = append(b.ids[shardIdx], id)
}

@@ -515,6 +564,9 @@ func (b *SchedulerBatch) Close() {
for i := range b.ids {
b.ids[i] = b.ids[i][:0]
}
+for id := range b.priorityIDs {
+delete(b.priorityIDs, id)
+}
schedulerBatchPool.Put(b)
}
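Taken together, batched enqueueing after this change looks roughly like the following sketch (the scheduler s, the ids slice, and the event evt are assumed to exist in the caller):

```go
batch := s.NewEnqueueBatch() // snapshots priority IDs into the batch's own map
for _, id := range ids {
	batch.Add(id) // pre-shards the ID; priority IDs land in shard 0's slice
}
s.EnqueueBatch(batch, evt) // locks one shard at a time while bulk-enqueueing
batch.Close()              // clears ID slices and the priority map, returns batch to pool
```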

@@ -545,8 +597,8 @@ func (cs *ClientScheduler) ID() int64 {
// Register registers processing callback in scheduler. Error is returned if
// callback was already registered for this ClientScheduler or if scheduler is
// already quiescing.
-func (cs *ClientScheduler) Register(cb Callback) error {
-return cs.s.register(cs.id, cb)
+func (cs *ClientScheduler) Register(cb Callback, priority bool) error {
+return cs.s.register(cs.id, cb, priority)
}

// Enqueue schedules callback execution for event.
(Diff for the remaining 2 of the 6 changed files not shown.)
