Skip to content

Commit

Permalink
Merge pull request cockroachdb#68112 from cockroachdb/blathers/backpo…
Browse files Browse the repository at this point in the history
…rt-release-21.1-62467

release-21.1: sql: add times series metrics for disk spilling
  • Loading branch information
yuzefovich authored Jul 28, 2021
2 parents e351634 + 540a6ed commit 82f3ddb
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/colcontainer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ go_library(
"//pkg/col/coldata",
"//pkg/col/colserde",
"//pkg/sql/colexecerror",
"//pkg/sql/execinfra",
"//pkg/sql/types",
"//pkg/storage/fs",
"//pkg/util/mon",
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_golang_snappy//:snappy",
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/colcontainer/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/golang/snappy"
Expand Down Expand Up @@ -191,6 +193,14 @@ type diskQueue struct {
scratchDecompressedReadBytes []byte

diskAcc *mon.BoundAccount

// diskSpillingMetricsHelper keeps track of various disk spilling metrics.
diskSpillingMetricsHelper struct {
mu struct {
syncutil.Mutex
querySpilled bool
}
}
}

var _ RewindableQueue = &diskQueue{}
Expand Down Expand Up @@ -296,6 +306,9 @@ func GetPatherFunc(f func(ctx context.Context) string) GetPather {
type DiskQueueCfg struct {
// FS is the filesystem interface to use.
FS fs.FS
// DistSQLMetrics contains metrics for monitoring DistSQL processing. This
// can be nil if these metrics are not needed.
DistSQLMetrics *execinfra.DistSQLMetrics
// GetPather returns where the temporary directory that will contain this
// DiskQueue's files has been created. The directory name will be a UUID.
// Note that the directory is created lazily on the first call to GetPath.
Expand Down Expand Up @@ -363,6 +376,15 @@ func NewRewindableDiskQueue(
return d, nil
}

func (d *diskQueue) querySpilled() {
d.diskSpillingMetricsHelper.mu.Lock()
defer d.diskSpillingMetricsHelper.mu.Unlock()
if d.cfg.DistSQLMetrics != nil && !d.diskSpillingMetricsHelper.mu.querySpilled {
d.diskSpillingMetricsHelper.mu.querySpilled = true
d.cfg.DistSQLMetrics.QueriesSpilled.Inc(1)
}
}

func newDiskQueue(
ctx context.Context, typs []*types.T, cfg DiskQueueCfg, diskAcc *mon.BoundAccount,
) (*diskQueue, error) {
Expand All @@ -385,6 +407,7 @@ func newDiskQueue(
if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName)); err != nil {
return nil, err
}
d.querySpilled()
// rotateFile will create a new file to write to.
return d, d.rotateFile(ctx)
}
Expand Down Expand Up @@ -520,6 +543,9 @@ func (d *diskQueue) writeFooterAndFlush(ctx context.Context) (err error) {
}
d.numBufferedBatches = 0
d.files[d.writeFileIdx].totalSize += written
if d.cfg.DistSQLMetrics != nil {
d.cfg.DistSQLMetrics.SpilledBytesWritten.Inc(int64(written))
}
if err := d.diskAcc.Grow(ctx, int64(written)); err != nil {
return err
}
Expand Down Expand Up @@ -646,6 +672,9 @@ func (d *diskQueue) maybeInitDeserializer(ctx context.Context) (bool, error) {
if err != nil && err != io.EOF {
return false, err
}
if d.cfg.DistSQLMetrics != nil {
d.cfg.DistSQLMetrics.SpilledBytesRead.Inc(int64(n))
}
if n != len(d.writer.scratch.compressedBuf) {
return false, errors.Errorf("expected to read %d bytes but read %d", len(d.writer.scratch.compressedBuf), n)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ func (f *vectorizedFlow) Setup(
helper := newVectorizedFlowCreatorHelper(f.FlowBase)

diskQueueCfg := colcontainer.DiskQueueCfg{
FS: f.Cfg.TempFS,
GetPather: f,
FS: f.Cfg.TempFS,
DistSQLMetrics: f.Cfg.Metrics,
GetPather: f,
}
if err := diskQueueCfg.EnsureDefaults(); err != nil {
return ctx, err
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/execinfra/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type DistSQLMetrics struct {
VecOpenFDs *metric.Gauge
CurDiskBytesCount *metric.Gauge
MaxDiskBytesHist *metric.Histogram
QueriesSpilled *metric.Counter
SpilledBytesWritten *metric.Counter
SpilledBytesRead *metric.Counter
}

// MetricStruct implements the metrics.Struct interface.
Expand Down Expand Up @@ -111,6 +114,24 @@ var (
Measurement: "Disk",
Unit: metric.Unit_BYTES,
}
metaQueriesSpilled = metric.Metadata{
Name: "sql.distsql.queries.spilled",
Help: "Number of queries that have spilled to disk",
Measurement: "Queries",
Unit: metric.Unit_COUNT,
}
metaSpilledBytesWritten = metric.Metadata{
Name: "sql.disk.distsql.spilled.bytes.written",
Help: "Number of bytes written to temporary disk storage as a result of spilling",
Measurement: "Disk",
Unit: metric.Unit_BYTES,
}
metaSpilledBytesRead = metric.Metadata{
Name: "sql.disk.distsql.spilled.bytes.read",
Help: "Number of bytes read from temporary disk storage as a result of spilling",
Measurement: "Disk",
Unit: metric.Unit_BYTES,
}
)

// See pkg/sql/mem_metrics.go
Expand All @@ -132,6 +153,9 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics {
VecOpenFDs: metric.NewGauge(metaVecOpenFDs),
CurDiskBytesCount: metric.NewGauge(metaDiskCurBytes),
MaxDiskBytesHist: metric.NewHistogram(metaDiskMaxBytes, histogramWindow, log10int64times1000, 3),
QueriesSpilled: metric.NewCounter(metaQueriesSpilled),
SpilledBytesWritten: metric.NewCounter(metaSpilledBytesWritten),
SpilledBytesRead: metric.NewCounter(metaSpilledBytesRead),
}
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1710,6 +1710,18 @@ var charts = []sectionDescription{
Title: "Disk Usage per Statement",
Metrics: []string{"sql.disk.distsql.max"},
},
{
Title: "Number of Queries Spilled To Disk",
Metrics: []string{"sql.distsql.queries.spilled"},
},
{
Title: "Number of Bytes Written Due to Disk Spilling",
Metrics: []string{"sql.disk.distsql.spilled.bytes.written"},
},
{
Title: "Number of Bytes Read Due to Disk Spilling",
Metrics: []string{"sql.disk.distsql.spilled.bytes.read"},
},
},
},
{
Expand Down

0 comments on commit 82f3ddb

Please sign in to comment.