diff --git a/pkg/sql/distsqlrun/flow_scheduler.go b/pkg/sql/distsqlrun/flow_scheduler.go index 09e002b84e88..067a2886eb04 100644 --- a/pkg/sql/distsqlrun/flow_scheduler.go +++ b/pkg/sql/distsqlrun/flow_scheduler.go @@ -123,6 +123,7 @@ func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error { return fs.runFlowNow(ctx, f) } log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id) + fs.metrics.FlowsQueued.Inc(1) fs.mu.queue.PushBack(&flowWithCtx{ ctx: ctx, flow: f, @@ -156,9 +157,12 @@ func (fs *flowScheduler) Start() { if frElem := fs.mu.queue.Front(); frElem != nil { n := frElem.Value.(*flowWithCtx) fs.mu.queue.Remove(frElem) + wait := timeutil.Since(n.enqueueTime) log.VEventf( - n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, timeutil.Since(n.enqueueTime), + n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, wait, ) + fs.metrics.FlowsQueued.Dec(1) + fs.metrics.QueueWaitHist.RecordValue(int64(wait)) // Note: we use the flow's context instead of the worker // context, to ensure that logging etc is relative to the // specific flow. diff --git a/pkg/sql/distsqlrun/metrics.go b/pkg/sql/distsqlrun/metrics.go index 6c0ac34f743a..9763339c9ad6 100644 --- a/pkg/sql/distsqlrun/metrics.go +++ b/pkg/sql/distsqlrun/metrics.go @@ -27,6 +27,8 @@ type DistSQLMetrics struct { QueriesTotal *metric.Counter FlowsActive *metric.Gauge FlowsTotal *metric.Counter + FlowsQueued *metric.Gauge + QueueWaitHist *metric.Histogram MaxBytesHist *metric.Histogram CurBytesCount *metric.Gauge } @@ -61,6 +63,18 @@ var ( Measurement: "Flows", Unit: metric.Unit_COUNT, } + metaFlowsQueued = metric.Metadata{ + Name: "sql.distsql.flows.queued", + Help: "Number of distributed SQL flows currently queued", + Measurement: "Flows", + Unit: metric.Unit_COUNT, + } + metaQueueWaitHist = metric.Metadata{ + Name: "sql.distsql.flows.queue_wait", + Help: "Duration of time flows spend waiting in the queue", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } metaMemMaxBytes = metric.Metadata{ Name: "sql.mem.distsql.max", Help: "Memory usage per sql statement for distsql", @@ -86,6 +100,8 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics { QueriesTotal: metric.NewCounter(metaQueriesTotal), FlowsActive: metric.NewGauge(metaFlowsActive), FlowsTotal: metric.NewCounter(metaFlowsTotal), + FlowsQueued: metric.NewGauge(metaFlowsQueued), + QueueWaitHist: metric.NewLatency(metaQueueWaitHist, histogramWindow), MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, log10int64times1000, 3), CurBytesCount: metric.NewGauge(metaMemCurBytes), }