diff --git a/pkg/sql/executor.go b/pkg/sql/executor.go index 3f06dc089efd..59ad8b7c0fb3 100644 --- a/pkg/sql/executor.go +++ b/pkg/sql/executor.go @@ -1813,7 +1813,20 @@ func (e *Executor) execStmtInOpenTxn( // statements outside of a transaction are run synchronously with mocked // results, which has the same effect as running asynchronously but // immediately blocking. - err = e.execStmtInParallel(stmt, p, res) + err = func() error { + cols, err := e.execStmtInParallel(stmt, p) + if err != nil { + return err + } + // Produce mocked out results for the query - the "zero value" of the + // statement's result type: + // - tree.Rows -> an empty set of rows + // - tree.RowsAffected -> zero rows affected + if err := initStatementResult(res, stmt, cols); err != nil { + return err + } + return res.CloseResult() + }() } else { p.autoCommit = txnState.implicitTxn && !e.cfg.TestingKnobs.DisableAutoCommit err = e.execStmt(stmt, p, automaticRetryCount, res) @@ -2123,16 +2136,16 @@ func (e *Executor) shouldUseDistSQL(planner *planner, plan planNode) (bool, erro } // initStatementResult initializes res according to a query. -func initStatementResult(res StatementResult, stmt Statement, plan planNode) error { +// +// cols represents the columns of the result rows. Should be nil if +// stmt.AST.StatementType() != tree.Rows. +func initStatementResult(res StatementResult, stmt Statement, cols sqlbase.ResultColumns) error { stmtAst := stmt.AST res.BeginResult(stmtAst) - if stmtAst.StatementType() == tree.Rows { - columns := planColumns(plan) - res.SetColumns(columns) - for _, c := range columns { - if err := checkResultType(c.Typ); err != nil { - return err - } + res.SetColumns(cols) + for _, c := range cols { + if err := checkResultType(c.Typ); err != nil { + return err } } return nil @@ -2155,7 +2168,11 @@ func (e *Executor) execStmt( defer plan.Close(ctx) - err = initStatementResult(res, stmt, plan) + var cols sqlbase.ResultColumns + if stmt.AST.StatementType() == tree.Rows { + cols = planColumns(plan) + } + err = initStatementResult(res, stmt, cols) if err != nil { return err } @@ -2189,18 +2206,20 @@ func (e *Executor) execStmt( return res.CloseResult() } -// execStmtInParallel executes the statement asynchronously and writes mocked -// out results to res. These mocked out results will be the "zero value" -// of the statement's result type: -// - tree.Rows -> an empty set of rows -// - tree.RowsAffected -> zero rows affected +// execStmtInParallel executes a query asynchronously: the query will wait for +// all other currently executing async queries which are not independent, and +// then it will run. +// Async queries don't produce results (apart from errors). Note that planning +// needs to be done synchronously because it's needed by the query dependency +// analysis. +// A list of columns is returned for purposes of initializing the statement +// results. This will be nil if the query's result is of type "RowsAffected". // // TODO(nvanbenschoten): We do not currently support parallelizing distributed SQL // queries, so this method can only be used with classical SQL. -// TODO(andrei): the mocking of results business should be done by the caller. func (e *Executor) execStmtInParallel( - stmt Statement, planner *planner, res StatementResult, -) (retErr error) { + stmt Statement, planner *planner, +) (sqlbase.ResultColumns, error) { session := planner.session ctx := session.Ctx() params := runParams{ @@ -2211,12 +2230,11 @@ func (e *Executor) execStmtInParallel( plan, err := planner.makePlan(ctx, stmt) if err != nil { - return err + return nil, err } - - err = initStatementResult(res, stmt, plan) - if err != nil { - return err + var cols sqlbase.ResultColumns + if stmt.AST.StatementType() == tree.Rows { + cols = planColumns(plan) } // This ensures we don't unintentionally clean up the queryMeta object when we @@ -2227,7 +2245,7 @@ func (e *Executor) execStmtInParallel( // TODO(andrei): this should really be a result writer implementation that // does nothing. bufferedWriter := newBufferedWriter(session.makeBoundAccount()) - err := initStatementResult(bufferedWriter, stmt, plan) + err := initStatementResult(bufferedWriter, stmt, cols) if err != nil { return err } @@ -2249,10 +2267,10 @@ func (e *Executor) execStmtInParallel( session.removeActiveQuery(stmt.queryID) return err }); err != nil { - return err + return nil, err } - return res.CloseResult() + return cols, nil } // updateStmtCounts updates metrics for the number of times the different types of SQL