Skip to content

Commit

Permalink
colexec, colflow: add network latency stat to streams
Browse files Browse the repository at this point in the history
This commit adds information about the network latency between an outbox and
an inbox to EXPLAIN ANALYZE diagrams.

Release note (sql change): EXPLAIN ANALYZE diagrams now contain "network
latency" information on streams.
  • Loading branch information
cathymw committed Oct 20, 2020
1 parent 7a576e8 commit aed4a8e
Show file tree
Hide file tree
Showing 17 changed files with 617 additions and 259 deletions.
23 changes: 12 additions & 11 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,18 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {

// Set up the DistSQL server.
distSQLCfg := execinfra.ServerConfig{
AmbientContext: cfg.AmbientCtx,
Settings: cfg.Settings,
RuntimeStats: cfg.runtime,
ClusterID: &cfg.rpcContext.ClusterID,
ClusterName: cfg.ClusterName,
NodeID: cfg.nodeIDContainer,
Codec: codec,
DB: cfg.db,
Executor: cfg.circularInternalExecutor,
RPCContext: cfg.rpcContext,
Stopper: cfg.stopper,
AmbientContext: cfg.AmbientCtx,
Settings: cfg.Settings,
RuntimeStats: cfg.runtime,
ClusterID: &cfg.rpcContext.ClusterID,
ClusterName: cfg.ClusterName,
NodeID: cfg.nodeIDContainer,
Codec: codec,
DB: cfg.db,
Executor: cfg.circularInternalExecutor,
RPCContext: cfg.rpcContext,
Stopper: cfg.stopper,
NodesStatusServer: cfg.nodesStatusServer,

TempStorage: tempEngine,
TempStoragePath: cfg.TempStorageConfig.Path,
Expand Down
66 changes: 45 additions & 21 deletions pkg/sql/colexec/execpb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,42 @@ var _ tracing.SpanStats = &VectorizedStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedStats{}

const (
batchesOutputTagSuffix = "output.batches"
tuplesOutputTagSuffix = "output.tuples"
ioTimeTagSuffix = "time.io"
executionTimeTagSuffix = "time.execution"
maxVecMemoryBytesTagSuffix = "mem.vectorized.max"
maxVecDiskBytesTagSuffix = "disk.vectorized.max"
bytesReadTagSuffix = "bytes.read"
rowsReadTagSuffix = "rows.read"
batchesOutputTagSuffix = "output.batches"
tuplesOutputTagSuffix = "output.tuples"
ioTimeTagSuffix = "time.io"
executionTimeTagSuffix = "time.execution"
deserializationTimeTagSuffix = "time.deserialization"
maxVecMemoryBytesTagSuffix = "mem.vectorized.max"
maxVecDiskBytesTagSuffix = "disk.vectorized.max"
bytesReadTagSuffix = "bytes.read"
rowsReadTagSuffix = "rows.read"
networkLatencyTagSuffix = "network.latency"
)

// Stats is part of SpanStats interface.
func (vs *VectorizedStats) Stats() map[string]string {
var timeSuffix string
if vs.IO {
timeSuffix = ioTimeTagSuffix
if vs.OnStream {
timeSuffix = deserializationTimeTagSuffix
} else {
timeSuffix = ioTimeTagSuffix
}
} else {
timeSuffix = executionTimeTagSuffix
}
stats := map[string]string{
batchesOutputTagSuffix: fmt.Sprintf("%d", vs.NumBatches),
tuplesOutputTagSuffix: fmt.Sprintf("%d", vs.NumTuples),
timeSuffix: fmt.Sprintf("%v", vs.Time.Round(time.Microsecond)),
maxVecMemoryBytesTagSuffix: fmt.Sprintf("%d", vs.MaxAllocatedMem),
maxVecDiskBytesTagSuffix: fmt.Sprintf("%d", vs.MaxAllocatedDisk),
}
if vs.OnStream {
stats[networkLatencyTagSuffix] = fmt.Sprintf("%v", time.Duration(vs.NetworkLatency).Round(time.Microsecond))
stats[timeSuffix] = fmt.Sprintf("%v", (vs.Time - time.Duration(vs.NetworkLatency*vs.NumBatches)).Round(time.Microsecond))
} else {
stats[tuplesOutputTagSuffix] = fmt.Sprintf("%d", vs.NumTuples)
}
if vs.BytesRead != 0 {
stats[bytesReadTagSuffix] = humanizeutil.IBytes(vs.BytesRead)
}
Expand All @@ -58,28 +69,41 @@ func (vs *VectorizedStats) Stats() map[string]string {
}

const (
batchesOutputQueryPlanSuffix = "batches output"
tuplesOutputQueryPlanSuffix = "tuples output"
ioTimeQueryPlanSuffix = "IO time"
executionTimeQueryPlanSuffix = "execution time"
maxVecMemoryBytesQueryPlanSuffix = "max vectorized memory allocated"
maxVecDiskBytesQueryPlanSuffix = "max vectorized disk allocated"
bytesReadQueryPlanSuffix = "bytes read"
rowsReadQueryPlanSuffix = "rows read"
batchesOutputQueryPlanSuffix = "batches output"
tuplesOutputQueryPlanSuffix = "tuples output"
ioTimeQueryPlanSuffix = "IO time"
executionTimeQueryPlanSuffix = "execution time"
deserializationTimeQueryPlanSuffix = "deserialization time"
maxVecMemoryBytesQueryPlanSuffix = "max vectorized memory allocated"
maxVecDiskBytesQueryPlanSuffix = "max vectorized disk allocated"
bytesReadQueryPlanSuffix = "bytes read"
rowsReadQueryPlanSuffix = "rows read"
networkLatencyQueryPlanSuffix = "network latency"
)

// StatsForQueryPlan is part of DistSQLSpanStats interface.
func (vs *VectorizedStats) StatsForQueryPlan() []string {
var timeSuffix string
if vs.IO {
timeSuffix = ioTimeQueryPlanSuffix
if vs.OnStream {
timeSuffix = deserializationTimeQueryPlanSuffix
} else {
timeSuffix = ioTimeQueryPlanSuffix
}
} else {
timeSuffix = executionTimeQueryPlanSuffix
}
stats := []string{
fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.NumBatches),
fmt.Sprintf("%s: %d", tuplesOutputQueryPlanSuffix, vs.NumTuples),
fmt.Sprintf("%s: %v", timeSuffix, vs.Time.Round(time.Microsecond)),
}
if vs.OnStream {
stats = append(stats,
fmt.Sprintf("%s: %v", timeSuffix, (vs.Time-time.Duration(vs.NetworkLatency*vs.NumBatches)).Round(time.Microsecond)),
fmt.Sprintf("%s: %v", networkLatencyQueryPlanSuffix, time.Duration(vs.NetworkLatency).Round(time.Microsecond)))
} else {
stats = append(stats,
fmt.Sprintf("%s: %d", tuplesOutputQueryPlanSuffix, vs.NumTuples),
fmt.Sprintf("%s: %v", timeSuffix, vs.Time.Round(time.Microsecond)))
}
if vs.MaxAllocatedMem != 0 {
stats = append(stats,
Expand Down
125 changes: 97 additions & 28 deletions pkg/sql/colexec/execpb/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/sql/colexec/execpb/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ message VectorizedStats {
// rows_read is the number of rows read from the disk. It is set only when
// io is true.
int64 rows_read = 9;
// on_stream indicates whether the stats are shown on a stream. This decides
// if network latency should be shown or not.
bool on_stream = 10;
// network_latency is the latency time in nanoseconds between outbox and inbox.
// It is set only when on_stream is true.
int64 network_latency = 11;
}
21 changes: 17 additions & 4 deletions pkg/sql/colexec/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ import (
"github.com/cockroachdb/errors"
)

// The NetworkReader interface only exists to avoid an import cycle with
// the colrpc file. This interface should only be implemented by the inbox.
type NetworkReader interface {
// GetLatency returns the network latency observed by the reader. The
// VectorizedStatsCollector stores this value in VectorizedStats.NetworkLatency
// when finalizing its stats, where it is interpreted as nanoseconds.
GetLatency() int64
}

// VectorizedStatsCollector collects VectorizedStats on Operators.
//
// If two Operators are connected (i.e. one is an input to another), the
// corresponding VectorizedStatsCollectors are also "connected" by sharing a
// StopWatch.
// TODO(cathymw): refactor this class into a base and specialized stats
// collectors.
type VectorizedStatsCollector struct {
colexecbase.Operator
NonExplainable
Expand All @@ -51,6 +56,8 @@ type VectorizedStatsCollector struct {

memMonitors []*mon.BytesMonitor
diskMonitors []*mon.BytesMonitor

networkReader NetworkReader
}

var _ colexecbase.Operator = &VectorizedStatsCollector{}
Expand All @@ -69,6 +76,7 @@ func NewVectorizedStatsCollector(
memMonitors []*mon.BytesMonitor,
diskMonitors []*mon.BytesMonitor,
inputStatsCollectors []*VectorizedStatsCollector,
networkReader NetworkReader,
) *VectorizedStatsCollector {
if inputWatch == nil {
colexecerror.InternalError(errors.AssertionFailedf("input watch for VectorizedStatsCollector is nil"))
Expand All @@ -89,13 +97,14 @@ func NewVectorizedStatsCollector(
}
return &VectorizedStatsCollector{
Operator: op,
VectorizedStats: execpb.VectorizedStats{ID: id, IO: ioTime},
VectorizedStats: execpb.VectorizedStats{ID: id, IO: ioTime, OnStream: networkReader != nil},
idTagKey: idTagKey,
ioReader: ioReader,
stopwatch: inputWatch,
memMonitors: memMonitors,
diskMonitors: diskMonitors,
childStatsCollectors: inputStatsCollectors,
networkReader: networkReader,
}
}

Expand Down Expand Up @@ -138,6 +147,9 @@ func (vsc *VectorizedStatsCollector) finalizeStats() {
// themselves).
vsc.RowsRead = vsc.ioReader.GetRowsRead()
}
if vsc.networkReader != nil {
vsc.NetworkLatency = vsc.networkReader.GetLatency()
}
}

// OutputStats outputs the vectorized stats collected by vsc into ctx.
Expand All @@ -163,6 +175,7 @@ func (vsc *VectorizedStatsCollector) OutputStats(
vsc.MaxAllocatedDisk = 0
vsc.NumBatches = 0
vsc.BytesRead = 0
vsc.NetworkLatency = 0
}
tracing.SetSpanStats(span, &vsc.VectorizedStats)
span.Finish()
Expand Down
Loading

0 comments on commit aed4a8e

Please sign in to comment.