Skip to content

Commit

Permalink
Merge #85524
Browse files Browse the repository at this point in the history
85524: sql: add join types and algorithms to telemetry logging r=rytaft a=rytaft

This commit adds several new fields to the `SampledQuery` structure used for
telemetry logging.

Closes #85425

Release note (sql change): The structured payloads used for telemetry
logs now include the following new fields:
`InnerJoinCount`: The number of inner joins in the query plan.
`LeftOuterJoinCount`: The number of left (or right) outer joins in the query
plan.
`FullOuterJoinCount`: The number of full outer joins in the query plan.
`SemiJoinCount`: The number of semi joins in the query plan.
`AntiJoinCount`: The number of anti joins in the query plan.
`IntersectAllJoinCount`: The number of intersect all joins in the query plan.
`ExceptAllJoinCount`: The number of except all joins in the query plan.
`HashJoinCount`: The number of hash joins in the query plan.
`CrossJoinCount`: The number of cross joins in the query plan.
`IndexJoinCount`: The number of index joins in the query plan.
`LookupJoinCount`: The number of lookup joins in the query plan.
`MergeJoinCount`: The number of merge joins in the query plan.
`InvertedJoinCount`: The number of inverted joins in the query plan.
`ApplyJoinCount`: The number of apply joins in the query plan.
`ZigZagJoinCount`: The number of zigzag joins in the query plan.

Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
craig[bot] and rytaft committed Aug 7, 2022
2 parents da518a9 + e96602b commit a7c91f0
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 5 deletions.
15 changes: 15 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2515,6 +2515,21 @@ contains common SQL event/execution details.
| `BytesRead` | The number of bytes read from disk. | no |
| `RowsRead` | The number of rows read from disk. | no |
| `RowsWritten` | The number of rows written. | no |
| `InnerJoinCount` | The number of inner joins in the query plan. | no |
| `LeftOuterJoinCount` | The number of left (or right) outer joins in the query plan. | no |
| `FullOuterJoinCount` | The number of full outer joins in the query plan. | no |
| `SemiJoinCount` | The number of semi joins in the query plan. | no |
| `AntiJoinCount` | The number of anti joins in the query plan. | no |
| `IntersectAllJoinCount` | The number of intersect all joins in the query plan. | no |
| `ExceptAllJoinCount` | The number of except all joins in the query plan. | no |
| `HashJoinCount` | The number of hash joins in the query plan. | no |
| `CrossJoinCount` | The number of cross joins in the query plan. | no |
| `IndexJoinCount` | The number of index joins in the query plan. | no |
| `LookupJoinCount` | The number of lookup joins in the query plan. | no |
| `MergeJoinCount` | The number of merge joins in the query plan. | no |
| `InvertedJoinCount` | The number of inverted joins in the query plan. | no |
| `ApplyJoinCount` | The number of apply joins in the query plan. | no |
| `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |


#### Common fields
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -410,6 +411,21 @@ func (p *planner) maybeLogStatementInternal(
BytesRead: queryStats.bytesRead,
RowsRead: queryStats.rowsRead,
RowsWritten: queryStats.rowsWritten,
InnerJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.InnerJoin]),
LeftOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftOuterJoin]),
FullOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.FullOuterJoin]),
SemiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftSemiJoin]),
AntiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftAntiJoin]),
IntersectAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.IntersectAllJoin]),
ExceptAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.ExceptAllJoin]),
HashJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.HashJoin]),
CrossJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.CrossJoin]),
IndexJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.IndexJoin]),
LookupJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.LookupJoin]),
MergeJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.MergeJoin]),
InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]),
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
}
p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery})
} else {
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
Expand Down Expand Up @@ -150,6 +151,14 @@ type instrumentationHelper struct {
// nanosSinceStatsCollected is the maximum number of nanoseconds that have
// passed since stats were collected on any table scanned by this query.
nanosSinceStatsCollected time.Duration

// joinTypeCounts records the number of times each type of logical join was
// used in the query.
joinTypeCounts map[descpb.JoinType]int

// joinAlgorithmCounts records the number of times each type of join algorithm
// was used in the query.
joinAlgorithmCounts map[exec.JoinAlgorithm]int
}

// outputMode indicates how the statement output needs to be populated (for
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/opt/exec/execbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package execbuilder
import (
"time"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
Expand Down Expand Up @@ -143,6 +144,14 @@ type Builder struct {
// passed since stats were collected on any table scanned by this query.
NanosSinceStatsCollected time.Duration

// JoinTypeCounts records the number of times each type of logical join was
// used in the query.
JoinTypeCounts map[descpb.JoinType]int

// JoinAlgorithmCounts records the number of times each type of join algorithm
// was used in the query.
JoinAlgorithmCounts map[exec.JoinAlgorithm]int

// wrapFunctionOverride overrides default implementation to return resolvable
// function reference for function with specified function name.
// The default can be overridden by calling SetBuiltinFuncWrapper method to provide
Expand Down
70 changes: 65 additions & 5 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,8 @@ func (b *Builder) buildApplyJoin(join memo.RelExpr) (execPlan, error) {

ep := execPlan{outputCols: outputCols}

b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.ApplyJoin)
ep.root, err = b.factory.ConstructApplyJoin(
joinType,
leftPlan.root,
Expand Down Expand Up @@ -1129,11 +1131,12 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (execPlan, error) {
rightExpr.Relational().OutputCols,
*filters,
)
isCrossJoin := len(leftEq) == 0
if !b.disableTelemetry {
if len(leftEq) > 0 {
telemetry.Inc(sqltelemetry.JoinAlgoHashUseCounter)
} else {
if isCrossJoin {
telemetry.Inc(sqltelemetry.JoinAlgoCrossUseCounter)
} else {
telemetry.Inc(sqltelemetry.JoinAlgoHashUseCounter)
}
telemetry.Inc(opt.JoinTypeToUseCounter(join.Op()))
}
Expand Down Expand Up @@ -1161,6 +1164,12 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (execPlan, error) {
leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ToSet())
rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ToSet())

b.recordJoinType(joinType)
if isCrossJoin {
b.recordJoinAlgorithm(exec.CrossJoin)
} else {
b.recordJoinAlgorithm(exec.HashJoin)
}
ep.root, err = b.factory.ConstructHashJoin(
joinType,
left.root, right.root,
Expand Down Expand Up @@ -1215,6 +1224,8 @@ func (b *Builder) buildMergeJoin(join *memo.MergeJoinExpr) (execPlan, error) {
reqOrd := ep.reqOrdering(join)
leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ColSet())
rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ColSet())
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.MergeJoin)
ep.root, err = b.factory.ConstructMergeJoin(
joinType,
left.root, right.root,
Expand Down Expand Up @@ -1543,6 +1554,13 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
panic(errors.AssertionFailedf("invalid operator %s", redact.Safe(set.Op())))
}

switch typ {
case tree.IntersectOp:
b.recordJoinType(descpb.IntersectAllJoin)
case tree.ExceptOp:
b.recordJoinType(descpb.ExceptAllJoin)
}

hardLimit := uint64(0)
if set.Op() == opt.LocalityOptimizedSearchOp {
if !b.disableTelemetry {
Expand All @@ -1569,11 +1587,17 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
if typ == tree.UnionOp && all {
ep.root, err = b.factory.ConstructUnionAll(left.root, right.root, reqOrdering, hardLimit)
} else if len(streamingOrdering) > 0 {
if typ != tree.UnionOp {
b.recordJoinAlgorithm(exec.MergeJoin)
}
ep.root, err = b.factory.ConstructStreamingSetOp(typ, all, left.root, right.root, streamingOrdering, reqOrdering)
} else {
if len(reqOrdering) > 0 {
return execPlan{}, errors.AssertionFailedf("hash set op is not supported with a required ordering")
}
if typ != tree.UnionOp {
b.recordJoinAlgorithm(exec.HashJoin)
}
ep.root, err = b.factory.ConstructHashSetOp(typ, all, left.root, right.root)
}
if err != nil {
Expand Down Expand Up @@ -1733,6 +1757,7 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) {
}

res := execPlan{outputCols: output}
b.recordJoinAlgorithm(exec.IndexJoin)
res.root, err = b.factory.ConstructIndexJoin(
input.root, tab, keyCols, needed, res.reqOrdering(join), locking, join.RequiredPhysical().LimitHintInt64(),
)
Expand Down Expand Up @@ -1828,8 +1853,11 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
locking = forUpdateLocking
}

joinType := joinOpToJoinType(join.JoinType)
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.LookupJoin)
res.root, err = b.factory.ConstructLookupJoin(
joinOpToJoinType(join.JoinType),
joinType,
input.root,
tab,
idx,
Expand Down Expand Up @@ -1943,8 +1971,11 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro
locking = forUpdateLocking
}

joinType := joinOpToJoinType(join.JoinType)
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.InvertedJoin)
res.root, err = b.factory.ConstructInvertedJoin(
joinOpToJoinType(join.JoinType),
joinType,
invertedExpr,
input.root,
tab,
Expand Down Expand Up @@ -2044,6 +2075,7 @@ func (b *Builder) buildZigzagJoin(join *memo.ZigzagJoinExpr) (execPlan, error) {
return execPlan{}, err
}

b.recordJoinAlgorithm(exec.ZigZagJoin)
res.root, err = b.factory.ConstructZigzagJoin(
leftTable,
leftIndex,
Expand Down Expand Up @@ -2663,6 +2695,34 @@ func (b *Builder) statementTag(expr memo.RelExpr) string {
}
}

// recordJoinType bumps the telemetry counter kept for the given logical join
// type. Right-sided outer, semi, and anti joins are counted under their
// left-sided equivalents, so the map is only ever keyed by the left forms.
func (b *Builder) recordJoinType(joinType descpb.JoinType) {
	// Fold each right-sided variant into its left-sided counterpart; left and
	// right are not tracked separately.
	key := joinType
	switch key {
	case descpb.RightAntiJoin:
		key = descpb.LeftAntiJoin
	case descpb.RightSemiJoin:
		key = descpb.LeftSemiJoin
	case descpb.RightOuterJoin:
		key = descpb.LeftOuterJoin
	}

	// Allocate the counter map lazily, the first time a join is recorded.
	if b.JoinTypeCounts == nil {
		// Capacity hint for the map.
		const distinctJoinTypes = 7
		b.JoinTypeCounts = make(map[descpb.JoinType]int, distinctJoinTypes)
	}
	b.JoinTypeCounts[key]++
}

// recordJoinAlgorithm bumps the telemetry counter kept for the given join
// algorithm, creating the counter map the first time it is needed.
func (b *Builder) recordJoinAlgorithm(joinAlgorithm exec.JoinAlgorithm) {
	counts := b.JoinAlgorithmCounts
	if counts == nil {
		// Size the map for one entry per known algorithm.
		counts = make(map[exec.JoinAlgorithm]int, exec.NumJoinAlgorithms)
		b.JoinAlgorithmCounts = counts
	}
	counts[joinAlgorithm]++
}

// boundedStalenessAllowList contains the operators that may be used with
// bounded staleness queries.
var boundedStalenessAllowList = map[opt.Operator]struct{}{
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,19 @@ const (
// Streaming means that the grouping columns are fully ordered.
Streaming
)

// JoinAlgorithm is the type of join algorithm used. Values of this type are
// used as keys when counting how many times each algorithm appears in a query
// plan for telemetry logging.
type JoinAlgorithm int8

// The following are all the supported join algorithms. The values are assigned
// sequentially via iota, starting at zero.
const (
// HashJoin is also the algorithm recorded for hash-based INTERSECT and EXCEPT
// set operations.
HashJoin JoinAlgorithm = iota
// CrossJoin is recorded in place of HashJoin when a hash join is built with
// no equality columns.
CrossJoin
IndexJoin
LookupJoin
// MergeJoin is also the algorithm recorded for streaming INTERSECT and EXCEPT
// set operations.
MergeJoin
InvertedJoin
ApplyJoin
ZigZagJoin
// NumJoinAlgorithms is not a join algorithm; it is the number of algorithms
// declared above and must remain the last entry so that iota yields the
// correct count.
NumJoinAlgorithms
)
4 changes: 4 additions & 0 deletions pkg/sql/plan_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ func (opc *optPlanningCtx) runExecBuilder(
planTop.instrumentation.maxFullScanRows = bld.MaxFullScanRows
planTop.instrumentation.totalScanRows = bld.TotalScanRows
planTop.instrumentation.nanosSinceStatsCollected = bld.NanosSinceStatsCollected
planTop.instrumentation.joinTypeCounts = bld.JoinTypeCounts
planTop.instrumentation.joinAlgorithmCounts = bld.JoinAlgorithmCounts
} else {
// Create an explain factory and record the explain.Plan.
explainFactory := explain.NewFactory(f)
Expand All @@ -652,6 +654,8 @@ func (opc *optPlanningCtx) runExecBuilder(
planTop.instrumentation.maxFullScanRows = bld.MaxFullScanRows
planTop.instrumentation.totalScanRows = bld.TotalScanRows
planTop.instrumentation.nanosSinceStatsCollected = bld.NanosSinceStatsCollected
planTop.instrumentation.joinTypeCounts = bld.JoinTypeCounts
planTop.instrumentation.joinAlgorithmCounts = bld.JoinAlgorithmCounts

planTop.instrumentation.RecordExplainPlan(explainPlan)
}
Expand Down
Loading

0 comments on commit a7c91f0

Please sign in to comment.