Skip to content

Commit

Permalink
Merge #56103 #56262
Browse files Browse the repository at this point in the history
56103: opt: generate zigzag joins on two partial indexes with the same predicate r=mgartner a=mgartner

This commit updates the GenerateZigzagJoins exploration rule so that it
can generate a zigzag join on two partial indexes with the same
predicate expression.

This was not possible previously because the remaining filters of the
outer `scanIndexIter.ForEach` loop were passed as filters to the inner
`scanIndexIter.ForEachStartingAfter` loop. These filters no longer
included the expression necessary for proving implication for the second
index, so a zigzag join was not planned.

If the same non-reduced, original query filters were passed to both the
outer and inner loops, then the remaining filters could include
unnecessary expressions when planning a zigzag join over two partial
indexes with different predicates. Expressions that should have been
removed from the remaining filters while proving implication in the
outer loop would remain in the remaining filters of the inner loop and
ultimately remain in the query plan.

Therefore, the proposed solution in this commit is to allow the user to
specify the non-reduced, original query filters as additional filters
that can prove implication. If the inner loop `scanIndexIter` cannot
prove implication with the remaining filters from the outer loop, it
attempts to prove implication with the original filters.

Release note (performance improvement): The query optimizer can now
plan zigzag joins on two partial indexes with the same predicate,
leading to more efficient query plans in some cases.


56262: sql: refactor code shared by diagnostics and EXPLAIN ANALYZE (DEBUG) r=RaduBerinde a=RaduBerinde

#### sql: pass recording to WithStatementTrace

Pass the recording instead of the span.

Release note: None

#### sql: refactor code shared by diagnostics and EXPLAIN ANALYZE (DEBUG)

There are two paths that result in a bundle being collected and inserted
into the diagnostics tables. Each path uses its own code to insert the
diagnostics bundle, which leads to unnecessary complication.

