From 347814009c848e6b9aaed0be73a8ad81e4244fe9 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 17 Mar 2023 12:03:35 +0000
Subject: [PATCH] kvserver: shard Raft scheduler
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The Raft scheduler mutex can become very contended on machines with many
cores and high range counts. This patch shards the scheduler by allocating
ranges and workers to individual scheduler shards.

By default, we create a new shard for every 16 workers, and distribute
workers evenly. We spin up 8 workers per CPU core, capped at 96, so 16 is
equivalent to 2 CPUs per shard, or a maximum of 6 shards. This
significantly relieves contention at high core counts, while also avoiding
starvation by excessive sharding. The shard size can be adjusted via
`COCKROACH_SCHEDULER_SHARD_SIZE`.

This results in a substantial performance improvement on high-CPU nodes:

```
name                                     old ops/sec  new ops/sec  delta
kv0/enc=false/nodes=3/cpu=4               7.71k ± 5%   7.93k ± 4%      ~     (p=0.310 n=5+5)
kv0/enc=false/nodes=3/cpu=8               15.6k ± 3%   14.8k ± 7%      ~     (p=0.095 n=5+5)
kv0/enc=false/nodes=3/cpu=32              43.4k ± 2%   45.0k ± 3%   +3.73%   (p=0.032 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=aws    40.5k ± 2%   61.7k ± 1%  +52.53%   (p=0.008 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce    35.6k ± 4%   44.5k ± 0%  +24.99%   (p=0.008 n=5+5)

name                                     old p50      new p50      delta
kv0/enc=false/nodes=3/cpu=4                21.2 ± 6%    20.6 ± 3%      ~     (p=0.397 n=5+5)
kv0/enc=false/nodes=3/cpu=8                10.5 ± 0%    11.2 ± 8%      ~     (p=0.079 n=4+5)
kv0/enc=false/nodes=3/cpu=32               4.16 ± 1%    4.00 ± 5%      ~     (p=0.143 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=aws     3.00 ± 0%    2.00 ± 0%      ~     (p=0.079 n=4+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce     4.70 ± 0%    4.10 ± 0%  -12.77%   (p=0.000 n=5+4)

name                                     old p95      new p95      delta
kv0/enc=false/nodes=3/cpu=4                61.6 ± 5%    60.8 ± 3%      ~     (p=0.762 n=5+5)
kv0/enc=false/nodes=3/cpu=8                28.3 ± 4%    30.4 ± 0%   +7.34%   (p=0.016 n=5+4)
kv0/enc=false/nodes=3/cpu=32               7.98 ± 2%    7.60 ± 0%   -4.76%   (p=0.008 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=aws     12.3 ± 2%     6.8 ± 0%  -44.72%   (p=0.000 n=5+4)
kv0/enc=false/nodes=3/cpu=96/cloud=gce     10.2 ± 3%     8.9 ± 0%  -12.75%   (p=0.000 n=5+4)

name                                     old p99      new p99      delta
kv0/enc=false/nodes=3/cpu=4                89.8 ± 7%    88.9 ± 6%      ~     (p=0.921 n=5+5)
kv0/enc=false/nodes=3/cpu=8                46.1 ± 0%    48.6 ± 5%   +5.47%   (p=0.048 n=5+5)
kv0/enc=false/nodes=3/cpu=32               11.5 ± 0%    11.0 ± 0%   -4.35%   (p=0.008 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=aws     14.0 ± 3%    12.1 ± 0%  -13.32%   (p=0.008 n=5+5)
kv0/enc=false/nodes=3/cpu=96/cloud=gce     14.3 ± 3%    13.1 ± 0%   -8.55%   (p=0.000 n=4+5)
```

Furthermore, on an idle 24-core 3-node cluster with 50,000 unquiesced
ranges, this reduced CPU usage from 12% to 10%.
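For illustration, the default sizing works out to the arithmetic below. This
is a standalone sketch mirroring `newRaftScheduler` and the
`RangeID % numShards` shard assignment in the diff that follows; the
`shardWorkerCounts` helper is hypothetical and not part of the patch.

```go
package main

import "fmt"

// shardWorkerCounts mirrors the sizing logic in newRaftScheduler: one shard
// per shardSize workers (rounded up), with workers spread evenly and any
// remainder going to the leading shards. A range is then assigned to a shard
// via rangeID % numShards.
func shardWorkerCounts(numWorkers, shardSize int) []int {
	numShards := 1
	if shardSize > 0 && numWorkers > shardSize {
		numShards = (numWorkers-1)/shardSize + 1
	}
	counts := make([]int, numShards)
	for i := range counts {
		counts[i] = numWorkers / numShards
		if i < numWorkers%numShards { // distribute remainder
			counts[i]++
		}
	}
	return counts
}

func main() {
	fmt.Println(shardWorkerCounts(96, 16)) // [16 16 16 16 16 16]: 96 workers, 6 shards
	fmt.Println(shardWorkerCounts(24, 16)) // [12 12]: 3 CPUs * 8 workers, 2 shards
}
```

Workers are balanced across shards rather than packed into full shards, so
ranges (assumed evenly distributed across shards) see roughly the same number
of workers on average.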
The basic cost of enqueueing ranges in the scheduler (without workers or
contention) only increases slightly in absolute terms, thanks to
`raftSchedulerBatch` pre-sharding the enqueued ranges:

```
name                                                                  old time/op  new time/op  delta
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=8-24          457ns ± 2%   564ns ± 2%   +23.36%  (p=0.001 n=7+7)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=16-24         461ns ± 3%   563ns ± 2%   +22.14%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=32-24         459ns ± 2%   591ns ± 2%   +28.63%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=64-24         455ns ± 0%   776ns ± 5%   +70.60%  (p=0.001 n=6+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=10/workers=128-24        456ns ± 2%  1058ns ± 1%  +132.13%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=8-24     7.15ms ± 1%  8.18ms ± 1%   +14.33%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=16-24    7.13ms ± 1%  8.18ms ± 1%   +14.77%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=32-24    7.12ms ± 2%  7.86ms ± 1%   +10.30%  (p=0.000 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=64-24    7.20ms ± 1%  7.11ms ± 1%    -1.27%  (p=0.001 n=8+8)
SchedulerEnqueueRaftTicks/collect=true/ranges=100000/workers=128-24   7.12ms ± 2%  7.16ms ± 3%       ~    (p=0.721 n=8+8)
```

Epic: none

Release note (performance improvement): The Raft scheduler is now sharded
to relieve contention during range Raft processing, which can significantly
improve performance at high CPU core counts.
---
 pkg/kv/kvserver/scheduler.go      | 243 +++++++++++++++++++-----------
 pkg/kv/kvserver/scheduler_test.go | 102 +++++++++++--
 pkg/kv/kvserver/store.go          |   9 +-
 pkg/kv/kvserver/store_raft.go     |  16 +-
 4 files changed, 262 insertions(+), 108 deletions(-)

diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go
index 062bb4562843..7c26caabe0a3 100644
--- a/pkg/kv/kvserver/scheduler.go
+++ b/pkg/kv/kvserver/scheduler.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -178,22 +177,56 @@ type raftScheduleState struct {
 	ticks int
 }

+var raftSchedulerBatchPool = sync.Pool{
+	New: func() interface{} {
+		return new(raftSchedulerBatch)
+	},
+}
+
+// raftSchedulerBatch is a batch of range IDs to enqueue. It enables
+// efficient per-shard enqueueing.
+type raftSchedulerBatch [][]roachpb.RangeID // by shard
+
+func newRaftSchedulerBatch(numShards int) raftSchedulerBatch {
+	b := raftSchedulerBatchPool.Get().(*raftSchedulerBatch)
+	if len(*b) != numShards {
+		*b = make([][]roachpb.RangeID, numShards)
+	}
+	return *b
+}
+
+func (b raftSchedulerBatch) Add(id roachpb.RangeID) {
+	shard := int(id) % len(b)
+	b[shard] = append(b[shard], id)
+}
+
+func (b raftSchedulerBatch) Reset() {
+	for i := range b {
+		b[i] = b[i][:0]
+	}
+}
+
+func (b raftSchedulerBatch) Close() {
+	b.Reset()
+	raftSchedulerBatchPool.Put(&b)
+}
+
 type raftScheduler struct {
 	ambientContext log.AmbientContext
 	processor      raftProcessor
-	latency        metric.IHistogram
-	numWorkers     int
-	maxTicks       int
-
-	mu struct {
-		syncutil.Mutex
-		cond    *sync.Cond
-		queue   rangeIDQueue
-		state   map[roachpb.RangeID]raftScheduleState
-		stopped bool
-	}
+	metrics        *StoreMetrics
+	shards         []*raftSchedulerShard // RangeID % len(shards)
+	done           sync.WaitGroup
+}

-	done sync.WaitGroup
+type raftSchedulerShard struct {
+	syncutil.Mutex
+	cond       *sync.Cond
+	queue      rangeIDQueue
+	state      map[roachpb.RangeID]raftScheduleState
+	numWorkers int
+	maxTicks   int
+	stopped    bool
 }

 func newRaftScheduler(
@@ -201,17 +234,31 @@ func newRaftScheduler(
 	metrics *StoreMetrics,
 	processor raftProcessor,
 	numWorkers int,
+	shardSize int,
 	maxTicks int,
 ) *raftScheduler {
 	s := &raftScheduler{
 		ambientContext: ambient,
 		processor:      processor,
-		latency:        metrics.RaftSchedulerLatency,
-		numWorkers:     numWorkers,
-		maxTicks:       maxTicks,
+		metrics:        metrics,
+	}
+	numShards := 1
+	if shardSize > 0 && numWorkers > shardSize {
+		numShards = (numWorkers-1)/shardSize + 1
+	}
+	for i := 0; i < numShards; i++ {
+		shardWorkers := numWorkers / numShards
+		if i < numWorkers%numShards { // distribute remainder
+			shardWorkers++
+		}
+		shard := &raftSchedulerShard{
+			state:      map[roachpb.RangeID]raftScheduleState{},
+			numWorkers: shardWorkers,
+			maxTicks:   maxTicks,
+		}
+		shard.cond = sync.NewCond(&shard.Mutex)
+		s.shards = append(s.shards, shard)
 	}
-	s.mu.cond = sync.NewCond(&s.mu.Mutex)
-	s.mu.state = make(map[roachpb.RangeID]raftScheduleState)
 	return s
 }

@@ -219,10 +266,12 @@ func (s *raftScheduler) Start(stopper *stop.Stopper) {
 	ctx := s.ambientContext.AnnotateCtx(context.Background())
 	waitQuiesce := func(context.Context) {
 		<-stopper.ShouldQuiesce()
-		s.mu.Lock()
-		s.mu.stopped = true
-		s.mu.Unlock()
-		s.mu.cond.Broadcast()
+		for _, shard := range s.shards {
+			shard.Lock()
+			shard.stopped = true
+			shard.Unlock()
+			shard.cond.Broadcast()
+		}
 	}

 	if err := stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
@@ -235,17 +284,24 @@ func (s *raftScheduler) Start(stopper *stop.Stopper) {
 		waitQuiesce(ctx)
 	}

-	s.done.Add(s.numWorkers)
-	for i := 0; i < s.numWorkers; i++ {
-		if err := stopper.RunAsyncTaskEx(ctx,
-			stop.TaskOpts{
-				TaskName: "raft-worker",
-				// This task doesn't reference a parent because it runs for the server's
-				// lifetime.
-				SpanOpt: stop.SterileRootSpan,
-			},
-			s.worker); err != nil {
-			s.done.Done()
+	for _, shard := range s.shards {
+		s.done.Add(shard.numWorkers)
+		shard := shard // pin loop variable
+		for i := 0; i < shard.numWorkers; i++ {
+			if err := stopper.RunAsyncTaskEx(ctx,
+				stop.TaskOpts{
+					TaskName: "raft-worker",
+					// This task doesn't reference a parent because it runs for the server's
+					// lifetime.
+					SpanOpt: stop.SterileRootSpan,
+				},
+				func(ctx context.Context) {
+					shard.worker(ctx, s.processor, s.metrics)
+					s.done.Done()
+				},
+			); err != nil {
+				s.done.Done()
+			}
 		}
 	}
 }

@@ -257,19 +313,22 @@ func (s *raftScheduler) Wait(context.Context) {
 // SetPriorityID configures the single range that the scheduler will prioritize
 // above others. Once set, callers are not permitted to change this value.
 func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) {
-	s.mu.Lock()
-	s.mu.queue.SetPriorityID(id)
-	s.mu.Unlock()
+	for _, shard := range s.shards {
+		shard.Lock()
+		shard.queue.SetPriorityID(id)
+		shard.Unlock()
+	}
 }

 func (s *raftScheduler) PriorityID() roachpb.RangeID {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.mu.queue.priorityID
+	s.shards[0].Lock()
+	defer s.shards[0].Unlock()
+	return s.shards[0].queue.priorityID
 }

-func (s *raftScheduler) worker(ctx context.Context) {
-	defer s.done.Done()
+func (ss *raftSchedulerShard) worker(
+	ctx context.Context, processor raftProcessor, metrics *StoreMetrics,
+) {

 	// We use a sync.Cond for worker notification instead of a buffered
 	// channel. Buffered channels have internal overhead for maintaining the
@@ -278,31 +337,31 @@
 	// signaling a sync.Cond is significantly faster than selecting and sending
 	// on a buffered channel.

-	s.mu.Lock()
+	ss.Lock()
 	for {
 		var id roachpb.RangeID
 		for {
-			if s.mu.stopped {
-				s.mu.Unlock()
+			if ss.stopped {
+				ss.Unlock()
 				return
 			}
 			var ok bool
-			if id, ok = s.mu.queue.PopFront(); ok {
+			if id, ok = ss.queue.PopFront(); ok {
 				break
 			}
-			s.mu.cond.Wait()
+			ss.cond.Wait()
 		}

 		// Grab and clear the existing state for the range ID. Note that we leave
 		// the range ID marked as "queued" so that a concurrent Enqueue* will not
 		// queue the range ID again.
-		state := s.mu.state[id]
-		s.mu.state[id] = raftScheduleState{flags: stateQueued}
-		s.mu.Unlock()
+		state := ss.state[id]
+		ss.state[id] = raftScheduleState{flags: stateQueued}
+		ss.Unlock()

 		// Record the scheduling latency for the range.
 		lat := nowNanos() - state.begin
-		s.latency.RecordValue(lat)
+		metrics.RaftSchedulerLatency.RecordValue(lat)

 		// Process requests first. This avoids a scenario where a tick and a
 		// "quiesce" message are processed in the same iteration and intervening
@@ -311,7 +370,7 @@
 		if state.flags&stateRaftRequest != 0 {
 			// processRequestQueue returns true if the range should perform ready
 			// processing. Do not reorder this below the call to processReady.
-			if s.processor.processRequestQueue(ctx, id) {
+			if processor.processRequestQueue(ctx, id) {
 				state.flags |= stateRaftReady
 			}
 		}
@@ -324,21 +383,21 @@
 			for t := state.ticks; t > 0; t-- {
 				// processRaftTick returns true if the range should perform ready
 				// processing. Do not reorder this below the call to processReady.
-				if s.processor.processTick(ctx, id) {
+				if processor.processTick(ctx, id) {
 					state.flags |= stateRaftReady
 				}
 			}
 		}
 		if state.flags&stateRaftReady != 0 {
-			s.processor.processReady(id)
+			processor.processReady(id)
 		}

-		s.mu.Lock()
-		state = s.mu.state[id]
+		ss.Lock()
+		state = ss.state[id]
 		if state.flags == stateQueued {
 			// No further processing required by the range ID, clear it from the
 			// state map.
-			delete(s.mu.state, id)
+			delete(ss.state, id)
 		} else {
 			// There was a concurrent call to one of the Enqueue* methods. Queue
 			// the range ID for further processing.
@@ -361,17 +420,24 @@ func (s *raftScheduler) worker(ctx context.Context) {
 			// and the worker does not go back to sleep between the current
 			// iteration and the next iteration, so no change to num_signals
 			// is needed.
-			s.mu.queue.Push(id)
+			ss.queue.Push(id)
 		}
 	}
 }

-func (s *raftScheduler) enqueue1Locked(
+// NewEnqueueBatch creates a new range ID batch for enqueueing via
+// EnqueueRaft(Ticks|Requests). The caller must call Close() on the batch when
+// done.
+func (s *raftScheduler) NewEnqueueBatch() raftSchedulerBatch {
+	return newRaftSchedulerBatch(len(s.shards))
+}
+
+func (ss *raftSchedulerShard) enqueue1Locked(
 	addFlags raftScheduleFlags, id roachpb.RangeID, now int64,
 ) int {
 	ticks := int((addFlags & stateRaftTick) / stateRaftTick) // 0 or 1

-	prevState := s.mu.state[id]
+	prevState := ss.state[id]
 	if prevState.flags&addFlags == addFlags && ticks == 0 {
 		return 0
 	}
@@ -379,30 +445,32 @@
 	newState := prevState
 	newState.flags = newState.flags | addFlags
 	newState.ticks += ticks
-	if newState.ticks > s.maxTicks {
-		newState.ticks = s.maxTicks
+	if newState.ticks > ss.maxTicks {
+		newState.ticks = ss.maxTicks
 	}
 	if newState.flags&stateQueued == 0 {
 		newState.flags |= stateQueued
 		queued++
-		s.mu.queue.Push(id)
+		ss.queue.Push(id)
 	}
 	if newState.begin == 0 {
 		newState.begin = now
 	}
-	s.mu.state[id] = newState
+	ss.state[id] = newState
 	return queued
 }

-func (s *raftScheduler) enqueue1(addFlags raftScheduleFlags, id roachpb.RangeID) int {
+func (s *raftScheduler) enqueue1(addFlags raftScheduleFlags, id roachpb.RangeID) {
 	now := nowNanos()
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.enqueue1Locked(addFlags, id, now)
+	shard := s.shards[int(id)%len(s.shards)]
+	shard.Lock()
+	n := shard.enqueue1Locked(addFlags, id, now)
+	shard.Unlock()
+	shard.signal(n)
 }

-func (s *raftScheduler) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.RangeID) int {
-	// Enqueue the ids in chunks to avoid hold raftScheduler.mu for too long.
+func (ss *raftSchedulerShard) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.RangeID) int {
+	// Enqueue the ids in chunks to avoid holding mutex for too long.
 	const enqueueChunkSize = 128

 	// Avoid locking for 0 new ranges.
@@ -411,44 +479,51 @@
 	}

 	now := nowNanos()
-	s.mu.Lock()
+	ss.Lock()
 	var count int
 	for i, id := range ids {
-		count += s.enqueue1Locked(addFlags, id, now)
+		count += ss.enqueue1Locked(addFlags, id, now)
 		if (i+1)%enqueueChunkSize == 0 {
-			s.mu.Unlock()
+			ss.Unlock()
 			now = nowNanos()
-			s.mu.Lock()
+			ss.Lock()
 		}
 	}
-	s.mu.Unlock()
+	ss.Unlock()
 	return count
 }

-func (s *raftScheduler) signal(count int) {
-	if count >= s.numWorkers {
-		s.mu.cond.Broadcast()
+func (s *raftScheduler) enqueueBatch(addFlags raftScheduleFlags, batch raftSchedulerBatch) {
+	for i, ids := range batch {
+		count := s.shards[i].enqueueN(addFlags, ids...)
+		s.shards[i].signal(count)
+	}
+}
+
+func (ss *raftSchedulerShard) signal(count int) {
+	if count >= ss.numWorkers {
+		ss.cond.Broadcast()
 	} else {
 		for i := 0; i < count; i++ {
-			s.mu.cond.Signal()
+			ss.cond.Signal()
 		}
 	}
 }

 func (s *raftScheduler) EnqueueRaftReady(id roachpb.RangeID) {
-	s.signal(s.enqueue1(stateRaftReady, id))
+	s.enqueue1(stateRaftReady, id)
 }

 func (s *raftScheduler) EnqueueRaftRequest(id roachpb.RangeID) {
-	s.signal(s.enqueue1(stateRaftRequest, id))
+	s.enqueue1(stateRaftRequest, id)
 }

-func (s *raftScheduler) EnqueueRaftRequests(ids ...roachpb.RangeID) {
-	s.signal(s.enqueueN(stateRaftRequest, ids...))
+func (s *raftScheduler) EnqueueRaftRequests(batch raftSchedulerBatch) {
+	s.enqueueBatch(stateRaftRequest, batch)
 }

-func (s *raftScheduler) EnqueueRaftTicks(ids ...roachpb.RangeID) {
-	s.signal(s.enqueueN(stateRaftTick, ids...))
+func (s *raftScheduler) EnqueueRaftTicks(batch raftSchedulerBatch) {
+	s.enqueueBatch(stateRaftTick, batch)
 }

 func nowNanos() int64 {
diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go
index a85322c5e8a3..76435b37439b 100644
--- a/pkg/kv/kvserver/scheduler_test.go
+++ b/pkg/kv/kvserver/scheduler_test.go
@@ -248,10 +248,15 @@ func TestSchedulerLoop(t *testing.T) {

 	m := newStoreMetrics(metric.TestSampleInterval)
 	p := newTestProcessor()
-	s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, 1)
-
+	s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, 1, 1)
 	s.Start(stopper)
-	s.EnqueueRaftTicks(1, 2, 3)
+
+	batch := s.NewEnqueueBatch()
+	defer batch.Close()
+	batch.Add(1)
+	batch.Add(2)
+	batch.Add(3)
+	s.EnqueueRaftTicks(batch)

 	testutils.SucceedsSoon(t, func() error {
 		const expected = "ready=[] request=[] tick=[1:1,2:1,3:1]"
@@ -278,7 +283,7 @@ func TestSchedulerBuffering(t *testing.T) {

 	m := newStoreMetrics(metric.TestSampleInterval)
 	p := newTestProcessor()
-	s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, 5)
+	s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, 1, 1, 5)
 	s.Start(stopper)

 	testCases := []struct {
@@ -313,7 +318,12 @@ func TestSchedulerBuffering(t *testing.T) {
 		const id = roachpb.RangeID(1)

 		if c.flag != 0 {
-			s.signal(s.enqueueN(c.flag, id, id, id, id, id))
+			batch := s.NewEnqueueBatch()
+			for i := 0; i < 5; i++ {
+				batch.Add(id)
+			}
+			s.enqueueBatch(c.flag, batch)
+			batch.Close()
 			if started != nil {
 				<-started // wait until slow Ready processing has started
 				// NB: this is necessary to work around the race between the subsequent
@@ -322,11 +332,9 @@ func TestSchedulerBuffering(t *testing.T) {
 			}
 		}

-		cnt := 0
 		for t := c.ticks; t > 0; t-- {
-			cnt += s.enqueue1(stateRaftTick, id)
+			s.enqueue1(stateRaftTick, id)
 		}
-		s.signal(cnt)
 		if done != nil {
 			close(done) // finish slow Ready processing to unblock progress
 		}
@@ -340,6 +348,64 @@ func TestSchedulerBuffering(t *testing.T) {
 	}
 }

+func TestNewSchedulerShards(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testcases := []struct {
+		workers      int
+		shardSize    int
+		expectShards []int
+	}{
+		// NB: We balance workers across shards instead of filling up shards. We
+		// assume ranges are evenly distributed across shards, and want ranges to
+		// have about the same number of workers available on average.
+		{0, 0, []int{0}},
+		{1, -1, []int{1}},
+		{1, 0, []int{1}},
+		{1, 1, []int{1}},
+		{1, 2, []int{1}},
+		{2, 2, []int{2}},
+		{3, 2, []int{2, 1}},
+		{1, 3, []int{1}},
+		{2, 3, []int{2}},
+		{3, 3, []int{3}},
+		{4, 3, []int{2, 2}},
+		{5, 3, []int{3, 2}},
+		{6, 3, []int{3, 3}},
+		{7, 3, []int{3, 2, 2}},
+		{8, 3, []int{3, 3, 2}},
+		{9, 3, []int{3, 3, 3}},
+		{10, 3, []int{3, 3, 2, 2}},
+		{11, 3, []int{3, 3, 3, 2}},
+		{12, 3, []int{3, 3, 3, 3}},
+
+		// Typical examples, using 8 workers per CPU core. Note that we cap workers
+		// at 96 by default.
+		{1 * 8, 16, []int{8}},
+		{2 * 8, 16, []int{16}},
+		{3 * 8, 16, []int{12, 12}},
+		{4 * 8, 16, []int{16, 16}},
+		{6 * 8, 16, []int{16, 16, 16}},
+		{8 * 8, 16, []int{16, 16, 16, 16}},
+		{12 * 8, 16, []int{16, 16, 16, 16, 16, 16}}, // 96 workers
+		{16 * 8, 16, []int{16, 16, 16, 16, 16, 16, 16, 16}},
+	}
+	for _, tc := range testcases {
+		t.Run(fmt.Sprintf("workers=%d/shardSize=%d", tc.workers, tc.shardSize), func(t *testing.T) {
+			m := newStoreMetrics(metric.TestSampleInterval)
+			p := newTestProcessor()
+			s := newRaftScheduler(log.MakeTestingAmbientContext(nil), m, p, tc.workers, tc.shardSize, 5)
+
+			var shardWorkers []int
+			for _, shard := range s.shards {
+				shardWorkers = append(shardWorkers, shard.numWorkers)
+			}
+			require.Equal(t, tc.expectShards, shardWorkers)
+		})
+	}
+}
+
 // BenchmarkSchedulerEnqueueRaftTicks benchmarks the performance of enqueueing
 // Raft ticks in the scheduler. This does *not* take contention into account,
 // and does not run any workers that pull work off of the queue: it enqueues the
@@ -369,9 +435,10 @@ func runSchedulerEnqueueRaftTicks(
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)

+	a := log.MakeTestingAmbientContext(stopper.Tracer())
 	m := newStoreMetrics(metric.TestSampleInterval)
 	p := newTestProcessor()
-	s := newRaftScheduler(log.MakeTestingAmbientContext(stopper.Tracer()), m, p, numWorkers, 5)
+	s := newRaftScheduler(a, m, p, numWorkers, storeSchedulerShardSize, 5)

 	// raftTickLoop keeps unquiesced ranges in a map, so we do the same.
 	ranges := make(map[roachpb.RangeID]struct{})
@@ -381,13 +448,14 @@ func runSchedulerEnqueueRaftTicks(

 	// Collect range IDs in the same way as raftTickLoop does, such that the
 	// performance is comparable.
-	var buf []roachpb.RangeID
-	getRangeIDs := func() []roachpb.RangeID {
-		buf = buf[:0]
+	batch := s.NewEnqueueBatch()
+	defer batch.Close()
+	getRangeIDs := func() raftSchedulerBatch {
+		batch.Reset()
 		for id := range ranges {
-			buf = append(buf, id)
+			batch.Add(id)
 		}
-		return buf
+		return batch
 	}
 	ids := getRangeIDs()

@@ -397,10 +465,12 @@ func runSchedulerEnqueueRaftTicks(
 		if collect {
 			ids = getRangeIDs()
 		}
-		s.EnqueueRaftTicks(ids...)
+		s.EnqueueRaftTicks(ids)

 		// Flush the queue. We haven't started any workers that pull from it, so we
 		// just clear it out.
-		s.mu.queue = rangeIDQueue{}
+		for _, shard := range s.shards {
+			shard.queue = rangeIDQueue{}
+		}
 	}
 }
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 45ae1e241d3b..dd553221a229 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -123,6 +123,13 @@ var storeSchedulerConcurrency = envutil.EnvOrDefaultInt(
 	// As of November 2020, this default value could be re-tuned.
 	"COCKROACH_SCHEDULER_CONCURRENCY", min(8*runtime.GOMAXPROCS(0), 96))

+// storeSchedulerShardSize specifies the maximum number of scheduler worker
+// goroutines per mutex shard. By default, we spin up 8 workers per CPU core,
+// capped at 96, so 16 is equivalent to 2 CPUs per shard, or a maximum of 6
+// shards. This significantly relieves contention at high core counts, while
+// also avoiding starvation by excessive sharding.
+var storeSchedulerShardSize = envutil.EnvOrDefaultInt("COCKROACH_SCHEDULER_SHARD_SIZE", 16)
+
 var logSSTInfoTicks = envutil.EnvOrDefaultInt(
 	"COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60)

@@ -1295,7 +1302,7 @@ func NewStore(
 	// NB: buffer up to RaftElectionTimeoutTicks in Raft scheduler to avoid
 	// unnecessary elections when ticks are temporarily delayed and piled up.
 	s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency,
-		cfg.RaftElectionTimeoutTicks)
+		storeSchedulerShardSize, cfg.RaftElectionTimeoutTicks)

 	s.syncWaiter = logstore.NewSyncWaiterLoop()

diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index f37d0a213236..dc99302c14ae 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -226,7 +226,8 @@ func (s *Store) uncoalesceBeats(
 		log.Infof(ctx, "uncoalescing %d beats of type %v: %+v", len(beats), msgT, beats)
 	}
 	beatReqs := make([]kvserverpb.RaftMessageRequest, len(beats))
-	var toEnqueue []roachpb.RangeID
+	batch := s.scheduler.NewEnqueueBatch()
+	defer batch.Close()
 	for i, beat := range beats {
 		msg := raftpb.Message{
 			Type: msgT,
@@ -257,10 +258,10 @@ func (s *Store) uncoalesceBeats(
 		enqueue := s.HandleRaftUncoalescedRequest(ctx, &beatReqs[i], respStream)
 		if enqueue {
-			toEnqueue = append(toEnqueue, beat.RangeID)
+			batch.Add(beat.RangeID)
 		}
 	}

-	s.scheduler.EnqueueRaftRequests(toEnqueue...)
+	s.scheduler.EnqueueRaftRequests(batch)
 }

 // HandleRaftRequest dispatches a raft message to the appropriate Replica. It
@@ -752,12 +753,12 @@ func (s *Store) raftTickLoop(ctx context.Context) {
 	ticker := time.NewTicker(s.cfg.RaftTickInterval)
 	defer ticker.Stop()

-	var rangeIDs []roachpb.RangeID
+	batch := s.scheduler.NewEnqueueBatch()
+	defer batch.Close() // reuse the same batch until done
 	for {
 		select {
 		case <-ticker.C:
-			rangeIDs = rangeIDs[:0]
 			// Update the liveness map.
 			if s.cfg.NodeLiveness != nil {
 				s.updateLivenessMap()
 			}
@@ -770,12 +771,13 @@ func (s *Store) raftTickLoop(ctx context.Context) {
 			// then a single bad/slow Replica can disrupt tick processing for every
 			// Replica on the store which cascades into Raft elections and more
 			// disruption.
+			batch.Reset()
 			for rangeID := range s.unquiescedReplicas.m {
-				rangeIDs = append(rangeIDs, rangeID)
+				batch.Add(rangeID)
 			}
 			s.unquiescedReplicas.Unlock()

-			s.scheduler.EnqueueRaftTicks(rangeIDs...)
+			s.scheduler.EnqueueRaftTicks(batch)
 			s.metrics.RaftTicks.Inc(1)

 		case <-s.stopper.ShouldQuiesce():