Skip to content

Commit

Permalink
sql: add join types and algorithms to telemetry logging
Browse files Browse the repository at this point in the history
This commit adds several new fields to the SampledQuery structure used for
telemetry logging.

Closes #85425

Release note (sql change): The structured payloads used for telemetry
logs now include the following new fields:
`InnerJoinCount`: The number of inner joins in the query plan.
`LeftOuterJoinCount`: The number of left (or right) outer joins in the query
plan.
`FullOuterJoinCount`: The number of full outer joins in the query plan.
`SemiJoinCount`: The number of semi joins in the query plan.
`AntiJoinCount`: The number of anti joins in the query plan.
`IntersectAllJoinCount`: The number of intersect all joins in the query plan.
`ExceptAllJoinCount`: The number of except all joins in the query plan.
`HashJoinCount`: The number of hash joins in the query plan.
`CrossJoinCount`: The number of cross joins in the query plan.
`IndexJoinCount`: The number of index joins in the query plan.
`LookupJoinCount`: The number of lookup joins in the query plan.
`MergeJoinCount`: The number of merge joins in the query plan.
`InvertedJoinCount`: The number of inverted joins in the query plan.
`ApplyJoinCount`: The number of apply joins in the query plan.
`ZigZagJoinCount`: The number of zig zag joins in the query plan.
  • Loading branch information
