diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go index 13d8f0c1eee1..a1711cf95ad3 100644 --- a/pkg/sql/colflow/explain_vec.go +++ b/pkg/sql/colflow/explain_vec.go @@ -101,9 +101,15 @@ type flowWithNode struct { // ExplainVec converts the flows (that are assumed to be vectorizable) into the // corresponding string representation. +// // It also supports printing of already constructed operator chains which takes // priority if non-nil (flows are ignored). All operators in opChains are // assumed to be planned on the gateway. +// +// As the second return parameter it returns a non-nil cleanup function which +// can be called only **after** closing the planNode tree containing the +// explainVecNode (if ExplainVec is used by another caller, then it can be +// called at any time). func ExplainVec( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -113,14 +119,25 @@ func ExplainVec( gatewayNodeID roachpb.NodeID, verbose bool, distributed bool, -) ([]string, error) { +) (_ []string, cleanup func(), _ error) { tp := treeprinter.NewWithStyle(treeprinter.CompactStyle) root := tp.Child("│") - var conversionErr error + var ( + cleanups []func() + err error + conversionErr error + ) + defer func() { + cleanup = func() { + for _, c := range cleanups { + c() + } + } + }() // It is possible that when iterating over execinfra.OpNodes we will hit a // panic (an input that doesn't implement OpNode interface), so we're // catching such errors. - if err := colexecerror.CatchVectorizedRuntimeError(func() { + if err = colexecerror.CatchVectorizedRuntimeError(func() { if opChains != nil { formatChains(root, gatewayNodeID, opChains, verbose) } else { @@ -132,8 +149,8 @@ func ExplainVec( // last. sort.Slice(sortedFlows, func(i, j int) bool { return sortedFlows[i].nodeID < sortedFlows[j].nodeID }) for _, flow := range sortedFlows { - opChains, cleanup, err := convertToVecTree(ctx, flowCtx, flow.flow, localProcessors, !distributed) - defer cleanup() + opChains, cleanup, err = convertToVecTree(ctx, flowCtx, flow.flow, localProcessors, !distributed) + cleanups = append(cleanups, cleanup) if err != nil { conversionErr = err return @@ -142,12 +159,12 @@ func ExplainVec( } } }); err != nil { - return nil, err + return nil, nil, err } if conversionErr != nil { - return nil, conversionErr + return nil, nil, conversionErr } - return tp.FormattedRows(), nil + return tp.FormattedRows(), nil, nil } func formatChains( diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index aba284251ffe..3df22737bab9 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -732,11 +732,12 @@ func (p *PlanningCtx) getDefaultSaveFlowsFunc( if planner.instrumentation.collectBundle && planner.curPlan.flags.IsSet(planFlagVectorized) { flowCtx := newFlowCtxForExplainPurposes(p, planner) getExplain := func(verbose bool) []string { - explain, err := colflow.ExplainVec( + explain, cleanup, err := colflow.ExplainVec( ctx, flowCtx, flows, p.infra.LocalProcessors, opChains, planner.extendedEvalCtx.DistSQLPlanner.gatewayNodeID, verbose, planner.curPlan.flags.IsDistributed(), ) + cleanup() if err != nil { // In some edge cases (like when subqueries are present or // when certain component doesn't implement execinfra.OpNode diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go index 815fc7f407d7..ec53708e1d58 100644 --- a/pkg/sql/explain_vec.go +++ b/pkg/sql/explain_vec.go @@ -35,6 +35,8 @@ type explainVecNode struct { lines []string // The current row returned by the node. values tree.Datums + // cleanup will be called after closing the input tree. + cleanup func() } } @@ -74,7 +76,7 @@ func (n *explainVecNode) startExec(params runParams) error { return errors.New("vectorize is set to 'off'") } verbose := n.options.Flags[tree.ExplainFlagVerbose] - n.run.lines, err = colflow.ExplainVec( + n.run.lines, n.run.cleanup, err = colflow.ExplainVec( params.ctx, flowCtx, flows, physPlan.LocalProcessors, nil, /* opChains */ distSQLPlanner.gatewayNodeID, verbose, willDistribute, ) @@ -131,4 +133,7 @@ func (n *explainVecNode) Next(runParams) (bool, error) { func (n *explainVecNode) Values() tree.Datums { return n.run.values } func (n *explainVecNode) Close(ctx context.Context) { n.plan.close(ctx) + if n.run.cleanup != nil { + n.run.cleanup() + } } diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_local b/pkg/sql/logictest/testdata/logic_test/vectorize_local index 47846c77cfb2..0ed6ac2d024b 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_local +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_local @@ -447,3 +447,20 @@ EXPLAIN (VEC) └ *sql.planNodeToRowSource └ *colfetcher.ColIndexJoin └ *colfetcher.ColBatchScan + +# Regression test for releasing operators before closing them with EXPLAIN (VEC) +# (#70438). +statement ok +CREATE TABLE t70438 (k INT PRIMARY KEY, v INT, UNIQUE INDEX foo (v)); +INSERT INTO t70438 VALUES (1, 2), (3, 4), (5, 6), (7, 8); + +query T +EXPLAIN (VEC) DELETE FROM t70438 WHERE k=3 OR v=6 +---- +│ +└ Node 1 + └ *sql.planNodeToRowSource + └ *colexec.unorderedDistinct + └ *colexec.SerialUnorderedSynchronizer + ├ *colfetcher.ColBatchScan + └ *colfetcher.ColBatchScan