56943: kv: add raft.scheduler.latency histogram metric r=nvanbenschoten a=nvanbenschoten

This commit adds a new `raft.scheduler.latency` metric, which monitors
the latency between initially enqueuing a range in the Raft scheduler
and that range being picked up by a worker and processed. This metric
would be helpful to watch in cases where Raft slows down due to many
active ranges.

Release note (ui change): A new metric called `raft.scheduler.latency`
was added, which monitors the latency for operations to be picked up
and processed by the Raft scheduler.

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Dec 1, 2020
2 parents 4e28ca3 + 4c53af1 commit 5740050
Showing 4 changed files with 87 additions and 27 deletions.
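
Conceptually, the change stamps each range with the time it is first enqueued in the scheduler and, when a worker dequeues it, records the elapsed time into a histogram (see the scheduler.go hunks below). The following is a minimal standalone sketch of that pattern; it is illustrative only, not the CockroachDB implementation, and `workItem` plus the channel-based queue are hypothetical stand-ins for the scheduler's state map and worker pool.

```go
package main

import (
	"fmt"
	"time"
)

// workItem is an illustrative stand-in for a scheduled range: it carries
// the timestamp of its first enqueue so the consumer can measure how long
// it waited before being picked up.
type workItem struct {
	rangeID int
	begin   time.Time
}

func main() {
	queue := make(chan workItem, 16)

	// Producer: stamp the enqueue time once, when the item first enters
	// the queue.
	queue <- workItem{rangeID: 1, begin: time.Now()}
	queue <- workItem{rangeID: 2, begin: time.Now()}
	close(queue)

	// Worker: the gap between enqueue and pickup is the scheduling
	// latency; the real code records it into a *metric.Histogram.
	for item := range queue {
		lat := time.Since(item.begin)
		fmt.Printf("range %d waited %v before processing\n", item.rangeID, lat)
	}
}
```
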
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -517,6 +517,12 @@ var (
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaRaftSchedulerLatency = metric.Metadata{
Name: "raft.scheduler.latency",
Help: "Nanoseconds spent waiting for a range to be processed by the Raft scheduler",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}

// Raft message metrics.
metaRaftRcvdProp = metric.Metadata{
@@ -1126,6 +1132,7 @@ type StoreMetrics struct {
RaftCommandCommitLatency *metric.Histogram
RaftHandleReadyLatency *metric.Histogram
RaftApplyCommittedLatency *metric.Histogram
RaftSchedulerLatency *metric.Histogram

// Raft message metrics.
//
@@ -1491,6 +1498,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftCommandCommitLatency: metric.NewLatency(metaRaftCommandCommitLatency, histogramWindow),
RaftHandleReadyLatency: metric.NewLatency(metaRaftHandleReadyLatency, histogramWindow),
RaftApplyCommittedLatency: metric.NewLatency(metaRaftApplyCommittedLatency, histogramWindow),
RaftSchedulerLatency: metric.NewLatency(metaRaftSchedulerLatency, histogramWindow),

// Raft message metrics.
RaftRcvdMessages: [...]*metric.Counter{
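
For orientation, the metrics.go hunks above follow the package's usual pattern for adding a latency metric: declare a `metric.Metadata`, add a `*metric.Histogram` field to `StoreMetrics`, and construct it with `metric.NewLatency` over the store's histogram window. Below is a condensed, hypothetical sketch of that pattern using placeholder names (`exampleMeta`, `exampleMetrics`); it assumes the same `pkg/util/metric` package imported in this diff.

```go
package kvserver

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// exampleMeta mirrors the shape of metaRaftSchedulerLatency above; the
// metric name and help text here are placeholders, not real metrics.
var exampleMeta = metric.Metadata{
	Name:        "example.queue.latency",
	Help:        "Nanoseconds spent waiting in an example queue",
	Measurement: "Latency",
	Unit:        metric.Unit_NANOSECONDS,
}

// exampleMetrics shows the field + constructor pairing used by
// StoreMetrics / newStoreMetrics in this commit.
type exampleMetrics struct {
	QueueLatency *metric.Histogram
}

func newExampleMetrics(histogramWindow time.Duration) *exampleMetrics {
	return &exampleMetrics{
		QueueLatency: metric.NewLatency(exampleMeta, histogramWindow),
	}
}
```
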
89 changes: 66 additions & 23 deletions pkg/kv/kvserver/scheduler.go
@@ -17,8 +17,10 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const rangeIDChunkSize = 1000
@@ -143,17 +145,23 @@ type raftProcessor interface {
processTick(context.Context, roachpb.RangeID) bool
}

type raftScheduleState int
type raftScheduleFlags int

const (
stateQueued raftScheduleState = 1 << iota
stateQueued raftScheduleFlags = 1 << iota
stateRaftReady
stateRaftRequest
stateRaftTick
)

type raftScheduleState struct {
flags raftScheduleFlags
begin int64 // nanoseconds
}

type raftScheduler struct {
processor raftProcessor
latency *metric.Histogram
numWorkers int

mu struct {
@@ -172,6 +180,7 @@ func newRaftScheduler(
) *raftScheduler {
s := &raftScheduler{
processor: processor,
latency: metrics.RaftSchedulerLatency,
numWorkers: numWorkers,
}
s.mu.cond = sync.NewCond(&s.mu.Mutex)
@@ -243,70 +252,98 @@ func (s *raftScheduler) worker(ctx context.Context) {
// the range ID marked as "queued" so that a concurrent Enqueue* will not
// queue the range ID again.
state := s.mu.state[id]
s.mu.state[id] = stateQueued
s.mu.state[id] = raftScheduleState{flags: stateQueued}
s.mu.Unlock()

// Record the scheduling latency for the range.
lat := nowNanos() - state.begin
s.latency.RecordValue(lat)

// Process requests first. This avoids a scenario where a tick and a
// "quiesce" message are processed in the same iteration and intervening
// raft ready processing unquiesces the replica because the tick triggers
// an election.
if state&stateRaftRequest != 0 {
if state.flags&stateRaftRequest != 0 {
// processRequestQueue returns true if the range should perform ready
// processing. Do not reorder this below the call to processReady.
if s.processor.processRequestQueue(ctx, id) {
state |= stateRaftReady
state.flags |= stateRaftReady
}
}
if state&stateRaftTick != 0 {
if state.flags&stateRaftTick != 0 {
// processRaftTick returns true if the range should perform ready
// processing. Do not reorder this below the call to processReady.
if s.processor.processTick(ctx, id) {
state |= stateRaftReady
state.flags |= stateRaftReady
}
}
if state&stateRaftReady != 0 {
if state.flags&stateRaftReady != 0 {
s.processor.processReady(ctx, id)
}

s.mu.Lock()
state = s.mu.state[id]
if state == stateQueued {
if state.flags == stateQueued {
// No further processing required by the range ID, clear it from the
// state map.
delete(s.mu.state, id)
} else {
// There was a concurrent call to one of the Enqueue* methods. Queue the
// range ID for further processing.
// There was a concurrent call to one of the Enqueue* methods. Queue
// the range ID for further processing.
//
// Even though the Enqueue* method did not signal after detecting
// that the range was being processed, there also is no need for us
// to signal the condition variable. This is because this worker
// goroutine will loop back around and continue working without ever
// going back to sleep.
//
// We can prove this out through a short derivation.
// - For optimal concurrency, we want:
// awake_workers = min(max_workers, num_ranges)
// - The condition variable / mutex structure ensures that:
// awake_workers = cur_awake_workers + num_signals
// - So we need the following number of signals for optimal concurrency:
// num_signals = min(max_workers, num_ranges) - cur_awake_workers
// - If we re-enqueue a range that's currently being processed, the
// num_ranges does not change once the current iteration completes
// and the worker does not go back to sleep between the current
// iteration and the next iteration, so no change to num_signals
// is needed.
s.mu.queue.Push(id)
s.mu.cond.Signal()
}
}
}

func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.RangeID) int {
func (s *raftScheduler) enqueue1Locked(
addFlags raftScheduleFlags, id roachpb.RangeID, now int64,
) int {
prevState := s.mu.state[id]
if prevState&addState == addState {
if prevState.flags&addFlags == addFlags {
return 0
}
var queued int
newState := prevState | addState
if newState&stateQueued == 0 {
newState |= stateQueued
newState := prevState
newState.flags = newState.flags | addFlags
if newState.flags&stateQueued == 0 {
newState.flags |= stateQueued
queued++
s.mu.queue.Push(id)
}
if newState.begin == 0 {
newState.begin = now
}
s.mu.state[id] = newState
return queued
}

func (s *raftScheduler) enqueue1(addState raftScheduleState, id roachpb.RangeID) int {
func (s *raftScheduler) enqueue1(addFlags raftScheduleFlags, id roachpb.RangeID) int {
now := nowNanos()
s.mu.Lock()
count := s.enqueue1Locked(addState, id)
s.mu.Unlock()
return count
defer s.mu.Unlock()
return s.enqueue1Locked(addFlags, id, now)
}

func (s *raftScheduler) enqueueN(addState raftScheduleState, ids ...roachpb.RangeID) int {
func (s *raftScheduler) enqueueN(addFlags raftScheduleFlags, ids ...roachpb.RangeID) int {
// Enqueue the ids in chunks to avoid holding raftScheduler.mu for too long.
const enqueueChunkSize = 128

@@ -315,12 +352,14 @@ func (s *raftScheduler) enqueueN(addState raftScheduleState, ids ...roachpb.Rang
return 0
}

now := nowNanos()
s.mu.Lock()
var count int
for i, id := range ids {
count += s.enqueue1Locked(addState, id)
count += s.enqueue1Locked(addFlags, id, now)
if (i+1)%enqueueChunkSize == 0 {
s.mu.Unlock()
now = nowNanos()
s.mu.Lock()
}
}
@@ -353,3 +392,7 @@ func (s *raftScheduler) EnqueueRaftRequests(ids ...roachpb.RangeID) {
func (s *raftScheduler) EnqueueRaftTicks(ids ...roachpb.RangeID) {
s.signal(s.enqueueN(stateRaftTick, ids...))
}

func nowNanos() int64 {
return timeutil.Now().UnixNano()
}
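
A detail worth noting in `enqueue1Locked` above: `begin` is stamped only when it is still zero, so repeated `Enqueue*` calls that merge additional flags into an already-queued range do not reset the clock, and the recorded latency covers the full wait since the first enqueue. A small self-contained sketch of that merge behavior, with types renamed so they do not shadow the real ones:

```go
package main

import "fmt"

type flags int

const (
	queued flags = 1 << iota
	ready
	request
	tick
)

// schedState mirrors raftScheduleState: a flag set plus the first-enqueue
// timestamp in nanoseconds.
type schedState struct {
	flags flags
	begin int64
}

// merge mimics enqueue1Locked's bookkeeping: OR in the new flags (and the
// queued bit), and stamp begin only if it is not already set.
func merge(prev schedState, add flags, now int64) schedState {
	next := prev
	next.flags |= add | queued
	if next.begin == 0 {
		next.begin = now
	}
	return next
}

func main() {
	var st schedState
	st = merge(st, request, 100) // first enqueue at t=100ns
	st = merge(st, tick, 250)    // re-enqueue: flags accumulate, begin stays 100
	fmt.Printf("flags=%b begin=%d\n", st.flags, st.begin)
}
```
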
13 changes: 9 additions & 4 deletions pkg/kv/kvserver/scheduler_test.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
@@ -229,8 +230,9 @@ func TestSchedulerLoop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

m := newStoreMetrics(metric.TestSampleInterval)
p := newTestProcessor()
s := newRaftScheduler(nil, p, 1)
s := newRaftScheduler(m, p, 1)
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
@@ -244,6 +246,8 @@ }
}
return nil
})

require.Equal(t, int64(3), m.RaftSchedulerLatency.TotalCount())
}

// Verify that when we enqueue the same range multiple times for the same
@@ -252,15 +256,16 @@ func TestSchedulerBuffering(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

m := newStoreMetrics(metric.TestSampleInterval)
p := newTestProcessor()
s := newRaftScheduler(nil, p, 1)
s := newRaftScheduler(m, p, 1)
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
s.Start(ctx, stopper)

testCases := []struct {
state raftScheduleState
flag raftScheduleFlags
expected string
}{
{stateRaftReady, "ready=[1:1] request=[] tick=[]"},
@@ -270,7 +275,7 @@ }
}

for _, c := range testCases {
s.signal(s.enqueueN(c.state, 1, 1, 1, 1, 1))
s.signal(s.enqueueN(c.flag, 1, 1, 1, 1, 1))

testutils.SucceedsSoon(t, func() error {
if s := p.String(); c.expected != s {
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
@@ -1238,6 +1238,10 @@ var charts = []sectionDescription{
Title: "Log Commit",
Metrics: []string{"raft.process.logcommit.latency"},
},
{
Title: "Scheduler",
Metrics: []string{"raft.scheduler.latency"},
},
},
},
{
