110810: rangefeed: prioritize system spans in rangefeed scheduler r=erikgrinaker a=erikgrinaker

**rangefeed: add support for priority processors in scheduler**

This adds a separate shard with a dedicated worker pool that can be used for priority processors, to avoid head-of-line blocking. This is intended for use with system ranges, which are low-volume but need to be low-latency.
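As a rough illustration of how the pieces fit together, here is a minimal sketch mirroring the test changes in this diff. The helper name `startWithPriority`, the worker counts, the processor IDs, and the `systemCB`/`userCB` callbacks are illustrative only, and `register` is package-internal, so this would only compile inside `pkg/kv/kvserver/rangefeed`:

```go
package rangefeed

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startWithPriority is an illustrative helper: it starts a scheduler with a
// dedicated priority worker pool and registers one priority (system-range)
// processor and one regular processor. IDs and worker counts are arbitrary.
func startWithPriority(
	ctx context.Context, stopper *stop.Stopper, systemCB, userCB Callback,
) (*Scheduler, error) {
	sch := NewScheduler(SchedulerConfig{Workers: 8, PriorityWorkers: 2})
	if err := sch.Start(ctx, stopper); err != nil {
		return nil, err
	}
	// priority=true pins the processor to the reserved shard 0, which has its
	// own small worker pool, avoiding head-of-line blocking behind user ranges.
	if err := sch.register(1, systemCB, true /* priority */); err != nil {
		return nil, err
	}
	// priority=false assigns the processor to shard 1 + id%(numShards-1).
	if err := sch.register(2, userCB, false /* priority */); err != nil {
		return nil, err
	}
	return sch, nil
}
```

A priority registration keeps system-range events on their own workers, so a burst of user-range traffic cannot delay them.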

**kvserver: prioritize system spans in rangefeed scheduler**

This patch marks system spans as having priority in the rangefeed scheduler. We currently do this based on the descriptor ID, although this no longer strictly holds with dynamic system table IDs.
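The kvserver-side check itself is not part of this excerpt. As a hedged sketch, the classification can be expressed with the same descriptor-ID comparison that budget.go used before this patch; the `isSystemSpan` helper name is hypothetical, and as noted above the heuristic no longer strictly holds with dynamic system table IDs:

```go
package kvserver

import (
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// isSystemSpan is a hypothetical helper sketching the descriptor-ID heuristic:
// a range whose start key sorts below the first non-reserved table ID in the
// system tenant is treated as a system span. This mirrors the check removed
// from budget.go in this commit.
func isSystemSpan(startKey roachpb.RKey) bool {
	return startKey.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
}
```

The resulting boolean would then drive both the processor's `Priority` flag and the simplified `BudgetFactory.CreateBudget(isSystem bool)` signature shown in the diff below.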

Resolves cockroachdb#110567.
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed Sep 22, 2023
2 parents 3c0be00 + 0f0fb8c commit 3d76521
Showing 9 changed files with 218 additions and 84 deletions.
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/rangefeed/budget.go
@@ -16,8 +16,6 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -364,7 +362,7 @@ func (f *BudgetFactory) Stop(ctx context.Context) {
// CreateBudget creates feed budget using memory pools configured in the
// factory. It is safe to call on nil factory as it will produce nil budget
// which in turn disables memory accounting on range feed.
func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget {
func (f *BudgetFactory) CreateBudget(isSystem bool) *FeedBudget {
if f == nil {
return nil
}
@@ -373,7 +371,7 @@ func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget {
return nil
}
// We use any table with reserved ID in system tenant as system case.
if key.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) {
if isSystem {
acc := f.systemFeedBytesMon.MakeBoundAccount()
return NewFeedBudget(&acc, 0, f.settings)
}
15 changes: 6 additions & 9 deletions pkg/kv/kvserver/rangefeed/budget_test.go
@@ -16,8 +16,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/stretchr/testify/require"
@@ -195,14 +193,14 @@ func TestBudgetFactory(t *testing.T) {
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV))

// Verify system ranges use own budget.
bSys := bf.CreateBudget(keys.MustAddr(keys.Meta1Prefix))
bSys := bf.CreateBudget(true)
_, e := bSys.TryGet(context.Background(), 199)
require.NoError(t, e, "failed to obtain system range budget")
require.Equal(t, int64(0), rootMon.AllocBytes(), "System feeds should borrow from own budget")
require.Equal(t, int64(199), bf.Metrics().SystemBytesCount.Value(), "Metric was not updated")

// Verify user feeds use shared root budget.
bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
bUsr := bf.CreateBudget(false)
_, e = bUsr.TryGet(context.Background(), 99)
require.NoError(t, e, "failed to obtain non-system budget")
require.Equal(t, int64(99), rootMon.AllocBytes(),
@@ -220,7 +218,7 @@ func TestDisableBudget(t *testing.T) {
return 0
}, &s.SV))

bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
bUsr := bf.CreateBudget(false)
require.Nil(t, bUsr, "Range budget when budgets are disabled.")
}

@@ -239,7 +237,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) {
},
&s.SV))

f := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
f := bf.CreateBudget(false)

objectSize := int64(1000)
alloc, err := f.TryGet(context.Background(), objectSize)
@@ -302,8 +300,7 @@ func TestBudgetLimits(t *testing.T) {
settings: &s.SV,
})

userKey := roachpb.RKey(keys.ScratchRangeMin)
b := bf.CreateBudget(userKey)
b := bf.CreateBudget(false)
require.NotNil(t, b, "budget is disabled")
require.Equal(t, b.limit, adjustedSize, "budget limit is not adjusted")

@@ -318,6 +315,6 @@ func TestBudgetLimits(t *testing.T) {
histogramWindowInterval: time.Second * 5,
settings: &s.SV,
})
b = bf.CreateBudget(userKey)
b = bf.CreateBudget(false)
require.Nil(t, b, "budget is disabled")
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -86,6 +86,12 @@ type Config struct {
// Rangefeed scheduler to use for processor. If set, SchedulerProcessor would
// be instantiated.
Scheduler *Scheduler

// Priority marks this rangefeed as a priority rangefeed, which will run in a
// separate scheduler shard with a dedicated worker pool. Should only be used
// for low-volume system ranges, since the worker pool is small (default 2).
// Only has an effect when Scheduler is used.
Priority bool
}

// SetDefaults initializes unset fields in Config to values
13 changes: 11 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor_test.go
@@ -297,9 +297,18 @@ func newTestProcessor(
o(&cfg)
}
if cfg.useScheduler {
sch := NewScheduler(SchedulerConfig{Workers: 1})
_ = sch.Start(context.Background(), stopper)
sch := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1})
require.NoError(t, sch.Start(context.Background(), stopper))
cfg.Scheduler = sch
// Also create a dummy priority processor to populate priorityIDs for
// BenchmarkRangefeed. It should never be called.
noop := func(e processorEventType) processorEventType {
if e != Stopped {
t.Errorf("unexpected event %s for noop priority processor", e)
}
return 0
}
require.NoError(t, sch.register(9, noop, true /* priority */))
}
s := NewProcessor(cfg.Config)
h := processorTestHelper{}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -99,7 +99,7 @@ func (p *ScheduledProcessor) Start(

// Note that callback registration must be performed before starting resolved
// timestamp init because resolution posts resolvedTS event when it is done.
if err := p.scheduler.Register(p.process); err != nil {
if err := p.scheduler.Register(p.process, p.Priority); err != nil {
p.cleanup()
return err
}
86 changes: 69 additions & 17 deletions pkg/kv/kvserver/rangefeed/scheduler.go
@@ -16,6 +16,7 @@ import (
"strings"
"sync"
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -102,6 +103,8 @@ type Callback func(event processorEventType) (remaining processorEventType)
type SchedulerConfig struct {
// Workers is the number of pool workers for scheduler to use.
Workers int
// PriorityWorkers is the number of workers to use for the priority shard.
PriorityWorkers int
// ShardSize is the maximum number of workers per scheduler shard. Once a
// shard is full, new shards are split off, and workers are evenly distributed
// across all shards.
@@ -112,10 +115,21 @@ type SchedulerConfig struct {
BulkChunkSize int
}

// shardIndex returns the shard index of the given processor ID.
// priorityIDsValue is a placeholder value for Scheduler.priorityIDs. IntMap
// requires an unsafe.Pointer value, but we don't care about the value (only the
// key), so we can reuse the same allocation.
var priorityIDsValue = unsafe.Pointer(new(bool))

// shardIndex returns the shard index of the given processor ID based on the
// shard count and processor priority. Priority processors are assigned to the
// reserved shard 0, other ranges are modulo ID (ignoring shard 0). numShards
// will always be 2 or more (1 priority, 1 regular).
// gcassert:inline
func shardIndex(id int64, numShards int) int {
return int(id % int64(numShards))
func shardIndex(id int64, numShards int, priority bool) int {
if priority {
return 0
}
return 1 + int(id%int64(numShards-1))
}

// Scheduler is a simple scheduler that allows work to be scheduled
@@ -130,8 +144,12 @@ func shardIndex(id int64, numShards int) int {
// ORed together before being delivered to processor.
type Scheduler struct {
nextID atomic.Int64
shards []*schedulerShard // id % len(shards)
wg sync.WaitGroup
// shards contains scheduler shards. Processors and workers are allocated to
// separate shards to reduce mutex contention. Allocation is modulo
// processors, with shard 0 reserved for priority processors.
shards []*schedulerShard // 1 + id%(len(shards)-1)
priorityIDs syncutil.IntMap
wg sync.WaitGroup
}

// schedulerShard is a mutex shard, which reduces contention: workers in a shard
@@ -160,7 +178,14 @@ func NewScheduler(cfg SchedulerConfig) *Scheduler {

s := &Scheduler{}

// Create shards.
// Priority shard at index 0.
priorityWorkers := 1
if cfg.PriorityWorkers > 0 {
priorityWorkers = cfg.PriorityWorkers
}
s.shards = append(s.shards, newSchedulerShard(priorityWorkers, bulkChunkSize))

// Regular shards, excluding priority shard.
numShards := 1
if cfg.ShardSize > 0 && cfg.Workers > cfg.ShardSize {
numShards = (cfg.Workers-1)/cfg.ShardSize + 1 // ceiling division
@@ -232,9 +257,20 @@ func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error {
}

// register callback to be able to schedule work. Returns error if id is already
// registered or if Scheduler is stopped.
func (s *Scheduler) register(id int64, f Callback) error {
return s.shards[shardIndex(id, len(s.shards))].register(id, f)
// registered or if Scheduler is stopped. If priority is true, the range is
// allocated to a separate priority shard with dedicated workers (intended for a
// small number of system ranges). Returns error if Scheduler is stopped.
func (s *Scheduler) register(id int64, f Callback, priority bool) error {
// Make sure we register the priority ID before registering the callback,
// since we can otherwise race with enqueues, using the wrong shard.
if priority {
s.priorityIDs.Store(id, priorityIDsValue)
}
if err := s.shards[shardIndex(id, len(s.shards), priority)].register(id, f); err != nil {
s.priorityIDs.Delete(id)
return err
}
return nil
}

// unregister removes the processor callback from scheduler. If processor is
@@ -246,7 +282,9 @@ func (s *Scheduler) register(id int64, f Callback) error {
// Any attempts to enqueue events for processor after this call will return an
// error.
func (s *Scheduler) unregister(id int64) {
s.shards[shardIndex(id, len(s.shards))].unregister(id)
_, priority := s.priorityIDs.Load(id)
s.shards[shardIndex(id, len(s.shards), priority)].unregister(id)
s.priorityIDs.Delete(id)
}

func (s *Scheduler) Stop() {
@@ -272,7 +310,8 @@ func (s *Scheduler) stopProcessor(id int64) {
// Enqueue event for existing callback. The event is ignored if the processor
// does not exist.
func (s *Scheduler) enqueue(id int64, evt processorEventType) {
s.shards[shardIndex(id, len(s.shards))].enqueue(id, evt)
_, priority := s.priorityIDs.Load(id)
s.shards[shardIndex(id, len(s.shards), priority)].enqueue(id, evt)
}

// EnqueueBatch enqueues an event for a set of processors across all shards.
@@ -289,7 +328,7 @@ func (s *Scheduler) EnqueueBatch(batch *SchedulerBatch, evt processorEventType)
// events for multiple processors via EnqueueBatch(). The batch should be closed
// when done by calling Close().
func (s *Scheduler) NewEnqueueBatch() *SchedulerBatch {
return newSchedulerBatch(len(s.shards))
return newSchedulerBatch(len(s.shards), &s.priorityIDs)
}

// register registers a callback with the shard. The caller must not hold
@@ -491,22 +530,32 @@ var schedulerBatchPool = sync.Pool{
// enqueueing, by pre-sharding the IDs and only locking a single shard at a time
// while bulk-enqueueing.
type SchedulerBatch struct {
ids [][]int64 // by shard
ids [][]int64 // by shard
priorityIDs map[int64]bool
}

func newSchedulerBatch(numShards int) *SchedulerBatch {
func newSchedulerBatch(numShards int, priorityIDs *syncutil.IntMap) *SchedulerBatch {
b := schedulerBatchPool.Get().(*SchedulerBatch)
if cap(b.ids) >= numShards {
b.ids = b.ids[:numShards]
} else {
b.ids = make([][]int64, numShards)
}
if b.priorityIDs == nil {
b.priorityIDs = make(map[int64]bool, 8) // expect few ranges, if any
}
// Cache the priority range IDs in an owned map, since we expect this to be
// very small or empty and we do a lookup for every Add() call.
priorityIDs.Range(func(id int64, _ unsafe.Pointer) bool {
b.priorityIDs[id] = true
return true
})
return b
}

// Add adds a processor ID to the batch.
func (b *SchedulerBatch) Add(id int64) {
shardIdx := shardIndex(id, len(b.ids))
shardIdx := shardIndex(id, len(b.ids), b.priorityIDs[id])
b.ids[shardIdx] = append(b.ids[shardIdx], id)
}

@@ -515,6 +564,9 @@ func (b *SchedulerBatch) Close() {
for i := range b.ids {
b.ids[i] = b.ids[i][:0]
}
for id := range b.priorityIDs {
delete(b.priorityIDs, id)
}
schedulerBatchPool.Put(b)
}

@@ -545,8 +597,8 @@ func (cs *ClientScheduler) ID() int64 {
// Register registers processing callback in scheduler. Error is returned if
// callback was already registered for this ClientScheduler or if scheduler is
// already quiescing.
func (cs *ClientScheduler) Register(cb Callback) error {
return cs.s.register(cs.id, cb)
func (cs *ClientScheduler) Register(cb Callback, priority bool) error {
return cs.s.register(cs.id, cb, priority)
}

// Enqueue schedules callback execution for event.