Skip to content

Commit

Permalink
sql: refactor Executor.execStmtInParallel
Browse files Browse the repository at this point in the history
... to not take in a StatementResult.

Before this patch, Executor.execStmtInParallel() was taking in a
StatementResult and writing mocked results to it. I've lifted the
mocking to the parent, since this matches an important concept -
parallel statements don't return results, so it was unnatural for
execStmtInParallel() to look like it's returning them.
  • Loading branch information
andreimatei committed Dec 12, 2017
1 parent a69aa0c commit 06d6568
Showing 1 changed file with 44 additions and 26 deletions.
70 changes: 44 additions & 26 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,7 +1812,20 @@ func (e *Executor) execStmtInOpenTxn(
// statements outside of a transaction are run synchronously with mocked
// results, which has the same effect as running asynchronously but
// immediately blocking.
err = e.execStmtInParallel(stmt, p, res)
err = func() error {
cols, err := e.execStmtInParallel(stmt, p)
if err != nil {
return err
}
// Produce mocked out results for the query - the "zero value" of the
// statement's result type:
// - tree.Rows -> an empty set of rows
// - tree.RowsAffected -> zero rows affected
if err := initStatementResult(res, stmt, cols); err != nil {
return err
}
return res.CloseResult()
}()
} else {
p.autoCommit = txnState.implicitTxn && !e.cfg.TestingKnobs.DisableAutoCommit
err = e.execStmt(stmt, p, automaticRetryCount, res)
Expand Down Expand Up @@ -2122,16 +2135,16 @@ func (e *Executor) shouldUseDistSQL(planner *planner, plan planNode) (bool, erro
}

// initStatementResult initializes res according to a query.
func initStatementResult(res StatementResult, stmt Statement, plan planNode) error {
//
// cols represents the columns of the result rows. Should be nil if
// stmt.AST.StatementType() != tree.Rows.
func initStatementResult(res StatementResult, stmt Statement, cols sqlbase.ResultColumns) error {
stmtAst := stmt.AST
res.BeginResult(stmtAst)
if stmtAst.StatementType() == tree.Rows {
columns := planColumns(plan)
res.SetColumns(columns)
for _, c := range columns {
if err := checkResultType(c.Typ); err != nil {
return err
}
res.SetColumns(cols)
for _, c := range cols {
if err := checkResultType(c.Typ); err != nil {
return err
}
}
return nil
Expand All @@ -2154,7 +2167,11 @@ func (e *Executor) execStmt(

defer plan.Close(ctx)

err = initStatementResult(res, stmt, plan)
var cols sqlbase.ResultColumns
if stmt.AST.StatementType() == tree.Rows {
cols = planColumns(plan)
}
err = initStatementResult(res, stmt, cols)
if err != nil {
return err
}
Expand Down Expand Up @@ -2188,18 +2205,20 @@ func (e *Executor) execStmt(
return res.CloseResult()
}

// execStmtInParallel executes the statement asynchronously and writes mocked
// out results to res. These mocked out results will be the "zero value"
// of the statement's result type:
// - tree.Rows -> an empty set of rows
// - tree.RowsAffected -> zero rows affected
// execStmtInParallel executes a query asynchronously: the query will wait for
// all other currently executing async queries which are not independent, and
// then it will run.
// Async queries don't produce results (apart from errors). Note that planning
// needs to be done synchronously because it's needed by the query dependency
// analysis.
// A list of columns is returned for purposes of initializing the statement
// results. This will be nil if the query's result is of type "RowsAffected".
//
// TODO(nvanbenschoten): We do not currently support parallelizing distributed SQL
// queries, so this method can only be used with classical SQL.
// TODO(andrei): the mocking of results business should be done by the caller.
func (e *Executor) execStmtInParallel(
stmt Statement, planner *planner, res StatementResult,
) (retErr error) {
stmt Statement, planner *planner,
) (sqlbase.ResultColumns, error) {
session := planner.session
ctx := session.Ctx()
params := runParams{
Expand All @@ -2210,12 +2229,11 @@ func (e *Executor) execStmtInParallel(

plan, err := planner.makePlan(ctx, stmt)
if err != nil {
return err
return nil, err
}

err = initStatementResult(res, stmt, plan)
if err != nil {
return err
var cols sqlbase.ResultColumns
if stmt.AST.StatementType() == tree.Rows {
cols = planColumns(plan)
}

// This ensures we don't unintentionally clean up the queryMeta object when we
Expand All @@ -2226,7 +2244,7 @@ func (e *Executor) execStmtInParallel(
// TODO(andrei): this should really be a result writer implementation that
// does nothing.
bufferedWriter := newBufferedWriter(session.makeBoundAccount())
err := initStatementResult(bufferedWriter, stmt, plan)
err := initStatementResult(bufferedWriter, stmt, cols)
if err != nil {
return err
}
Expand All @@ -2248,10 +2266,10 @@ func (e *Executor) execStmtInParallel(
session.removeActiveQuery(stmt.queryID)
return err
}); err != nil {
return err
return nil, err
}

return res.CloseResult()
return cols, nil
}

// updateStmtCounts updates metrics for the number of times the different types of SQL
Expand Down

0 comments on commit 06d6568

Please sign in to comment.