From 518fe21e858835d7d91e952f23aec6df344eb96e Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Thu, 19 Nov 2020 23:10:39 -0500
Subject: [PATCH 1/2] kv: don't signal in Raft scheduler after concurrent call detected

The current worker is already going to loop back around and try to pull
off the queue before waiting, so we are guaranteed that someone will
process the range.

This commit makes me a little nervous because the code seems so
deliberate, so I may be missing something, but I've stared at this for
a while and don't see what that could be.
---
 pkg/kv/kvserver/scheduler.go | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go
index f7462c16c20d..3e83f0eab600 100644
--- a/pkg/kv/kvserver/scheduler.go
+++ b/pkg/kv/kvserver/scheduler.go
@@ -275,10 +275,28 @@ func (s *raftScheduler) worker(ctx context.Context) {
 			// state map.
 			delete(s.mu.state, id)
 		} else {
-			// There was a concurrent call to one of the Enqueue* methods. Queue the
-			// range ID for further processing.
+			// There was a concurrent call to one of the Enqueue* methods. Queue
+			// the range ID for further processing.
+			//
+			// Even though the Enqueue* method did not signal after detecting
+			// that the range was being processed, there also is no need for us
+			// to signal the condition variable. This is because this worker
+			// goroutine will loop back around and continue working without ever
+			// going back to sleep.
+			//
+			// We can prove this out through a short derivation.
+			// - For optimal concurrency, we want:
+			//     awake_workers = min(max_workers, num_ranges)
+			// - The condition variable / mutex structure ensures that:
+			//     awake_workers = cur_awake_workers + num_signals
+			// - So we need the following number of signals for optimal concurrency:
+			//     num_signals = min(max_workers, num_ranges) - cur_awake_workers
+			// - If we re-enqueue a range that's currently being processed, the
+			//   num_ranges does not change once the current iteration completes
+			//   and the worker does not go back to sleep between the current
+			//   iteration and the next iteration, so no change to num_signals
+			//   is needed.
 			s.mu.queue.Push(id)
-			s.mu.cond.Signal()
 		}
 	}
 }

From 4c53af121da5609af679c6491541ff0b955502a6 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Thu, 19 Nov 2020 23:12:57 -0500
Subject: [PATCH 2/2] kv: add raft.scheduler.latency histogram metric

This commit adds a new `raft.scheduler.latency` metric, which monitors
the latency between initially enqueuing a range in the Raft scheduler
and that range being picked up by a worker and processed. This metric
would be helpful to watch in cases where Raft slows down due to many
active ranges.

Release note (ui change): A new metric called `raft.scheduler.latency`
was added which monitors the latency for operations to be picked up and
processed by the Raft scheduler.
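For illustration only (not part of the patch): a minimal, standalone sketch of the measurement pattern described above -- stamp a range ID with the time it is first enqueued, then record the elapsed wait when a worker dequeues it. The waitQueue name and its fields are hypothetical; the real code feeds the wait into a histogram metric rather than printing it.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitQueue is a toy stand-in for the scheduler's range-ID queue. It stamps
// each ID with the time it was first enqueued so that the dequeue side can
// report how long the ID waited before a worker picked it up.
type waitQueue struct {
	mu    sync.Mutex
	ids   []int64
	begin map[int64]time.Time // first-enqueue time per ID
}

func newWaitQueue() *waitQueue {
	return &waitQueue{begin: make(map[int64]time.Time)}
}

// enqueue records the first-enqueue timestamp only once per ID, mirroring how
// the patch only sets state.begin while it is still zero.
func (q *waitQueue) enqueue(id int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.begin[id]; !ok {
		q.begin[id] = time.Now()
		q.ids = append(q.ids, id)
	}
}

// dequeue pops an ID and returns how long it sat in the queue.
func (q *waitQueue) dequeue() (int64, time.Duration, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.ids) == 0 {
		return 0, 0, false
	}
	id := q.ids[0]
	q.ids = q.ids[1:]
	wait := time.Since(q.begin[id])
	delete(q.begin, id) // clear the stamp; a later re-enqueue starts a new measurement
	return id, wait, true
}

func main() {
	q := newWaitQueue()
	q.enqueue(1)
	q.enqueue(1) // a duplicate enqueue does not reset the timestamp
	time.Sleep(10 * time.Millisecond)
	if id, wait, ok := q.dequeue(); ok {
		// A real implementation would feed wait.Nanoseconds() into a histogram.
		fmt.Printf("range %d waited %s before being processed\n", id, wait)
	}
}
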
---
 pkg/kv/kvserver/metrics.go        |  8 ++++
 pkg/kv/kvserver/scheduler.go      | 65 +++++++++++++++++++++----------
 pkg/kv/kvserver/scheduler_test.go | 13 +++++--
 pkg/ts/catalog/chart_catalog.go   |  4 ++
 4 files changed, 66 insertions(+), 24 deletions(-)

diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index ed06a9251be6..634c99d90217 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -517,6 +517,12 @@ var (
 		Measurement: "Latency",
 		Unit:        metric.Unit_NANOSECONDS,
 	}
+	metaRaftSchedulerLatency = metric.Metadata{
+		Name:        "raft.scheduler.latency",
+		Help:        "Nanoseconds spent waiting for a range to be processed by the Raft scheduler",
+		Measurement: "Latency",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
 
 	// Raft message metrics.
 	metaRaftRcvdProp = metric.Metadata{
@@ -1126,6 +1132,7 @@ type StoreMetrics struct {
 	RaftCommandCommitLatency  *metric.Histogram
 	RaftHandleReadyLatency    *metric.Histogram
 	RaftApplyCommittedLatency *metric.Histogram
+	RaftSchedulerLatency      *metric.Histogram
 
 	// Raft message metrics.
 	//
@@ -1491,6 +1498,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		RaftCommandCommitLatency:  metric.NewLatency(metaRaftCommandCommitLatency, histogramWindow),
 		RaftHandleReadyLatency:    metric.NewLatency(metaRaftHandleReadyLatency, histogramWindow),
 		RaftApplyCommittedLatency: metric.NewLatency(metaRaftApplyCommittedLatency, histogramWindow),
+		RaftSchedulerLatency:      metric.NewLatency(metaRaftSchedulerLatency, histogramWindow),
 
 		// Raft message metrics.
 		RaftRcvdMessages: [...]*metric.Counter{
diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go
index 3e83f0eab600..037996a7efac 100644
--- a/pkg/kv/kvserver/scheduler.go
+++ b/pkg/kv/kvserver/scheduler.go
@@ -17,8 +17,10 @@ import (
 	"sync"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
 
 const rangeIDChunkSize = 1000
@@ -143,17 +145,23 @@ type raftProcessor interface {
 	processTick(context.Context, roachpb.RangeID) bool
 }
 
-type raftScheduleState int
+type raftScheduleFlags int
 
 const (
-	stateQueued raftScheduleState = 1 << iota
+	stateQueued raftScheduleFlags = 1 << iota
 	stateRaftReady
 	stateRaftRequest
 	stateRaftTick
 )
 
+type raftScheduleState struct {
+	flags raftScheduleFlags
+	begin int64 // nanoseconds
+}
+
 type raftScheduler struct {
 	processor  raftProcessor
+	latency    *metric.Histogram
 	numWorkers int
 
 	mu struct {
@@ -172,6 +180,7 @@ func newRaftScheduler(
 ) *raftScheduler {
 	s := &raftScheduler{
 		processor:  processor,
+		latency:    metrics.RaftSchedulerLatency,
 		numWorkers: numWorkers,
 	}
 	s.mu.cond = sync.NewCond(&s.mu.Mutex)
@@ -243,34 +252,38 @@ func (s *raftScheduler) worker(ctx context.Context) {
 		// the range ID marked as "queued" so that a concurrent Enqueue* will not
 		// queue the range ID again.
 		state := s.mu.state[id]
-		s.mu.state[id] = stateQueued
+		s.mu.state[id] = raftScheduleState{flags: stateQueued}
 		s.mu.Unlock()
 
+		// Record the scheduling latency for the range.
+		lat := nowNanos() - state.begin
+		s.latency.RecordValue(lat)
+
 		// Process requests first. This avoids a scenario where a tick and a
 		// "quiesce" message are processed in the same iteration and intervening
 		// raft ready processing unquiesces the replica because the tick triggers
 		// an election.
-		if state&stateRaftRequest != 0 {
+		if state.flags&stateRaftRequest != 0 {
 			// processRequestQueue returns true if the range should perform ready
 			// processing. Do not reorder this below the call to processReady.
 			if s.processor.processRequestQueue(ctx, id) {
-				state |= stateRaftReady
+				state.flags |= stateRaftReady
 			}
 		}
-		if state&stateRaftTick != 0 {
+		if state.flags&stateRaftTick != 0 {
 			// processRaftTick returns true if the range should perform ready
 			// processing. Do not reorder this below the call to processReady.
 			if s.processor.processTick(ctx, id) {
-				state |= stateRaftReady
+				state.flags |= stateRaftReady
 			}
 		}
-		if state&stateRaftReady != 0 {
+		if state.flags&stateRaftReady != 0 {
 			s.processor.processReady(ctx, id)
 		}
 		s.mu.Lock()
 		state = s.mu.state[id]
-		if state == stateQueued {
+		if state.flags == stateQueued {
 			// No further processing required by the range ID, clear it from the
 			// state map.
 			delete(s.mu.state, id)
 		} else {
@@ -301,30 +314,36 @@ func (s *raftScheduler) worker(ctx context.Context) {
 	}
 }
 
-func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.RangeID) int {
+func (s *raftScheduler) enqueue1Locked(
+	addFlags raftScheduleFlags, id roachpb.RangeID, now int64,
+) int {
 	prevState := s.mu.state[id]
-	if prevState&addState == addState {
+	if prevState.flags&addFlags == addFlags {
 		return 0
 	}
 	var queued int
-	newState := prevState | addState
-	if newState&stateQueued == 0 {
-		newState |= stateQueued
+	newState := prevState
+	newState.flags = newState.flags | addFlags
+	if newState.flags&stateQueued == 0 {
+		newState.flags |= stateQueued
 		queued++
 		s.mu.queue.Push(id)
 	}
+	if newState.begin == 0 {
+		newState.begin = now
+	}
 	s.mu.state[id] = newState
 	return queued
 }
 
-func (s *raftScheduler) enqueue1(addState raftScheduleState, id roachpb.RangeID) int {
+func (s *raftScheduler) enqueue1(addFlags raftScheduleFlags, id roachpb.RangeID) int {
+	now := nowNanos()
 	s.mu.Lock()
-	count := s.enqueue1Locked(addState, id)
-	s.mu.Unlock()
-	return count
+	defer s.mu.Unlock()
+	return s.enqueue1Locked(addFlags, id, now)
 }
 
-func (s *raftScheduler) enqueueN(addState raftScheduleState, ids ...roachpb.RangeID) int {
+func (s *raftScheduler) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.RangeID) int {
 	// Enqueue the ids in chunks to avoid hold raftScheduler.mu for too long.
 	const enqueueChunkSize = 128
 
@@ -333,12 +352,14 @@ func (s *raftScheduler) enqueueN(addState raftScheduleState, ids ...roachpb.Rang
 		return 0
 	}
 
+	now := nowNanos()
 	s.mu.Lock()
 	var count int
 	for i, id := range ids {
-		count += s.enqueue1Locked(addState, id)
+		count += s.enqueue1Locked(addFlags, id, now)
 		if (i+1)%enqueueChunkSize == 0 {
 			s.mu.Unlock()
+			now = nowNanos()
 			s.mu.Lock()
 		}
 	}
@@ -371,3 +392,7 @@ func (s *raftScheduler) EnqueueRaftRequests(ids ...roachpb.RangeID) {
 func (s *raftScheduler) EnqueueRaftTicks(ids ...roachpb.RangeID) {
 	s.signal(s.enqueueN(stateRaftTick, ids...))
 }
+
+func nowNanos() int64 {
+	return timeutil.Now().UnixNano()
+}
diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go
index 01984984324e..767e0c247d72 100644
--- a/pkg/kv/kvserver/scheduler_test.go
+++ b/pkg/kv/kvserver/scheduler_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
@@ -229,8 +230,9 @@ func TestSchedulerLoop(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	m := newStoreMetrics(metric.TestSampleInterval)
 	p := newTestProcessor()
-	s := newRaftScheduler(nil, p, 1)
+	s := newRaftScheduler(m, p, 1)
 	stopper := stop.NewStopper()
 	ctx := context.Background()
 	defer stopper.Stop(ctx)
@@ -244,6 +246,8 @@ func TestSchedulerLoop(t *testing.T) {
 		}
 		return nil
 	})
+
+	require.Equal(t, int64(3), m.RaftSchedulerLatency.TotalCount())
 }
 
 // Verify that when we enqueue the same range multiple times for the same
@@ -252,15 +256,16 @@ func TestSchedulerBuffering(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	m := newStoreMetrics(metric.TestSampleInterval)
 	p := newTestProcessor()
-	s := newRaftScheduler(nil, p, 1)
+	s := newRaftScheduler(m, p, 1)
 	stopper := stop.NewStopper()
 	ctx := context.Background()
 	defer stopper.Stop(ctx)
 	s.Start(ctx, stopper)
 
 	testCases := []struct {
-		state    raftScheduleState
+		flag     raftScheduleFlags
 		expected string
 	}{
 		{stateRaftReady, "ready=[1:1] request=[] tick=[]"},
@@ -270,7 +275,7 @@ func TestSchedulerBuffering(t *testing.T) {
 	}
 
 	for _, c := range testCases {
-		s.signal(s.enqueueN(c.state, 1, 1, 1, 1, 1))
+		s.signal(s.enqueueN(c.flag, 1, 1, 1, 1, 1))
 
 		testutils.SucceedsSoon(t, func() error {
 			if s := p.String(); c.expected != s {
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 2db3e2db006a..c348876447ce 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1238,6 +1238,10 @@ var charts = []sectionDescription{
 				Title:   "Log Commit",
 				Metrics: []string{"raft.process.logcommit.latency"},
 			},
+			{
+				Title:   "Scheduler",
+				Metrics: []string{"raft.scheduler.latency"},
+			},
 		},
 	},
 	{
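
For illustration only (not part of either patch): a minimal, standalone sketch of the worker/condition-variable shape that the first patch's derivation reasons about. The shared, worker, and queue names are hypothetical. The property it demonstrates is that a worker re-checks the queue while still awake after finishing an item, so re-pushing the ID it was just processing needs no extra Signal; Signal is only needed when the number of runnable items grows.

package main

import (
	"fmt"
	"sync"
	"time"
)

// shared is a toy version of the scheduler's queue plus condition variable.
type shared struct {
	mu    sync.Mutex
	cond  *sync.Cond
	queue []int
	done  bool
}

// worker mirrors the scheduler's worker loop: it only Waits when the queue is
// empty, and after processing an item it loops straight back to re-check the
// queue without sleeping. Because of that, pushing an ID that this worker is
// about to re-check does not need an accompanying Signal.
func worker(s *shared, wg *sync.WaitGroup) {
	defer wg.Done()
	s.mu.Lock()
	defer s.mu.Unlock()
	for {
		for len(s.queue) == 0 && !s.done {
			s.cond.Wait()
		}
		if s.done {
			return
		}
		id := s.queue[0]
		s.queue = s.queue[1:]

		s.mu.Unlock()
		fmt.Println("processing", id) // stand-in for processReady etc.
		time.Sleep(time.Millisecond)
		s.mu.Lock()
		// No Signal here even if id was re-queued while it was being processed:
		// this worker is awake and will see it on the next loop iteration.
	}
}

func main() {
	s := &shared{}
	s.cond = sync.NewCond(&s.mu)

	var wg sync.WaitGroup
	wg.Add(1)
	go worker(s, &wg)

	for id := 1; id <= 3; id++ {
		s.mu.Lock()
		s.queue = append(s.queue, id)
		s.mu.Unlock()
		s.cond.Signal() // new work: the number of runnable items grew
	}

	time.Sleep(50 * time.Millisecond)
	s.mu.Lock()
	s.done = true
	s.mu.Unlock()
	s.cond.Broadcast()
	wg.Wait()
}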