colexec, colflow: add network latency stat to streams #55705

Merged · 1 commit · Nov 3, 2020
4 changes: 4 additions & 0 deletions pkg/server/server_sql.go
@@ -355,6 +355,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
RPCContext: cfg.rpcContext,
Stopper: cfg.stopper,

LatencyGetter: &serverpb.LatencyGetter{
NodesStatusServer: &cfg.nodesStatusServer,
},

TempStorage: tempEngine,
TempStoragePath: cfg.TempStorageConfig.Path,
TempFS: tempFS,
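The hunk above only wires the new getter into the configuration assembled by newSQLServer. As a rough, self-contained sketch of that wiring pattern (using a hypothetical flowServerCfg stand-in rather than the real config struct):

package example

import "github.com/cockroachdb/cockroach/pkg/server/serverpb"

// flowServerCfg is a hypothetical stand-in for the real config struct that
// gains a LatencyGetter field in this PR.
type flowServerCfg struct {
	LatencyGetter *serverpb.LatencyGetter
}

// newFlowServerCfg mirrors the wiring above: the getter only keeps a handle
// to the (possibly unavailable) nodes status server; latencies are fetched
// lazily and cached on first use.
func newFlowServerCfg(nss *serverpb.OptionalNodesStatusServer) flowServerCfg {
	return flowServerCfg{
		LatencyGetter: &serverpb.LatencyGetter{NodesStatusServer: nss},
	}
}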
1 change: 1 addition & 0 deletions pkg/server/serverpb/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
"//pkg/util/errorutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"//vendor/github.com/gogo/protobuf/proto",
"//vendor/github.com/gogo/protobuf/sortkeys",
49 changes: 49 additions & 0 deletions pkg/server/serverpb/status.go
@@ -12,8 +12,11 @@ package serverpb

import (
context "context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// SQLStatusServer is a smaller version of the serverpb.StatusInterface which
@@ -61,3 +64,49 @@ func (s *OptionalNodesStatusServer) OptionalNodesStatusServer(
}
return v.(NodesStatusServer), nil
}

// LatencyGetter stores the map of latencies obtained from the NodesStatusServer.
// These latencies are displayed on the streams of EXPLAIN ANALYZE diagrams.
// This struct is put here to avoid import cycles.
type LatencyGetter struct {
latencyMap map[roachpb.NodeID]map[roachpb.NodeID]int64
lastUpdatedTime time.Time
NodesStatusServer *OptionalNodesStatusServer
}

const updateThreshold = 5 * time.Second

// GetLatency is a helper function that updates the latencies between nodes
// if the time since the last update exceeds the updateThreshold. This function
// returns the latency between the origin node and the target node.
func (lg *LatencyGetter) GetLatency(
ctx context.Context, originNodeID roachpb.NodeID, targetNodeID roachpb.NodeID,
) int64 {
if timeutil.Since(lg.lastUpdatedTime) < updateThreshold {
return lg.latencyMap[originNodeID][targetNodeID]
}
// Update latencies in latencyMap.
ss, err := lg.NodesStatusServer.OptionalNodesStatusServer(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
if err != nil {
// When latency is 0, it is not shown on EXPLAIN ANALYZE diagrams.
return 0
}
if lg.latencyMap == nil {
lg.latencyMap = make(map[roachpb.NodeID]map[roachpb.NodeID]int64)
}
response, _ := ss.Nodes(ctx, &NodesRequest{})
for _, sendingNode := range response.Nodes {
sendingNodeID := sendingNode.Desc.NodeID
if lg.latencyMap[sendingNodeID] == nil {
lg.latencyMap[sendingNodeID] = make(map[roachpb.NodeID]int64)
}
for _, receivingNode := range response.Nodes {
receivingNodeID := receivingNode.Desc.NodeID
if sendingNodeID != receivingNodeID {
lg.latencyMap[sendingNodeID][receivingNodeID] = sendingNode.Activity[receivingNodeID].Latency
}
}
}
lg.lastUpdatedTime = timeutil.Now()
return lg.latencyMap[originNodeID][targetNodeID]
}
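GetLatency keeps a process-wide cache of the full node-to-node latency matrix and refreshes it at most once every updateThreshold (5s). A minimal sketch of a hypothetical caller (the PR itself threads the getter through the vectorized flow setup rather than through a standalone helper like this one):

package example

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// streamLatency returns the last-known latency between two nodes as a
// time.Duration (the getter stores nanoseconds as int64). A zero result
// means the latency is unknown, e.g. when the nodes status server is
// unavailable, and EXPLAIN ANALYZE simply omits the stat in that case.
func streamLatency(
	ctx context.Context, lg *serverpb.LatencyGetter, origin, target roachpb.NodeID,
) time.Duration {
	// Cheap to call per stream: the underlying map is only rebuilt when the
	// previous refresh is older than updateThreshold.
	return time.Duration(lg.GetLatency(ctx, origin, target))
}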
44 changes: 44 additions & 0 deletions pkg/sql/colexec/execpb/stats.go
@@ -21,6 +21,8 @@ import (

var _ tracing.SpanStats = &VectorizedStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedStats{}
var _ tracing.SpanStats = &VectorizedInboxStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedInboxStats{}

const (
batchesOutputTagSuffix = "output.batches"
@@ -31,6 +33,7 @@ const (
maxVecDiskBytesTagSuffix = "disk.vectorized.max"
bytesReadTagSuffix = "bytes.read"
rowsReadTagSuffix = "rows.read"
networkLatencyTagSuffix = "network.latency"
)

// Stats is part of SpanStats interface.
@@ -57,6 +60,26 @@ func (vs *VectorizedStats) Stats() map[string]string {
return stats
}

// Stats is part of SpanStats interface.
func (vs *VectorizedInboxStats) Stats() map[string]string {
stats := map[string]string{
batchesOutputTagSuffix: fmt.Sprintf("%d", vs.BaseVectorizedStats.NumBatches),
// TODO(cathymw): Have inbox collect its own deserialization time with a
// timer and display deserialization time instead of ioTime.
ioTimeTagSuffix: fmt.Sprintf("%v", vs.BaseVectorizedStats.Time.Round(time.Microsecond)),
}
if vs.BaseVectorizedStats.BytesRead != 0 {
stats[bytesReadTagSuffix] = humanizeutil.IBytes(vs.BaseVectorizedStats.BytesRead)
}
if vs.BaseVectorizedStats.RowsRead != 0 {
stats[rowsReadTagSuffix] = fmt.Sprintf("%d", vs.BaseVectorizedStats.RowsRead)
}
if vs.NetworkLatency != 0 {
stats[networkLatencyTagSuffix] = fmt.Sprintf("%v", time.Duration(vs.NetworkLatency).Round(time.Microsecond))
}
return stats
}
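// (Illustrative aside, not part of this file.) For a hypothetical inbox that
// received 1024 batches, spent ~2ms in IO, read 4 MiB and 100000 rows, and
// observed ~1.5ms of network latency, the map above would hold "1024", "2ms",
// "4.0 MiB", "100000", and "1.5ms" under the corresponding tag suffixes; the
// optional stats (bytes read, rows read, network latency) are skipped when
// zero so they never clutter the trace.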

const (
batchesOutputQueryPlanSuffix = "batches output"
tuplesOutputQueryPlanSuffix = "tuples output"
@@ -66,6 +89,7 @@ const (
maxVecDiskBytesQueryPlanSuffix = "max vectorized disk allocated"
bytesReadQueryPlanSuffix = "bytes read"
rowsReadQueryPlanSuffix = "rows read"
networkLatencyQueryPlanSuffix = "network latency"
)

// StatsForQueryPlan is part of DistSQLSpanStats interface.
@@ -98,3 +122,23 @@ func (vs *VectorizedStats) StatsForQueryPlan() []string {
}
return stats
}

// StatsForQueryPlan is part of DistSQLSpanStats interface.
func (vs *VectorizedInboxStats) StatsForQueryPlan() []string {
stats := []string{
fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.BaseVectorizedStats.NumBatches),
fmt.Sprintf("%s: %v", ioTimeQueryPlanSuffix, vs.BaseVectorizedStats.Time.Round(time.Microsecond)),
}
if vs.BaseVectorizedStats.BytesRead != 0 {
stats = append(stats,
fmt.Sprintf("%s: %s", bytesReadQueryPlanSuffix, humanizeutil.IBytes(vs.BaseVectorizedStats.BytesRead)))
}
if vs.BaseVectorizedStats.RowsRead != 0 {
stats = append(stats, fmt.Sprintf("%s: %d", rowsReadQueryPlanSuffix, vs.BaseVectorizedStats.RowsRead))
}
if vs.NetworkLatency != 0 {
stats = append(stats,
fmt.Sprintf("%s: %v", networkLatencyQueryPlanSuffix, time.Duration(vs.NetworkLatency).Round(time.Microsecond)))
}
return stats
}
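To make the zero-value filtering concrete, here is an illustrative, purely hypothetical result of the method above for an inbox that produced 1024 batches, read 4 MiB and 100000 rows, and observed roughly 1.5ms of network latency (the IO time entry is elided only because its label constant is defined outside this diff hunk):

// Hypothetical output of StatsForQueryPlan for the inbox described above.
var exampleInboxQueryPlanStats = []string{
	"batches output: 1024",
	"bytes read: 4.0 MiB",
	"rows read: 100000",
	"network latency: 1.5ms",
}

When any of the optional values is zero, its line is simply dropped, which keeps the stream labels on EXPLAIN ANALYZE diagrams compact.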