Skip to content

Commit

Permalink
Merge pull request #17050 from couchand/feature/distsql-query-count
Browse files Browse the repository at this point in the history
distsql, ui: query count metrics
  • Loading branch information
couchand authored Jul 31, 2017
2 parents f6967b7 + 90e87c7 commit 2fed9e1
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 14 deletions.
11 changes: 5 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)
rootSQLMemoryMonitor.Start(context.Background(), nil, mon.MakeStandaloneBudget(s.cfg.SQLMemoryPoolSize))

distSQLMetrics := sql.MakeMemMetrics("distsql", cfg.HistogramWindowInterval())
s.registry.AddMetric(distSQLMetrics.CurBytesCount)
s.registry.AddMetric(distSQLMetrics.MaxBytesHist)

// Set up the DistSQL temp engine.

// Check if all our configured stores are in-memory. If this is the case, we
Expand Down Expand Up @@ -313,6 +309,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
}

distSQLMetrics := distsqlrun.MakeDistSQLMetrics(cfg.HistogramWindowInterval())
s.registry.AddMetricStruct(distSQLMetrics)

// Set up the DistSQL server.
distSQLCfg := distsqlrun.ServerConfig{
AmbientContext: s.cfg.AmbientCtx,
Expand All @@ -326,8 +325,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
TempStorage: tempEngine,

ParentMemoryMonitor: &rootSQLMemoryMonitor,
Counter: distSQLMetrics.CurBytesCount,
Hist: distSQLMetrics.MaxBytesHist,

Metrics: &distSQLMetrics,
}
if distSQLTestingKnobs := s.cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil {
distSQLCfg.TestingKnobs = *distSQLTestingKnobs.(*distsqlrun.TestingKnobs)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (dsp *distSQLPlanner) Run(

log.VEvent(ctx, 1, "running DistSQL plan")

dsp.distSQLSrv.ServerConfig.Metrics.QueryStart()
defer dsp.distSQLSrv.ServerConfig.Metrics.QueryStop()

recv.resultToStreamColMap = plan.planToStreamColMap
thisNodeID := dsp.nodeDesc.NodeID

Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/distsqlrun/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type flowScheduler struct {
log.AmbientContext
stopper *stop.Stopper
flowDoneCh chan *Flow
metrics *DistSQLMetrics

mu struct {
syncutil.Mutex
Expand All @@ -52,11 +53,14 @@ type flowWithCtx struct {
flow *Flow
}

func newFlowScheduler(ambient log.AmbientContext, stopper *stop.Stopper) *flowScheduler {
func newFlowScheduler(
ambient log.AmbientContext, stopper *stop.Stopper, metrics *DistSQLMetrics,
) *flowScheduler {
fs := &flowScheduler{
AmbientContext: ambient,
stopper: stopper,
flowDoneCh: make(chan *Flow, flowDoneChanSize),
metrics: metrics,
}
fs.mu.queue = list.New()
return fs
Expand All @@ -71,6 +75,7 @@ func (fs *flowScheduler) canRunFlow(_ *Flow) bool {
// runFlowNow starts the given flow; does not wait for the flow to complete.
func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) {
fs.mu.numRunning++
fs.metrics.FlowStart()
f.Start(ctx, func() { fs.flowDoneCh <- f })
// TODO(radu): we could replace the WaitGroup with a structure that keeps a
// refcount and automatically runs Cleanup() when the count reaches 0.
Expand Down Expand Up @@ -116,6 +121,7 @@ func (fs *flowScheduler) Start() {
case <-fs.flowDoneCh:
fs.mu.Lock()
fs.mu.numRunning--
fs.metrics.FlowStop()
if !stopped {
if frElem := fs.mu.queue.Front(); frElem != nil {
n := frElem.Value.(*flowWithCtx)
Expand Down
98 changes: 98 additions & 0 deletions pkg/sql/distsqlrun/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Andrew Dona-Couch ([email protected])

package distsqlrun

import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// DistSQLMetrics contains pointers to the metrics for
// monitoring DistSQL processing. MakeDistSQLMetrics returns a value with
// every field populated.
type DistSQLMetrics struct {
// QueriesActive is the number of distributed SQL queries currently active.
// It is incremented by QueryStart and decremented by QueryStop.
QueriesActive *metric.Gauge
// QueriesTotal is the number of distributed SQL queries executed. It is
// incremented by QueryStart.
QueriesTotal *metric.Counter
// FlowsActive is the number of distributed SQL flows currently active.
// It is incremented by FlowStart and decremented by FlowStop.
FlowsActive *metric.Gauge
// FlowsTotal is the number of distributed SQL flows executed. It is
// incremented by FlowStart.
FlowsTotal *metric.Counter
// MaxBytesHist is the "sql.mem.distsql.max" histogram (memory usage per
// SQL statement for DistSQL).
MaxBytesHist *metric.Histogram
// CurBytesCount is the "sql.mem.distsql.current" counter (current SQL
// statement memory usage for DistSQL).
CurBytesCount *metric.Counter
}

// MetricStruct implements the metric.Struct interface. The method body is
// intentionally empty: it only marks DistSQLMetrics as a metric struct.
func (DistSQLMetrics) MetricStruct() {}

// Compile-time assertion that DistSQLMetrics satisfies metric.Struct.
var _ metric.Struct = DistSQLMetrics{}

// Metadata (exported name and help text) for each metric held by
// DistSQLMetrics.
var (
	// Query-level metrics.
	metaQueriesActive = metric.Metadata{
		Name: "sql.distsql.queries.active",
		Help: "Number of distributed SQL queries currently active",
	}
	metaQueriesTotal = metric.Metadata{
		Name: "sql.distsql.queries.total",
		Help: "Number of distributed SQL queries executed",
	}

	// Flow-level metrics.
	metaFlowsActive = metric.Metadata{
		Name: "sql.distsql.flows.active",
		Help: "Number of distributed SQL flows currently active",
	}
	metaFlowsTotal = metric.Metadata{
		Name: "sql.distsql.flows.total",
		Help: "Number of distributed SQL flows executed",
	}

	// Memory-usage metrics.
	metaMemMaxBytes = metric.Metadata{
		Name: "sql.mem.distsql.max",
		Help: "Memory usage per sql statement for distsql",
	}
	metaMemCurBytes = metric.Metadata{
		Name: "sql.mem.distsql.current",
		Help: "Current sql statement memory usage for distsql",
	}
)

// log10int64times1000 = log10(math.MaxInt64) * 1000, rounded up somewhat:
// log10(math.MaxInt64) is roughly 18.97, so the product is roughly 18965,
// which is rounded up to 19000. It is passed as the third argument to
// metric.NewHistogram when building MaxBytesHist.
// See pkg/sql/mem_metrics.go, which this definition mirrors.
const log10int64times1000 = 19 * 1000

// MakeDistSQLMetrics builds a fully populated DistSQLMetrics.
//
// histogramWindow is forwarded to metric.NewHistogram for the memory-usage
// histogram; every other metric is created from its package-level metadata
// alone. The result is returned by value.
func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics {
	var m DistSQLMetrics

	// Query and flow gauges/counters.
	m.QueriesActive = metric.NewGauge(metaQueriesActive)
	m.QueriesTotal = metric.NewCounter(metaQueriesTotal)
	m.FlowsActive = metric.NewGauge(metaFlowsActive)
	m.FlowsTotal = metric.NewCounter(metaFlowsTotal)

	// Memory accounting metrics.
	m.MaxBytesHist = metric.NewHistogram(
		metaMemMaxBytes, histogramWindow, log10int64times1000, 3,
	)
	m.CurBytesCount = metric.NewCounter(metaMemCurBytes)

	return m
}

// QueryStart registers the start of a new DistSQL query: it adds one to both
// the QueriesActive gauge and the QueriesTotal counter. Pair each call with a
// later QueryStop so the active gauge is decremented again when the query
// finishes.
func (m *DistSQLMetrics) QueryStart() {
m.QueriesActive.Inc(1)
m.QueriesTotal.Inc(1)
}

// QueryStop registers the end of a DistSQL query by subtracting one from the
// QueriesActive gauge. QueriesTotal is left untouched, since it counts queries
// started rather than queries in flight.
func (m *DistSQLMetrics) QueryStop() {
m.QueriesActive.Dec(1)
}

// FlowStart registers the start of a new DistSQL flow: it adds one to both
// the FlowsActive gauge and the FlowsTotal counter. Pair each call with a
// later FlowStop so the active gauge is decremented again when the flow
// finishes.
func (m *DistSQLMetrics) FlowStart() {
m.FlowsActive.Inc(1)
m.FlowsTotal.Inc(1)
}

// FlowStop registers the end of a DistSQL flow by subtracting one from the
// FlowsActive gauge. FlowsTotal is left untouched, since it counts flows
// started rather than flows in flight.
func (m *DistSQLMetrics) FlowStop() {
m.FlowsActive.Dec(1)
}
11 changes: 5 additions & 6 deletions pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -95,14 +94,14 @@ type ServerConfig struct {
TestingKnobs TestingKnobs

ParentMemoryMonitor *mon.MemoryMonitor
Counter *metric.Counter
Hist *metric.Histogram

// TempStorage is used by some DistSQL processors to store rows when the
// working set is larger than can be stored in memory. It can be nil, if this
// cockroach node does not have an engine for temporary storage.
TempStorage engine.Engine

Metrics *DistSQLMetrics

// NodeID is the id of the node on which this Server is running.
NodeID *base.NodeIDContainer
ClusterID uuid.UUID
Expand All @@ -129,9 +128,9 @@ func NewServer(ctx context.Context, cfg ServerConfig) *ServerImpl {
ServerConfig: cfg,
regexpCache: parser.NewRegexpCache(512),
flowRegistry: makeFlowRegistry(),
flowScheduler: newFlowScheduler(cfg.AmbientContext, cfg.Stopper),
flowScheduler: newFlowScheduler(cfg.AmbientContext, cfg.Stopper, cfg.Metrics),
memMonitor: mon.MakeMonitor("distsql",
cfg.Counter, cfg.Hist, -1 /* increment: use default block size */, noteworthyMemoryUsageBytes),
cfg.Metrics.CurBytesCount, cfg.Metrics.MaxBytesHist, -1 /* increment: use default block size */, noteworthyMemoryUsageBytes),
tempStorage: cfg.TempStorage,
}
ds.memMonitor.Start(ctx, cfg.ParentMemoryMonitor, mon.BoundAccount{})
Expand Down Expand Up @@ -177,7 +176,7 @@ func (ds *ServerImpl) setupFlow(

// The monitor and account opened here are closed in Flow.Cleanup().
monitor := mon.MakeMonitor("flow",
ds.Counter, ds.Hist, -1 /* use default block size */, noteworthyMemoryUsageBytes)
ds.Metrics.CurBytesCount, ds.Metrics.MaxBytesHist, -1 /* use default block size */, noteworthyMemoryUsageBytes)
monitor.Start(ctx, &ds.memMonitor, mon.BoundAccount{})
acc := monitor.MakeBoundAccount()

Expand Down
2 changes: 1 addition & 1 deletion pkg/ui/embedded.go

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/sql.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,34 @@ export default function (props: GraphDashboardProps) {
</Axis>
</LineGraph>,

<LineGraph
title="Active Distributed SQL Queries"
sources={nodeSources}
tooltip={`The total number of distributed SQL queries currently running ${tooltipSelection}.`}
>
<Axis>
<Metric name="cr.node.sql.distsql.queries.active" title="Active Queries" />
</Axis>
</LineGraph>,

<LineGraph
title="Active Flows for Distributed SQL Queries"
tooltip="The number of flows on each node contributing to currently running distributed SQL queries."
>
<Axis>
{
_.map(nodeIDs, (node) => (
<Metric
key={node}
name="cr.node.sql.distsql.flows.active"
title={nodeAddress(nodesSummary, node)}
sources={[node]}
/>
))
}
</Axis>
</LineGraph>,

<LineGraph
title="Service Latency: SQL, 99th percentile"
tooltip={(
Expand Down

0 comments on commit 2fed9e1

Please sign in to comment.