Skip to content

Commit

Permalink
Merge #34027
Browse files Browse the repository at this point in the history
34027: distsqlrun: add metrics for queue size and wait duration r=ajwerner a=ajwerner

Long queuing can lead to errors like described in #27746.
This change should give us more visibility into when queuing is occurring
and how problematic it is.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jan 16, 2019
2 parents c10ce74 + 418dbcd commit 7a3717d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/sql/distsqlrun/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
return fs.runFlowNow(ctx, f)
}
log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id)
fs.metrics.FlowsQueued.Inc(1)
fs.mu.queue.PushBack(&flowWithCtx{
ctx: ctx,
flow: f,
Expand Down Expand Up @@ -156,9 +157,12 @@ func (fs *flowScheduler) Start() {
if frElem := fs.mu.queue.Front(); frElem != nil {
n := frElem.Value.(*flowWithCtx)
fs.mu.queue.Remove(frElem)
wait := timeutil.Since(n.enqueueTime)
log.VEventf(
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, timeutil.Since(n.enqueueTime),
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, wait,
)
fs.metrics.FlowsQueued.Dec(1)
fs.metrics.QueueWaitHist.RecordValue(int64(wait))
// Note: we use the flow's context instead of the worker
// context, to ensure that logging etc is relative to the
// specific flow.
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/distsqlrun/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type DistSQLMetrics struct {
QueriesTotal *metric.Counter
FlowsActive *metric.Gauge
FlowsTotal *metric.Counter
FlowsQueued *metric.Gauge
QueueWaitHist *metric.Histogram
MaxBytesHist *metric.Histogram
CurBytesCount *metric.Gauge
}
Expand Down Expand Up @@ -61,6 +63,18 @@ var (
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
metaFlowsQueued = metric.Metadata{
Name: "sql.distsql.flows.queued",
Help: "Number of distributed SQL flows currently queued",
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
metaQueueWaitHist = metric.Metadata{
Name: "sql.distsql.flows.queue_wait",
Help: "Duration of time flows spend waiting in the queue",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaMemMaxBytes = metric.Metadata{
Name: "sql.mem.distsql.max",
Help: "Memory usage per sql statement for distsql",
Expand All @@ -86,6 +100,8 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics {
QueriesTotal: metric.NewCounter(metaQueriesTotal),
FlowsActive: metric.NewGauge(metaFlowsActive),
FlowsTotal: metric.NewCounter(metaFlowsTotal),
FlowsQueued: metric.NewGauge(metaFlowsQueued),
QueueWaitHist: metric.NewLatency(metaQueueWaitHist, histogramWindow),
MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, log10int64times1000, 3),
CurBytesCount: metric.NewGauge(metaMemCurBytes),
}
Expand Down

0 comments on commit 7a3717d

Please sign in to comment.