Skip to content

Commit

Permalink
colexec, colflow: refactor VectorizedStatsCollector, VectorizedStats,…
Browse files Browse the repository at this point in the history
… and add LatencyGetter

This commit refactors the VectorizedStatsCollector type into a base struct
and a specialized stats collector, NetworkVectorizedStatsCollector, for
collecting network latency on streams. Since the networkReader is only
used for the case where stats are collected for streams, this refactor
avoids the need to pass in nils for other instance where stats collectors
are used.

This commit also adds a separate implementation of the SpanStats interface
called VectorizedInboxStats. This change allows for us to choose which stats
to display on EXPLAIN ANALYZE diagrams for streams.

This commit also adds the LatencyGetter helper to store node latencies
in a map. Rather than updating the latency map multiple times per query,
the latency map is updated if it hasn't been updated in over one second.

Release note: None
  • Loading branch information
cathymw committed Oct 27, 2020
1 parent cedf007 commit 264d418
Show file tree
Hide file tree
Showing 13 changed files with 642 additions and 348 deletions.
14 changes: 8 additions & 6 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ package server
import (
"context"
"fmt"
"math"
"net"
"os"
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
Expand Down Expand Up @@ -69,6 +64,10 @@ import (
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
"google.golang.org/grpc"
"math"
"net"
"os"
"path/filepath"
)

type sqlServer struct {
Expand Down Expand Up @@ -353,7 +352,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
Executor: cfg.circularInternalExecutor,
RPCContext: cfg.rpcContext,
Stopper: cfg.stopper,
NodesStatusServer: cfg.nodesStatusServer,

LatencyGetter: &serverpb.LatencyGetter{
NodesStatusServer: &cfg.nodesStatusServer,
},

TempStorage: tempEngine,
TempStoragePath: cfg.TempStorageConfig.Path,
Expand Down
50 changes: 49 additions & 1 deletion pkg/server/serverpb/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package serverpb

import (
context "context"

"fmt"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"time"
)

// SQLStatusServer is a smaller version of the serverpb.StatusInterface which
Expand Down Expand Up @@ -61,3 +63,49 @@ func (s *OptionalNodesStatusServer) OptionalNodesStatusServer(
}
return v.(NodesStatusServer), nil
}

// LatencyGetter stores the map of latencies obtained from the NodesStatusServer.
// These latencies are displayed on the streams of EXPLAIN ANALYZE diagrams.
// This struct is put here to avoid import cycles.
type LatencyGetter struct {
latencyMap map[roachpb.NodeID]map[roachpb.NodeID]int64
lastUpdatedTime time.Time
NodesStatusServer *OptionalNodesStatusServer
}

// GetLatency is a helper function that updates the latencies between nodes
// if they have not been updated in more than one second. This function
// returns the latency between the node with the Outbox and the node with
// the Inbox.
func (lg *LatencyGetter) GetLatency(
outboxNodeID roachpb.NodeID,
inboxNodeID roachpb.NodeID,
ctx context.Context,
) int64 {
if time.Since(lg.lastUpdatedTime) > time.Second {
// update latencies in latencyMap
ss, err := lg.NodesStatusServer.OptionalNodesStatusServer(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
if err == nil {
if lg.latencyMap == nil {
lg.latencyMap = make(map[roachpb.NodeID]map[roachpb.NodeID]int64)
}
response, _ := ss.Nodes(ctx, &NodesRequest{})
for _, sendingNode := range response.Nodes {
sendingNodeID := sendingNode.Desc.NodeID
if lg.latencyMap[sendingNodeID] == nil {
lg.latencyMap[sendingNodeID] = make(map[roachpb.NodeID]int64)
}
for _, receivingNode := range response.Nodes {
receivingNodeID := receivingNode.Desc.NodeID
if sendingNodeID != receivingNodeID {
lg.latencyMap[sendingNodeID][receivingNodeID] = sendingNode.Activity[receivingNodeID].Latency
}
}
}
}
fmt.Println(lg.latencyMap)
lg.lastUpdatedTime = time.Now()
}
return lg.latencyMap[outboxNodeID][inboxNodeID]
}

74 changes: 47 additions & 27 deletions pkg/sql/colexec/execpb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (

var _ tracing.SpanStats = &VectorizedStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedStats{}
var _ tracing.SpanStats = &VectorizedInboxStats{}
var _ execinfrapb.DistSQLSpanStats = &VectorizedInboxStats{}

const (
batchesOutputTagSuffix = "output.batches"
tuplesOutputTagSuffix = "output.tuples"
ioTimeTagSuffix = "time.io"
executionTimeTagSuffix = "time.execution"
deserializationTimeTagSuffix = "time.deserialization"
maxVecMemoryBytesTagSuffix = "mem.vectorized.max"
maxVecDiskBytesTagSuffix = "disk.vectorized.max"
bytesReadTagSuffix = "bytes.read"
Expand All @@ -39,26 +40,17 @@ const (
func (vs *VectorizedStats) Stats() map[string]string {
var timeSuffix string
if vs.IO {
if vs.OnStream {
timeSuffix = deserializationTimeTagSuffix
} else {
timeSuffix = ioTimeTagSuffix
}
timeSuffix = ioTimeTagSuffix
} else {
timeSuffix = executionTimeTagSuffix
}
stats := map[string]string{
batchesOutputTagSuffix: fmt.Sprintf("%d", vs.NumBatches),
tuplesOutputTagSuffix: fmt.Sprintf("%d", vs.NumTuples),
timeSuffix: fmt.Sprintf("%v", vs.Time.Round(time.Microsecond)),
maxVecMemoryBytesTagSuffix: fmt.Sprintf("%d", vs.MaxAllocatedMem),
maxVecDiskBytesTagSuffix: fmt.Sprintf("%d", vs.MaxAllocatedDisk),
}
if vs.OnStream {
stats[networkLatencyTagSuffix] = fmt.Sprintf("%v", time.Duration(vs.NetworkLatency).Round(time.Microsecond))
stats[timeSuffix] = fmt.Sprintf("%v", (vs.Time - time.Duration(vs.NetworkLatency*vs.NumBatches)).Round(time.Microsecond))
} else {
stats[tuplesOutputTagSuffix] = fmt.Sprintf("%d", vs.NumTuples)
}
if vs.BytesRead != 0 {
stats[bytesReadTagSuffix] = humanizeutil.IBytes(vs.BytesRead)
}
Expand All @@ -68,12 +60,31 @@ func (vs *VectorizedStats) Stats() map[string]string {
return stats
}

// Stats is part of SpanStats interface.
func (vs *VectorizedInboxStats) Stats() map[string]string {
stats := map[string]string{
batchesOutputTagSuffix: fmt.Sprintf("%d", vs.BaseVectorizedStats.NumBatches),
// TODO(cathymw): have inbox collect its own deserialization time with a
// timer and display deserialization time instead of ioTime.
ioTimeTagSuffix: fmt.Sprintf("%v", vs.BaseVectorizedStats.Time.Round(time.Microsecond)),
}
if vs.BaseVectorizedStats.BytesRead != 0 {
stats[bytesReadTagSuffix] = humanizeutil.IBytes(vs.BaseVectorizedStats.BytesRead)
}
if vs.BaseVectorizedStats.RowsRead != 0 {
stats[rowsReadTagSuffix] = fmt.Sprintf("%d", vs.BaseVectorizedStats.RowsRead)
}
if vs.NetworkLatency != 0 {
stats[networkLatencyTagSuffix] = fmt.Sprintf("%v", time.Duration(vs.NetworkLatency).Round(time.Microsecond))
}
return stats
}

const (
batchesOutputQueryPlanSuffix = "batches output"
tuplesOutputQueryPlanSuffix = "tuples output"
ioTimeQueryPlanSuffix = "IO time"
executionTimeQueryPlanSuffix = "execution time"
deserializationTimeQueryPlanSuffix = "deserialization time"
maxVecMemoryBytesQueryPlanSuffix = "max vectorized memory allocated"
maxVecDiskBytesQueryPlanSuffix = "max vectorized disk allocated"
bytesReadQueryPlanSuffix = "bytes read"
Expand All @@ -85,25 +96,14 @@ const (
func (vs *VectorizedStats) StatsForQueryPlan() []string {
var timeSuffix string
if vs.IO {
if vs.OnStream {
timeSuffix = deserializationTimeQueryPlanSuffix
} else {
timeSuffix = ioTimeQueryPlanSuffix
}
timeSuffix = ioTimeQueryPlanSuffix
} else {
timeSuffix = executionTimeQueryPlanSuffix
}
stats := []string{
fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.NumBatches),
}
if vs.OnStream {
stats = append(stats,
fmt.Sprintf("%s: %v", timeSuffix, (vs.Time-time.Duration(vs.NetworkLatency*vs.NumBatches)).Round(time.Microsecond)),
fmt.Sprintf("%s: %v", networkLatencyQueryPlanSuffix, time.Duration(vs.NetworkLatency).Round(time.Microsecond)))
} else {
stats = append(stats,
fmt.Sprintf("%s: %d", tuplesOutputQueryPlanSuffix, vs.NumTuples),
fmt.Sprintf("%s: %v", timeSuffix, vs.Time.Round(time.Microsecond)))
fmt.Sprintf("%s: %d", tuplesOutputQueryPlanSuffix, vs.NumTuples),
fmt.Sprintf("%s: %v", timeSuffix, vs.Time.Round(time.Microsecond)),
}
if vs.MaxAllocatedMem != 0 {
stats = append(stats,
Expand All @@ -122,3 +122,23 @@ func (vs *VectorizedStats) StatsForQueryPlan() []string {
}
return stats
}

// StatsForQueryPlan is part of DistSQLSpanStats interface.
func (vs *VectorizedInboxStats) StatsForQueryPlan() []string {
stats := []string{
fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.BaseVectorizedStats.NumBatches),
fmt.Sprintf("%s: %v", ioTimeQueryPlanSuffix, vs.BaseVectorizedStats.Time.Round(time.Microsecond)),
}
if vs.BaseVectorizedStats.BytesRead != 0 {
stats = append(stats,
fmt.Sprintf("%s: %s", bytesReadQueryPlanSuffix, humanizeutil.IBytes(vs.BaseVectorizedStats.BytesRead)))
}
if vs.BaseVectorizedStats.RowsRead != 0 {
stats = append(stats, fmt.Sprintf("%s: %d", rowsReadQueryPlanSuffix, vs.BaseVectorizedStats.RowsRead))
}
if vs.NetworkLatency != 0 {
stats = append(stats,
fmt.Sprintf("%s: %v", networkLatencyQueryPlanSuffix, time.Duration(vs.NetworkLatency).Round(time.Microsecond)))
}
return stats
}
Loading

0 comments on commit 264d418

Please sign in to comment.