This change simplifies the stmtdiagnostics code to let the caller do
the insertion and streamlines the higher level code.

Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
3 people committed Nov 5, 2020
3 parents f9779fa + 69a2cab + 789d3f8 commit 3e9ad0f
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 222 deletions.
4 changes: 1 addition & 3 deletions pkg/bench/ddl_analysis/ddl_analysis_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ func RunRoundTripBenchmark(b *testing.B, tests []RoundTripBenchTestCase) {
b.Run(tc.name, func(b *testing.B) {
var stmtToKvBatchRequests sync.Map

beforePlan := func(sp *tracing.Span, stmt string) {
beforePlan := func(trace tracing.Recording, stmt string) {
if _, ok := stmtToKvBatchRequests.Load(stmt); ok {
sp.Finish()
trace := sp.GetRecording()
count := countKvBatchRequestsInRecording(trace)
stmtToKvBatchRequests.Store(stmt, count)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,9 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(sp *tracing.Span, stmt string) {
WithStatementTrace: func(trace tracing.Recording, stmt string) {
if stmt == historicalQuery {
recCh <- sp.GetRecording()
recCh <- trace
}
},
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -1119,7 +1120,7 @@ type connExecutor struct {

// stmtDiagnosticsRecorder is used to track which queries need to have
// information collected.
stmtDiagnosticsRecorder StmtDiagnosticsRecorder
stmtDiagnosticsRecorder *stmtdiagnostics.Registry
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down
32 changes: 16 additions & 16 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -341,7 +342,8 @@ func (ex *connExecutor) execStmtInOpenState(
p.noticeSender = res

var shouldCollectDiagnostics bool
var finishCollectionDiagnostics StmtDiagnosticsTraceFinishFunc
var diagRequestID stmtdiagnostics.RequestID
var finishCollectionDiagnostics func()

if explainBundle, ok := ast.(*tree.ExplainAnalyzeDebug); ok {
telemetry.Inc(sqltelemetry.ExplainAnalyzeDebugUseCounter)
Expand All @@ -364,10 +366,8 @@ func (ex *connExecutor) execStmtInOpenState(
// bundle.
p.discardRows = true
} else {
shouldCollectDiagnostics, finishCollectionDiagnostics = ex.stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, ast)
if shouldCollectDiagnostics {
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
}
shouldCollectDiagnostics, diagRequestID, finishCollectionDiagnostics =
ex.stmtDiagnosticsRecorder.ShouldCollectDiagnostics(ctx, stmt.AnonymizedStr)
}

if shouldCollectDiagnostics {
Expand All @@ -378,30 +378,30 @@ func (ex *connExecutor) execStmtInOpenState(
ctx, sp = tracing.StartSnowballTrace(ctx, tr, "traced statement")
// TODO(radu): consider removing this if/when #46164 is addressed.
p.extendedEvalCtx.Context = ctx
anonymizedStr := stmt.AnonymizedStr
fingerprint := stmt.AnonymizedStr
defer func() {
// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
sp.Finish()
trace := sp.GetRecording()
ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor)
placeholders := p.extendedEvalCtx.Placeholders
bundle := buildStatementBundle(
origCtx, ex.server.cfg.DB, ie, &p.curPlan, trace, placeholders,
)
bundle.insert(origCtx, fingerprint, ast, ex.server.cfg.StmtDiagnosticsRecorder, diagRequestID)
if finishCollectionDiagnostics != nil {
bundle, collectionErr := buildStatementBundle(
origCtx, ex.server.cfg.DB, ie, &p.curPlan, trace, placeholders,
)
finishCollectionDiagnostics(origCtx, bundle.trace, bundle.zip, collectionErr)
finishCollectionDiagnostics()
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
} else {
// Handle EXPLAIN ANALYZE (DEBUG).
// If there was a communication error, no point in setting any results.
// If there was a communication error already, no point in setting any results.
if retErr == nil {
retErr = setExplainBundleResult(
origCtx, res, ast, trace, &p.curPlan, placeholders, ie, ex.server.cfg,
)
retErr = setExplainBundleResult(origCtx, res, bundle, ex.server.cfg)
}
}

stmtStats, _ := ex.appStats.getStatsForStmt(anonymizedStr, ex.implicitTxn(), retErr, false)
stmtStats, _ := ex.appStats.getStatsForStmt(fingerprint, ex.implicitTxn(), retErr, false)
if stmtStats == nil {
return
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func (ex *connExecutor) execStmtInOpenState(
p.extendedEvalCtx.Context = ctx

defer func() {
ex.server.cfg.TestingKnobs.WithStatementTrace(sp, sql)
ex.server.cfg.TestingKnobs.WithStatementTrace(sp.GetRecording(), sql)
}()
}

Expand Down
36 changes: 1 addition & 35 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,40 +743,6 @@ func (cfg *ExecutorConfig) Organization() string {
return ClusterOrganization.Get(&cfg.Settings.SV)
}

// StmtDiagnosticsRecorder is the interface into *stmtdiagnostics.Registry to
// record statement diagnostics.
type StmtDiagnosticsRecorder interface {

// ShouldCollectDiagnostics checks whether any data should be collected for the
// given query, which is the case if the registry has a request for this
// statement's fingerprint; in this case ShouldCollectDiagnostics will not
// return true again on this node for the same diagnostics request.
//
// If data is to be collected, the returned finish() function must always be
// called once the data was collected. If collection fails, it can be called
// with a collectionErr.
ShouldCollectDiagnostics(ctx context.Context, ast tree.Statement) (
shouldCollect bool,
finish StmtDiagnosticsTraceFinishFunc,
)

// InsertStatementDiagnostics inserts a trace into system.statement_diagnostics.
//
// traceJSON is either DNull (when collectionErr should not be nil) or a *DJSON.
InsertStatementDiagnostics(ctx context.Context,
stmtFingerprint string,
stmt string,
traceJSON tree.Datum,
bundleZip []byte,
) (id int64, err error)
}

// StmtDiagnosticsTraceFinishFunc is the type of function returned from
// ShouldCollectDiagnostics to report the outcome of a trace.
type StmtDiagnosticsTraceFinishFunc = func(
ctx context.Context, traceJSON tree.Datum, bundle []byte, collectionErr error,
)

var _ base.ModuleTestingKnobs = &ExecutorTestingKnobs{}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down Expand Up @@ -844,7 +810,7 @@ type ExecutorTestingKnobs struct {

// WithStatementTrace is called after the statement is executed in
// execStmtInOpenState.
WithStatementTrace func(span *tracing.Span, stmt string)
WithStatementTrace func(trace tracing.Recording, stmt string)

// RunAfterSCJobsCacheLookup is called after the SchemaChangeJobCache is checked for
// a given table id.
Expand Down
104 changes: 62 additions & 42 deletions pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,46 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
)

// setExplainBundleResult creates the diagnostics and returns the bundle
// information for an EXPLAIN ANALYZE (DEBUG) statement.
// setExplainBundleResult sets the result of an EXPLAIN ANALYZE (DEBUG)
// statement.
//
// Note: bundle.insert() must have been called.
//
// Returns an error if information rows couldn't be added to the result.
func setExplainBundleResult(
ctx context.Context,
res RestrictedCommandResult,
ast tree.Statement,
trace tracing.Recording,
plan *planTop,
placeholders *tree.PlaceholderInfo,
ie *InternalExecutor,
bundle diagnosticsBundle,
execCfg *ExecutorConfig,
) error {
res.ResetStmtType(&tree.ExplainAnalyzeDebug{})
res.SetColumns(ctx, colinfo.ExplainAnalyzeDebugColumns)

var text []string
func() {
bundle, err := buildStatementBundle(ctx, execCfg.DB, ie, plan, trace, placeholders)
if err != nil {
// TODO(radu): we cannot simply set an error on the result here without
// changing the executor logic (e.g. an implicit transaction could have
// committed already). Just show the error in the result.
text = []string{fmt.Sprintf("Error generating bundle: %v", err)}
return
}

fingerprint := tree.AsStringWithFlags(ast, tree.FmtHideConstants)
stmtStr := tree.AsString(ast)

diagID, err := execCfg.StmtDiagnosticsRecorder.InsertStatementDiagnostics(
ctx,
fingerprint,
stmtStr,
bundle.trace,
bundle.zip,
)
if err != nil {
text = []string{fmt.Sprintf("Error recording bundle: %v", err)}
return
}

if bundle.collectionErr != nil {
// TODO(radu): we cannot simply set an error on the result here without
// changing the executor logic (e.g. an implicit transaction could have
// committed already). Just show the error in the result.
text = []string{fmt.Sprintf("Error generating bundle: %v", bundle.collectionErr)}
} else {
text = []string{
"Statement diagnostics bundle generated. Download from the Admin UI (Advanced",
"Debug -> Statement Diagnostics History), via the direct link below, or using",
"the command line.",
fmt.Sprintf("Admin UI: %s", execCfg.AdminURL()),
fmt.Sprintf("Direct link: %s/_admin/v1/stmtbundle/%d", execCfg.AdminURL(), diagID),
fmt.Sprintf("Direct link: %s/_admin/v1/stmtbundle/%d", execCfg.AdminURL(), bundle.diagID),
"Command line: cockroach statement-diag list / download",
}
}()
}

if err := res.Err(); err != nil {
// Add the bundle information as a detail to the query error.
Expand Down Expand Up @@ -146,12 +127,21 @@ func normalizeSpan(s tracingpb.RecordedSpan, trace tracing.Recording) tracingpb.

// diagnosticsBundle contains diagnostics information collected for a statement.
type diagnosticsBundle struct {
zip []byte
trace tree.Datum
// Zip file binary data.
zip []byte

// Tracing data, as DJson (or DNull if it is not available).
traceJSON tree.Datum

// Stores any error in the collection, building, or insertion of the bundle.
collectionErr error

// diagID is the diagnostics instance ID, populated by insert().
diagID stmtdiagnostics.CollectedInstanceID
}

// buildStatementBundle collects metadata related the planning and execution of
// the statement. It generates a bundle for storage in
// buildStatementBundle collects metadata related to the planning and execution
// of the statement. It generates a bundle for storage in
// system.statement_diagnostics.
func buildStatementBundle(
ctx context.Context,
Expand All @@ -160,9 +150,9 @@ func buildStatementBundle(
plan *planTop,
trace tracing.Recording,
placeholders *tree.PlaceholderInfo,
) (diagnosticsBundle, error) {
) diagnosticsBundle {
if plan == nil {
return diagnosticsBundle{}, errors.AssertionFailedf("execution terminated early")
return diagnosticsBundle{collectionErr: errors.AssertionFailedf("execution terminated early")}
}
b := makeStmtBundleBuilder(db, ie, plan, trace, placeholders)

Expand All @@ -177,9 +167,39 @@ func buildStatementBundle(

buf, err := b.finalize()
if err != nil {
return diagnosticsBundle{}, err
return diagnosticsBundle{collectionErr: err}
}
return diagnosticsBundle{traceJSON: traceJSON, zip: buf.Bytes()}
}

// insert the bundle in statements diagnostics. Sets bundle.diagID and (in error
// cases) bundle.collectionErr.
//
// diagRequestID should be the ID returned by ShouldCollectDiagnostics, or zero
// if diagnostics were triggered by EXPLAIN ANALYZE (DEBUG).
func (bundle *diagnosticsBundle) insert(
ctx context.Context,
fingerprint string,
ast tree.Statement,
stmtDiagRecorder *stmtdiagnostics.Registry,
diagRequestID stmtdiagnostics.RequestID,
) {
var err error
bundle.diagID, err = stmtDiagRecorder.InsertStatementDiagnostics(
ctx,
diagRequestID,
fingerprint,
tree.AsString(ast),
bundle.traceJSON,
bundle.zip,
bundle.collectionErr,
)
if err != nil {
log.Warningf(ctx, "failed to report statement diagnostics: %s", err)
if bundle.collectionErr != nil {
bundle.collectionErr = err
}
}
return diagnosticsBundle{trace: traceJSON, zip: buf.Bytes()}, nil
}

// stmtBundleBuilder is a helper for building a statement bundle.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/xform/join_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *CustomFuncs) GenerateLookupJoins(

var pkCols opt.ColList
var iter scanIndexIter
iter.init(c.e.mem, &c.im, scanPrivate, on, rejectInvertedIndexes)
iter.Init(c.e.mem, &c.im, scanPrivate, on, rejectInvertedIndexes)
iter.ForEach(func(index cat.Index, onFilters memo.FiltersExpr, indexCols opt.ColSet, isCovering bool) {
// Find the longest prefix of index key columns that are constrained by
// an equality with another column or a constant.
Expand Down Expand Up @@ -434,7 +434,7 @@ func (c *CustomFuncs) GenerateInvertedJoins(
var pkCols opt.ColList

var iter scanIndexIter
iter.init(c.e.mem, &c.im, scanPrivate, on, rejectNonInvertedIndexes)
iter.Init(c.e.mem, &c.im, scanPrivate, on, rejectNonInvertedIndexes)
iter.ForEach(func(index cat.Index, on memo.FiltersExpr, indexCols opt.ColSet, isCovering bool) {
// Check whether the filter can constrain the index.
invertedExpr := invertedidx.TryJoinGeoIndex(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/xform/limit_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *CustomFuncs) GenerateLimitedScans(
// Iterate over all non-inverted, non-partial indexes, looking for those
// that can be limited.
var iter scanIndexIter
iter.init(c.e.mem, &c.im, scanPrivate, nil /* originalFilters */, rejectInvertedIndexes|rejectPartialIndexes)
iter.Init(c.e.mem, &c.im, scanPrivate, nil /* filters */, rejectInvertedIndexes|rejectPartialIndexes)
iter.ForEach(func(index cat.Index, filters memo.FiltersExpr, indexCols opt.ColSet, isCovering bool) {
newScanPrivate := *scanPrivate
newScanPrivate.Index = index.Ordinal()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/xform/scan_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
func (c *CustomFuncs) GenerateIndexScans(grp memo.RelExpr, scanPrivate *memo.ScanPrivate) {
// Iterate over all non-inverted and non-partial secondary indexes.
var iter scanIndexIter
iter.init(c.e.mem, &c.im, scanPrivate, nil /* originalFilters */, rejectPrimaryIndex|rejectInvertedIndexes)
iter.Init(c.e.mem, &c.im, scanPrivate, nil /* filters */, rejectPrimaryIndex|rejectInvertedIndexes)
iter.ForEach(func(index cat.Index, filters memo.FiltersExpr, indexCols opt.ColSet, isCovering bool) {
// If the secondary index includes the set of needed columns, then construct
// a new Scan operator using that index.
Expand Down
Loading

0 comments on commit 3e9ad0f

Please sign in to comment.