Merge #27863

27863: distsqlrun: merge the execution engines r=jordanlewis a=jordanlewis

Fixes #28184.

This commit merges the DistSQL and local SQL execution engines. The
strategy is as follows:

The method that checks whether DistSQL supports a planNode tree is
unchanged, but the result is now interpreted differently: "unsupported"
means "keep this plan on the gateway node" rather than "fall back to
the local execution engine".

If a planNode is unsupported by DistSQL (that is, it has no
corresponding DistSQL processor implementation), then instead of giving
up, the DistSQL physical planner emits a special wrapper processor (the
`planNodeToRowSource` processor) that simply converts the output of the
planNode into a form amenable to further consumption by DistSQL
processors.

The wrapped planNode isn't serialized into a processor spec. Instead,
since we guarantee that a plan containing a wrapped planNode is
executed on the gateway node, we serialize only the index of the local
planNode into a slice of localPlanNodes; that slice is passed to the
execution engine through a side channel and is never serialized to a
proto.
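
The side channel can be pictured like this (again a hypothetical
sketch, not the real spec types): the proto-serializable spec carries
only an integer index, and the gateway resolves it against a slice that
never touches the wire:

    package main

    import "fmt"

    // planNode stands in for an arbitrary local plan node.
    type planNode interface{ Name() string }

    type scanNode struct{}

    func (scanNode) Name() string { return "scan" }

    // processorSpec stands in for the proto-serializable spec: it may
    // refer to a wrapped planNode only by index, never by value.
    type processorSpec struct {
        localPlanNodeIdx int
    }

    // localState is the non-serialized side channel handed to the
    // execution engine on the gateway node alongside the specs.
    type localState struct {
        localPlanNodes []planNode
    }

    func resolve(spec processorSpec, local localState) planNode {
        // Safe only on the gateway: remote nodes never receive specs
        // that reference local plan nodes.
        return local.localPlanNodes[spec.localPlanNodeIdx]
    }

    func main() {
        local := localState{localPlanNodes: []planNode{scanNode{}}}
        fmt.Println(resolve(processorSpec{localPlanNodeIdx: 0}, local).Name())
    }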

Release note (sql change): all queries now run through the DistSQL
execution engine.

Co-authored-by: Jordan Lewis <[email protected]>
craig[bot] and jordanlewis committed Aug 9, 2018
2 parents 5ea456c + 1665a1b commit f248bf0
Showing 55 changed files with 1,339 additions and 768 deletions.
9 changes: 7 additions & 2 deletions pkg/acceptance/testdata/c/test.c
@@ -141,6 +141,7 @@ int main(int argc, char *argv[]) {
 return 1;
 }
 
+
 /* Always call first on any conn that is to be used with libpqtypes */
 PQinitTypes(conn);
 
@@ -161,9 +162,11 @@ int main(int argc, char *argv[]) {
 return 1;
 }
 
-// '1401-01-19 BC'
+// '1401-01-19'
 PGdate date;
-date.isbc = 1;
+// TODO(jordan): put this back when #28099 is fixed.
+// date.isbc = 1;
+date.isbc = 0;
 date.year = 1401;
 date.mon = 0;
 date.mday = 19;
@@ -250,6 +253,7 @@ int main(int argc, char *argv[]) {
 tstz.time.usec = 0;
 tstz.time.withtz = 1;
 tstz.time.gmtoff = 0;
+
 if (!PQputf(param, "%timestamptz", &tstz)) {
 fprintf(stderr, "ERROR PQputf(timestamptz): %s\n", PQgeterror());
 return 1;
@@ -419,6 +423,7 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "ERROR resultFormat=%d PQgetf(timestamptz): %s\n", resultFormat, PQgeterror());
return 1;
}

if (!timestampEqual(recvtstz, tstz)) {
fprintf(stderr, "resultFormat=%d expected:\n", resultFormat);
timestampPrint(tstz);
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/exportcsv.go
@@ -85,10 +85,6 @@ func exportPlanHook(
 return nil, nil, nil, err
 }
 
-if !p.DistSQLPlanner().CheckPossible(sel) {
-return nil, nil, nil, errors.Errorf("unsupported EXPORT query -- as an alternative try `cockroach sql --format=csv`")
-}
-
 fn := func(ctx context.Context, plans []sql.PlanNode, resultsCh chan<- tree.Datums) error {
 ctx, span := tracing.ChildSpan(ctx, exportStmt.StatementTag())
 defer tracing.FinishSpan(span)
16 changes: 0 additions & 16 deletions pkg/ccl/importccl/exportcsv_test.go
@@ -156,22 +156,6 @@ func TestMultiNodeExportStmt(t *testing.T) {
 }
 }
 
-func TestExportNonTable(t *testing.T) {
-defer leaktest.AfterTest(t)()
-tc := testcluster.StartTestCluster(t, 3,
-base.TestClusterArgs{ServerArgs: base.TestServerArgs{UseDatabase: "test"}},
-)
-defer tc.Stopper().Stop(context.Background())
-db := sqlutils.MakeSQLRunner(tc.Conns[0])
-db.Exec(t, "CREATE DATABASE test")
-
-if _, err := db.DB.Exec(
-`EXPORT INTO CSV 'nodelocal:///series' WITH chunk_rows = '10' FROM SELECT generate_series(1, 100)`,
-); !testutils.IsError(err, "unsupported EXPORT query") {
-t.Fatal(err)
-}
-}
-
 func TestExportJoin(t *testing.T) {
 defer leaktest.AfterTest(t)()
 dir, cleanupDir := testutils.TempDir(t)
1 change: 1 addition & 0 deletions pkg/cli/interactive_tests/test_sql_mem_monitor.tcl
@@ -77,6 +77,7 @@ expect {
"out of memory" {}
"cannot allocate memory" {}
"std::bad_alloc" {}
"Resource temporarily unavailable" {}
timeout { handle_timeout "memory allocation error" }
}
eexpect ":/# "
56 changes: 28 additions & 28 deletions pkg/sql/conn_executor_exec.go
@@ -742,7 +742,16 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 res.SetError(err)
 return nil
 }
-defer planner.curPlan.close(ctx)
+// We only need to close the plan if we don't hand it off to DistSQL. Once
+// we hand it off, DistSQL will take care of closing it.
+// TODO(jordan): once we add partial plan wrapping, this will need to change.
+// We'll need to close all of the nodes that weren't taken over by DistSQL.
+needClose := true
+defer func() {
+if needClose {
+planner.curPlan.close(ctx)
+}
+}()
 
 var cols sqlbase.ResultColumns
 if stmt.AST.StatementType() == tree.Rows {
@@ -754,29 +763,15 @@
 }
 
 ex.sessionTracing.TracePlanCheckStart(ctx)
-useDistSQL := false
+distributePlan := false
 // If we use the optimizer and we are in "local" mode, don't try to
 // distribute.
 if ex.sessionData.OptimizerMode != sessiondata.OptimizerLocal {
-ok, err := planner.prepareForDistSQLSupportCheck(
-ctx, ex.sessionData.DistSQLMode == sessiondata.DistSQLAlways,
-)
-if err != nil {
-ex.sessionTracing.TracePlanCheckEnd(ctx, err, false)
-res.SetError(err)
-return nil
-}
-if ok {
-useDistSQL, err = shouldUseDistSQL(
-ctx, ex.sessionData.DistSQLMode, ex.server.cfg.DistSQLPlanner, planner.curPlan.plan)
-if err != nil {
-ex.sessionTracing.TracePlanCheckEnd(ctx, err, false)
-res.SetError(err)
-return nil
-}
-}
+planner.prepareForDistSQLSupportCheck()
+distributePlan = shouldDistributePlan(
+ctx, ex.sessionData.DistSQLMode, ex.server.cfg.DistSQLPlanner, planner.curPlan.plan)
 }
-ex.sessionTracing.TracePlanCheckEnd(ctx, nil, useDistSQL)
+ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan)
 
 if ex.server.cfg.TestingKnobs.BeforeExecute != nil {
 ex.server.cfg.TestingKnobs.BeforeExecute(ctx, stmt.String(), false /* isParallel */)
@@ -791,12 +786,13 @@
 panic(fmt.Sprintf("query %d not in registry", stmt.queryID))
 }
 queryMeta.phase = executing
-queryMeta.isDistributed = useDistSQL
+queryMeta.isDistributed = distributePlan
 ex.mu.Unlock()
 
-if useDistSQL {
+if ex.sessionData.DistSQLMode != sessiondata.DistSQLOff {
+needClose = false
 ex.sessionTracing.TraceExecStart(ctx, "distributed")
-err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res)
+err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res, distributePlan)
 } else {
 ex.sessionTracing.TraceExecStart(ctx, "local")
 err = ex.execWithLocalEngine(ctx, planner, stmt.AST.StatementType(), res)
@@ -807,7 +803,7 @@
 return err
 }
 ex.recordStatementSummary(
-planner, stmt, useDistSQL, optimizerPlanned,
+planner, stmt, distributePlan, optimizerPlanned,
 ex.extraTxnState.autoRetryCounter, res.RowsAffected(), res.Err(),
 &ex.server.EngineMetrics,
 )
@@ -913,7 +909,11 @@ func (ex *connExecutor) execWithLocalEngine(
 // If an error is returned, the connection needs to stop processing queries.
 // Query execution errors are written to res; they are not returned.
 func (ex *connExecutor) execWithDistSQLEngine(
-ctx context.Context, planner *planner, stmtType tree.StatementType, res RestrictedCommandResult,
+ctx context.Context,
+planner *planner,
+stmtType tree.StatementType,
+res RestrictedCommandResult,
+distribute bool,
 ) error {
 recv := makeDistSQLReceiver(
 ctx, res, stmtType,
@@ -924,9 +924,9 @@
 },
 &ex.sessionTracing,
 )
-ex.server.cfg.DistSQLPlanner.PlanAndRun(
-ctx, planner.txn, planner, planner.curPlan.plan, recv, planner.ExtendedEvalContext(),
-)
+// We pass in whether or not we wanted to distribute this plan, which tells
+// the planner whether or not to plan remote table readers.
+ex.server.cfg.DistSQLPlanner.PlanAndRun(ctx, planner, recv, distribute)
 return recv.commErr
 }
 
17 changes: 15 additions & 2 deletions pkg/sql/distinct.go
@@ -190,7 +190,7 @@ func (p *planner) distinct(

 func (n *distinctNode) startExec(params runParams) error {
 flowCtx := &distsqlrun.FlowCtx{
-EvalCtx: *params.EvalContext(),
+EvalCtx: params.EvalContext(),
 }
 
 cols := make([]int, len(planColumns(n.plan)))
@@ -200,14 +200,27 @@

 spec := createDistinctSpec(n, cols)
 
-input, err := makePlanNodeToRowSource(n.plan, params)
+input, err := makePlanNodeToRowSource(n.plan, params, false)
 if err != nil {
 return err
 }
 if len(spec.DistinctColumns) == 0 {
 return errors.New("cannot initialize a distinctNode with 0 columns")
 }
+
+// Normally, startExec isn't recursive, since it's invoked for all nodes using
+// the planTree walker. And as normal, the walker will startExec the source
+// of this distinct.
+// But, we also need to startExec our planNodeToRowSource to properly
+// initialize it. That won't get touched via the planNode walker, so we have
+// to do it recursively here.
+if err := input.startExec(params); err != nil {
+return err
+}
+if err := input.InitWithOutput(&distsqlrun.PostProcessSpec{}, nil); err != nil {
+return err
+}
 
 post := &distsqlrun.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic.
 var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource.