rytaft committed Aug 5, 2022
1 parent b1de533 commit e96602b
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 5 deletions.
15 changes: 15 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2515,6 +2515,21 @@ contains common SQL event/execution details.
| `BytesRead` | The number of bytes read from disk. | no |
| `RowsRead` | The number of rows read from disk. | no |
| `RowsWritten` | The number of rows written. | no |
| `InnerJoinCount` | The number of inner joins in the query plan. | no |
| `LeftOuterJoinCount` | The number of left (or right) outer joins in the query plan. | no |
| `FullOuterJoinCount` | The number of full outer joins in the query plan. | no |
| `SemiJoinCount` | The number of semi joins in the query plan. | no |
| `AntiJoinCount` | The number of anti joins in the query plan. | no |
| `IntersectAllJoinCount` | The number of intersect all joins in the query plan. | no |
| `ExceptAllJoinCount` | The number of except all joins in the query plan. | no |
| `HashJoinCount` | The number of hash joins in the query plan. | no |
| `CrossJoinCount` | The number of cross joins in the query plan. | no |
| `IndexJoinCount` | The number of index joins in the query plan. | no |
| `LookupJoinCount` | The number of lookup joins in the query plan. | no |
| `MergeJoinCount` | The number of merge joins in the query plan. | no |
| `InvertedJoinCount` | The number of inverted joins in the query plan. | no |
| `ApplyJoinCount` | The number of apply joins in the query plan. | no |
| `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |


#### Common fields
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -409,6 +410,21 @@ func (p *planner) maybeLogStatementInternal(
BytesRead: queryStats.bytesRead,
RowsRead: queryStats.rowsRead,
RowsWritten: queryStats.rowsWritten,
InnerJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.InnerJoin]),
LeftOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftOuterJoin]),
FullOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.FullOuterJoin]),
SemiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftSemiJoin]),
AntiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftAntiJoin]),
IntersectAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.IntersectAllJoin]),
ExceptAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.ExceptAllJoin]),
HashJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.HashJoin]),
CrossJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.CrossJoin]),
IndexJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.IndexJoin]),
LookupJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.LookupJoin]),
MergeJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.MergeJoin]),
InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]),
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
}
p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery})
} else {
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
Expand Down Expand Up @@ -150,6 +151,14 @@ type instrumentationHelper struct {
// nanosSinceStatsCollected is the maximum number of nanoseconds that have
// passed since stats were collected on any table scanned by this query.
nanosSinceStatsCollected time.Duration

// joinTypeCounts records the number of times each type of logical join was
// used in the query.
joinTypeCounts map[descpb.JoinType]int

// joinAlgorithmCounts records the number of times each type of join algorithm
// was used in the query.
joinAlgorithmCounts map[exec.JoinAlgorithm]int
}

// outputMode indicates how the statement output needs to be populated (for
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/opt/exec/execbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package execbuilder
import (
"time"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
Expand Down Expand Up @@ -143,6 +144,14 @@ type Builder struct {
// passed since stats were collected on any table scanned by this query.
NanosSinceStatsCollected time.Duration

// JoinTypeCounts records the number of times each type of logical join was
// used in the query.
JoinTypeCounts map[descpb.JoinType]int

// JoinAlgorithmCounts records the number of times each type of join algorithm
// was used in the query.
JoinAlgorithmCounts map[exec.JoinAlgorithm]int

// wrapFunctionOverride overrides default implementation to return resolvable
// function reference for function with specified function name.
// The default can be overridden by calling SetSearchPath method to provide
Expand Down
70 changes: 65 additions & 5 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,8 @@ func (b *Builder) buildApplyJoin(join memo.RelExpr) (execPlan, error) {

ep := execPlan{outputCols: outputCols}

b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.ApplyJoin)
ep.root, err = b.factory.ConstructApplyJoin(
joinType,
leftPlan.root,
Expand Down Expand Up @@ -1128,11 +1130,12 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (execPlan, error) {
rightExpr.Relational().OutputCols,
*filters,
)
isCrossJoin := len(leftEq) == 0
if !b.disableTelemetry {
if len(leftEq) > 0 {
telemetry.Inc(sqltelemetry.JoinAlgoHashUseCounter)
} else {
if isCrossJoin {
telemetry.Inc(sqltelemetry.JoinAlgoCrossUseCounter)
} else {
telemetry.Inc(sqltelemetry.JoinAlgoHashUseCounter)
}
telemetry.Inc(opt.JoinTypeToUseCounter(join.Op()))
}
Expand Down Expand Up @@ -1160,6 +1163,12 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (execPlan, error) {
leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ToSet())
rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ToSet())

b.recordJoinType(joinType)
if isCrossJoin {
b.recordJoinAlgorithm(exec.CrossJoin)
} else {
b.recordJoinAlgorithm(exec.HashJoin)
}
ep.root, err = b.factory.ConstructHashJoin(
joinType,
left.root, right.root,
Expand Down Expand Up @@ -1214,6 +1223,8 @@ func (b *Builder) buildMergeJoin(join *memo.MergeJoinExpr) (execPlan, error) {
reqOrd := ep.reqOrdering(join)
leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ColSet())
rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ColSet())
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.MergeJoin)
ep.root, err = b.factory.ConstructMergeJoin(
joinType,
left.root, right.root,
Expand Down Expand Up @@ -1542,6 +1553,13 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
panic(errors.AssertionFailedf("invalid operator %s", redact.Safe(set.Op())))
}

switch typ {
case tree.IntersectOp:
b.recordJoinType(descpb.IntersectAllJoin)
case tree.ExceptOp:
b.recordJoinType(descpb.ExceptAllJoin)
}

hardLimit := uint64(0)
if set.Op() == opt.LocalityOptimizedSearchOp {
if !b.disableTelemetry {
Expand All @@ -1568,11 +1586,17 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
if typ == tree.UnionOp && all {
ep.root, err = b.factory.ConstructUnionAll(left.root, right.root, reqOrdering, hardLimit)
} else if len(streamingOrdering) > 0 {
if typ != tree.UnionOp {
b.recordJoinAlgorithm(exec.MergeJoin)
}
ep.root, err = b.factory.ConstructStreamingSetOp(typ, all, left.root, right.root, streamingOrdering, reqOrdering)
} else {
if len(reqOrdering) > 0 {
return execPlan{}, errors.AssertionFailedf("hash set op is not supported with a required ordering")
}
if typ != tree.UnionOp {
b.recordJoinAlgorithm(exec.HashJoin)
}
ep.root, err = b.factory.ConstructHashSetOp(typ, all, left.root, right.root)
}
if err != nil {
Expand Down Expand Up @@ -1732,6 +1756,7 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) {
}

res := execPlan{outputCols: output}
b.recordJoinAlgorithm(exec.IndexJoin)
res.root, err = b.factory.ConstructIndexJoin(
input.root, tab, keyCols, needed, res.reqOrdering(join), locking, join.RequiredPhysical().LimitHintInt64(),
)
Expand Down Expand Up @@ -1827,8 +1852,11 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
locking = forUpdateLocking
}

joinType := joinOpToJoinType(join.JoinType)
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.LookupJoin)
res.root, err = b.factory.ConstructLookupJoin(
joinOpToJoinType(join.JoinType),
joinType,
input.root,
tab,
idx,
Expand Down Expand Up @@ -1942,8 +1970,11 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro
locking = forUpdateLocking
}

joinType := joinOpToJoinType(join.JoinType)
b.recordJoinType(joinType)
b.recordJoinAlgorithm(exec.InvertedJoin)
res.root, err = b.factory.ConstructInvertedJoin(
joinOpToJoinType(join.JoinType),
joinType,
invertedExpr,
input.root,
tab,
Expand Down Expand Up @@ -2043,6 +2074,7 @@ func (b *Builder) buildZigzagJoin(join *memo.ZigzagJoinExpr) (execPlan, error) {
return execPlan{}, err
}

b.recordJoinAlgorithm(exec.ZigZagJoin)
res.root, err = b.factory.ConstructZigzagJoin(
leftTable,
leftIndex,
Expand Down Expand Up @@ -2662,6 +2694,34 @@ func (b *Builder) statementTag(expr memo.RelExpr) string {
}
}

// recordJoinType increments the counter for the given join type for telemetry
// reporting.
func (b *Builder) recordJoinType(joinType descpb.JoinType) {
if b.JoinTypeCounts == nil {
const numJoinTypes = 7
b.JoinTypeCounts = make(map[descpb.JoinType]int, numJoinTypes)
}
// Don't bother distinguishing between left and right.
switch joinType {
case descpb.RightOuterJoin:
joinType = descpb.LeftOuterJoin
case descpb.RightSemiJoin:
joinType = descpb.LeftSemiJoin
case descpb.RightAntiJoin:
joinType = descpb.LeftAntiJoin
}
b.JoinTypeCounts[joinType]++
}

// recordJoinAlgorithm increments the counter for the given join algorithm for
// telemetry reporting.
func (b *Builder) recordJoinAlgorithm(joinAlgorithm exec.JoinAlgorithm) {
if b.JoinAlgorithmCounts == nil {
b.JoinAlgorithmCounts = make(map[exec.JoinAlgorithm]int, exec.NumJoinAlgorithms)
}
b.JoinAlgorithmCounts[joinAlgorithm]++
}

// boundedStalenessAllowList contains the operators that may be used with
// bounded staleness queries.
var boundedStalenessAllowList = map[opt.Operator]struct{}{
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,19 @@ const (
// Streaming means that the grouping columns are fully ordered.
Streaming
)

// JoinAlgorithm is the type of join algorithm used.
type JoinAlgorithm int8

// The following are all the supported join algorithms.
const (
HashJoin JoinAlgorithm = iota
CrossJoin
IndexJoin
LookupJoin
MergeJoin
InvertedJoin
ApplyJoin
ZigZagJoin
NumJoinAlgorithms
)
4 changes: 4 additions & 0 deletions pkg/sql/plan_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ func (opc *optPlanningCtx) runExecBuilder(
planTop.instrumentation.maxFullScanRows = bld.MaxFullScanRows
planTop.instrumentation.totalScanRows = bld.TotalScanRows
planTop.instrumentation.nanosSinceStatsCollected = bld.NanosSinceStatsCollected
planTop.instrumentation.joinTypeCounts = bld.JoinTypeCounts
planTop.instrumentation.joinAlgorithmCounts = bld.JoinAlgorithmCounts
} else {
// Create an explain factory and record the explain.Plan.
explainFactory := explain.NewFactory(f)
Expand All @@ -652,6 +654,8 @@ func (opc *optPlanningCtx) runExecBuilder(
planTop.instrumentation.maxFullScanRows = bld.MaxFullScanRows
planTop.instrumentation.totalScanRows = bld.TotalScanRows
planTop.instrumentation.nanosSinceStatsCollected = bld.NanosSinceStatsCollected
planTop.instrumentation.joinTypeCounts = bld.JoinTypeCounts
planTop.instrumentation.joinAlgorithmCounts = bld.JoinAlgorithmCounts

planTop.instrumentation.RecordExplainPlan(explainPlan)
}
Expand Down
Loading

0 comments on commit e96602b

Please sign in to comment.