colexec, colflow: add network latency stat to streams and refactor

This commit adds network latency information, measured from an outbox to the
corresponding inbox, to the streams shown on EXPLAIN ANALYZE diagrams.

This commit also adds the LatencyGetter helper, which caches inter-node
latencies in a map. Rather than refreshing the map multiple times per query,
the map is only refreshed when it hasn't been updated for more than five
seconds (the updateThreshold below).
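
For reference, here is a minimal sketch of the intended call pattern. Only
GetLatency's signature comes from the status.go hunk below; the wrapper
function and its names are hypothetical.

package sketch

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// networkLatency wraps GetLatency for an illustrative caller. The stats code
// in this commit treats the returned int64 as nanoseconds (it is fed through
// time.Duration(vs.NetworkLatency)), and a zero result means the latency is
// unknown and is omitted from the diagram.
func networkLatency(
	ctx context.Context, lg *serverpb.LatencyGetter, origin, target roachpb.NodeID,
) time.Duration {
	return time.Duration(lg.GetLatency(ctx, origin, target))
}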

This commit refactors the VectorizedStatsCollector type into a base struct
and a specialized stats collector, NetworkVectorizedStatsCollector, that
collects network latency on streams. Since the networkReader is only needed
when stats are collected for streams, this refactor avoids having to pass nil
for it in all the other places where stats collectors are used.
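
The colexec and colflow changes themselves are not part of the excerpt below,
so the following is only a rough sketch of the shape such a split could take;
apart from the two collector names mentioned above, every identifier here is
hypothetical.

package sketch

import "time"

// vectorizedStatsCollectorBase stands in for the bookkeeping shared by every
// stats collector (batch/tuple counts, time spent, and so on).
type vectorizedStatsCollectorBase struct {
	numBatches int64
	numTuples  int64
	elapsed    time.Duration
}

// networkReader stands in for the inbox side of a stream; the real interface
// used by the commit is not shown in this excerpt.
type networkReader interface {
	GetBytesRead() int64
}

// networkVectorizedStatsCollector specializes the base collector for streams.
// Because only this variant carries a networkReader and a latency value, the
// non-stream collectors no longer need a nil networkReader passed to them.
type networkVectorizedStatsCollector struct {
	vectorizedStatsCollectorBase
	reader       networkReader
	latencyNanos int64
}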

This commit also adds a separate implementation of the SpanStats interface,
VectorizedInboxStats. This lets us choose which stats to display on EXPLAIN
ANALYZE diagrams for streams.

Release note (sql change): EXPLAIN ANALYZE diagrams now contain "network
latency" information on streams.
cathymw committed Nov 3, 2020
1 parent 460c497 commit 4ea339a
Showing 19 changed files with 734 additions and 267 deletions.
4 changes: 4 additions & 0 deletions pkg/server/server_sql.go
@@ -355,6 +355,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
RPCContext: cfg.rpcContext,
Stopper: cfg.stopper,

LatencyGetter: &serverpb.LatencyGetter{
	NodesStatusServer: &cfg.nodesStatusServer,
},

TempStorage: tempEngine,
TempStoragePath: cfg.TempStorageConfig.Path,
TempFS: tempFS,
1 change: 1 addition & 0 deletions pkg/server/serverpb/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
"//pkg/util/errorutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"//vendor/github.com/gogo/protobuf/proto",
"//vendor/github.com/gogo/protobuf/sortkeys",
49 changes: 49 additions & 0 deletions pkg/server/serverpb/status.go
@@ -12,8 +12,11 @@ package serverpb

import (
	context "context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/errorutil"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// SQLStatusServer is a smaller version of the serverpb.StatusInterface which
@@ -61,3 +64,49 @@ func (s *OptionalNodesStatusServer) OptionalNodesStatusServer(
	}
	return v.(NodesStatusServer), nil
}

// LatencyGetter stores the map of latencies obtained from the NodesStatusServer.
// These latencies are displayed on the streams of EXPLAIN ANALYZE diagrams.
// This struct is put here to avoid import cycles.
type LatencyGetter struct {
	latencyMap        map[roachpb.NodeID]map[roachpb.NodeID]int64
	lastUpdatedTime   time.Time
	NodesStatusServer *OptionalNodesStatusServer
}

const updateThreshold = 5 * time.Second

// GetLatency is a helper function that updates the latencies between nodes
// if the time since the last update exceeds the updateThreshold. This function
// returns the latency between the origin node and the target node.
func (lg *LatencyGetter) GetLatency(
	ctx context.Context, originNodeID roachpb.NodeID, targetNodeID roachpb.NodeID,
) int64 {
	if timeutil.Since(lg.lastUpdatedTime) < updateThreshold {
		return lg.latencyMap[originNodeID][targetNodeID]
	}
	// Update latencies in latencyMap.
	ss, err := lg.NodesStatusServer.OptionalNodesStatusServer(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
	if err != nil {
		// When latency is 0, it is not shown on EXPLAIN ANALYZE diagrams.
		return 0
	}
	if lg.latencyMap == nil {
		lg.latencyMap = make(map[roachpb.NodeID]map[roachpb.NodeID]int64)
	}
	response, err := ss.Nodes(ctx, &NodesRequest{})
	if err != nil {
		// The node statuses could not be retrieved; report an unknown latency.
		return 0
	}
	for _, sendingNode := range response.Nodes {
		sendingNodeID := sendingNode.Desc.NodeID
		if lg.latencyMap[sendingNodeID] == nil {
			lg.latencyMap[sendingNodeID] = make(map[roachpb.NodeID]int64)
		}
		for _, receivingNode := range response.Nodes {
			receivingNodeID := receivingNode.Desc.NodeID
			if sendingNodeID != receivingNodeID {
				lg.latencyMap[sendingNodeID][receivingNodeID] = sendingNode.Activity[receivingNodeID].Latency
			}
		}
	}
	lg.lastUpdatedTime = timeutil.Now()
	return lg.latencyMap[originNodeID][targetNodeID]
}
44 changes: 44 additions & 0 deletions pkg/sql/colexec/execpb/stats.go
@@ -21,6 +21,8 @@ import (

var _ tracing.SpanStats = &VectorizedStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedStats{}
var _ tracing.SpanStats = &VectorizedInboxStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedInboxStats{}

const (
batchesOutputTagSuffix = "output.batches"
@@ -31,6 +33,7 @@ const (
maxVecDiskBytesTagSuffix = "disk.vectorized.max"
bytesReadTagSuffix = "bytes.read"
rowsReadTagSuffix = "rows.read"
networkLatencyTagSuffix = "network.latency"
)

// Stats is part of SpanStats interface.
@@ -57,6 +60,26 @@ func (vs *VectorizedStats) Stats() map[string]string {
	return stats
}

// Stats is part of SpanStats interface.
func (vs *VectorizedInboxStats) Stats() map[string]string {
	stats := map[string]string{
		batchesOutputTagSuffix: fmt.Sprintf("%d", vs.BaseVectorizedStats.NumBatches),
		// TODO(cathymw): Have inbox collect its own deserialization time with a
		// timer and display deserialization time instead of ioTime.
		ioTimeTagSuffix: fmt.Sprintf("%v", vs.BaseVectorizedStats.Time.Round(time.Microsecond)),
	}
	if vs.BaseVectorizedStats.BytesRead != 0 {
		stats[bytesReadTagSuffix] = humanizeutil.IBytes(vs.BaseVectorizedStats.BytesRead)
	}
	if vs.BaseVectorizedStats.RowsRead != 0 {
		stats[rowsReadTagSuffix] = fmt.Sprintf("%d", vs.BaseVectorizedStats.RowsRead)
	}
	if vs.NetworkLatency != 0 {
		stats[networkLatencyTagSuffix] = fmt.Sprintf("%v", time.Duration(vs.NetworkLatency).Round(time.Microsecond))
	}
	return stats
}

const (
batchesOutputQueryPlanSuffix = "batches output"
tuplesOutputQueryPlanSuffix = "tuples output"
@@ -66,6 +89,7 @@ const (
maxVecDiskBytesQueryPlanSuffix = "max vectorized disk allocated"
bytesReadQueryPlanSuffix = "bytes read"
rowsReadQueryPlanSuffix = "rows read"
networkLatencyQueryPlanSuffix = "network latency"
)

// StatsForQueryPlan is part of DistSQLSpanStats interface.
@@ -98,3 +122,23 @@ func (vs *VectorizedStats) StatsForQueryPlan() []string {
	}
	return stats
}

// StatsForQueryPlan is part of DistSQLSpanStats interface.
func (vs *VectorizedInboxStats) StatsForQueryPlan() []string {
	stats := []string{
		fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.BaseVectorizedStats.NumBatches),
		fmt.Sprintf("%s: %v", ioTimeQueryPlanSuffix, vs.BaseVectorizedStats.Time.Round(time.Microsecond)),
	}
	if vs.BaseVectorizedStats.BytesRead != 0 {
		stats = append(stats,
			fmt.Sprintf("%s: %s", bytesReadQueryPlanSuffix, humanizeutil.IBytes(vs.BaseVectorizedStats.BytesRead)))
	}
	if vs.BaseVectorizedStats.RowsRead != 0 {
		stats = append(stats, fmt.Sprintf("%s: %d", rowsReadQueryPlanSuffix, vs.BaseVectorizedStats.RowsRead))
	}
	if vs.NetworkLatency != 0 {
		stats = append(stats,
			fmt.Sprintf("%s: %v", networkLatencyQueryPlanSuffix, time.Duration(vs.NetworkLatency).Round(time.Microsecond)))
	}
	return stats
}