From b3eaf62a71b184d2b30d84c7e2f2617f48381748 Mon Sep 17 00:00:00 2001 From: Andrew Kimball Date: Fri, 16 Nov 2018 08:14:16 -0800 Subject: [PATCH] opt: Add execbuilder support for Insert operator Add handling for the Insert operator in execbuilder and in the exec factory. This is mostly code copied over from sql/insert.go. Release note: None --- .../logictest/testdata/logic_test/computed | 2 +- .../logictest/testdata/logic_test/optimizer | 4 +- .../testdata/logic_test/statement_statistics | 2 +- pkg/sql/logictest/testdata/logic_test/txn | 10 +- .../exec/execbuilder/relational_builder.go | 45 +++- pkg/sql/opt/exec/execbuilder/testdata/insert | 155 +++++++------ pkg/sql/opt/exec/execbuilder/testdata/orderby | 19 +- pkg/sql/opt/exec/execbuilder/testdata/spool | 206 ++++++++++-------- pkg/sql/opt/exec/factory.go | 6 + pkg/sql/opt_exec_factory.go | 171 ++++++++++++--- pkg/sql/plan.go | 3 +- 11 files changed, 414 insertions(+), 209 deletions(-) diff --git a/pkg/sql/logictest/testdata/logic_test/computed b/pkg/sql/logictest/testdata/logic_test/computed index 36951cb78a7f..f2903d39fbc6 100644 --- a/pkg/sql/logictest/testdata/logic_test/computed +++ b/pkg/sql/logictest/testdata/logic_test/computed @@ -722,7 +722,7 @@ CREATE TABLE error_check (k INT PRIMARY KEY, s STRING, i INT AS (s::INT) STORED) statement ok INSERT INTO error_check VALUES(1, '1') -statement error computed column i: +statement error could not parse "foo" as type int: strconv.ParseInt INSERT INTO error_check VALUES(2, 'foo') statement error computed column i: diff --git a/pkg/sql/logictest/testdata/logic_test/optimizer b/pkg/sql/logictest/testdata/logic_test/optimizer index 248cdcefe20e..0e655b148dee 100644 --- a/pkg/sql/logictest/testdata/logic_test/optimizer +++ b/pkg/sql/logictest/testdata/logic_test/optimizer @@ -97,8 +97,8 @@ CREATE SEQUENCE seq statement ok SET OPTIMIZER = ALWAYS -query error pq: unsupported statement: \*tree\.Insert -INSERT INTO test (k, v) VALUES (5, 50) +query error pq: unsupported statement: \*tree\.Delete +DELETE FROM test WHERE k=5 # Don't fall back to heuristic planner in ALWAYS mode. 
query error pq: aggregates with FILTER are not supported yet diff --git a/pkg/sql/logictest/testdata/logic_test/statement_statistics b/pkg/sql/logictest/testdata/logic_test/statement_statistics index 0db49e846eec..8376eb9bd694 100644 --- a/pkg/sql/logictest/testdata/logic_test/statement_statistics +++ b/pkg/sql/logictest/testdata/logic_test/statement_statistics @@ -112,7 +112,7 @@ SELECT key,flags WHERE application_name = 'valuetest' ORDER BY key, flags ---- key flags -INSERT INTO test VALUES (_, _, __more1__), (__more1__) - +INSERT INTO test VALUES (_, _, __more1__), (__more1__) · SELECT (_, _, __more3__) FROM test WHERE _ · SELECT key FROM test.crdb_internal.node_statement_statistics · SELECT sin(_) · diff --git a/pkg/sql/logictest/testdata/logic_test/txn b/pkg/sql/logictest/testdata/logic_test/txn index ad415714dd1d..900ac5015466 100644 --- a/pkg/sql/logictest/testdata/logic_test/txn +++ b/pkg/sql/logictest/testdata/logic_test/txn @@ -931,19 +931,19 @@ statement ok COMMIT statement error cannot execute CREATE TABLE in a read-only transaction -CREATE TABLE a (a int) +CREATE TABLE tab (a int) statement error cannot execute INSERT in a read-only transaction -INSERT INTO a VALUES(1) +INSERT INTO kv VALUES('foo') statement error cannot execute UPDATE in a read-only transaction -UPDATE a SET a = 1 +UPDATE kv SET v = 'foo' statement error cannot execute INSERT in a read-only transaction -UPSERT INTO a VALUES(2) +UPSERT INTO kv VALUES('foo') statement error cannot execute DELETE in a read-only transaction -DELETE FROM a +DELETE FROM kv statement error cannot execute nextval\(\) in a read-only transaction SELECT nextval('a') diff --git a/pkg/sql/opt/exec/execbuilder/relational_builder.go b/pkg/sql/opt/exec/execbuilder/relational_builder.go index a1f7662f6313..c8e0c1c367c3 100644 --- a/pkg/sql/opt/exec/execbuilder/relational_builder.go +++ b/pkg/sql/opt/exec/execbuilder/relational_builder.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/opt/ordering" "github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -118,6 +119,14 @@ func (ep *execPlan) sqlOrdering(ordering opt.Ordering) sqlbase.ColumnOrdering { func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { var ep execPlan var err error + + // Raise error if a mutation op is part of a read-only transaction. + if opt.IsMutationOp(e) && b.evalCtx.TxnReadOnly { + return execPlan{}, pgerror.NewErrorf(pgerror.CodeReadOnlySQLTransactionError, + "cannot execute %s in a read-only transaction", e.Op().SyntaxTag()) + } + + // Handle read-only operators which never write data or modify schema. 
switch t := e.(type) { case *memo.ValuesExpr: ep, err = b.buildValues(t) @@ -167,6 +176,9 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { case *memo.ProjectSetExpr: ep, err = b.buildProjectSet(t) + case *memo.InsertExpr: + ep, err = b.buildInsert(t) + default: if opt.IsSetOp(e) { ep, err = b.buildSetOp(e) @@ -179,11 +191,12 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { if opt.IsJoinApplyOp(e) { return execPlan{}, b.decorrelationError() } - return execPlan{}, errors.Errorf("unsupported relational op %s", e.Op()) } if err != nil { return execPlan{}, err } + + // Wrap the expression in a render expression if presentation requires it. if p := e.RequiredPhysical(); !p.Presentation.Any() { ep, err = b.applyPresentation(ep, p) } @@ -937,6 +950,36 @@ func (b *Builder) buildProjectSet(projectSet *memo.ProjectSetExpr) (execPlan, er return ep, nil } +func (b *Builder) buildInsert(ins *memo.InsertExpr) (execPlan, error) { + // Build the input query and ensure that the input columns that correspond to + // the table columns are projected. + input, err := b.buildRelational(ins.Input) + if err != nil { + return execPlan{}, err + } + input, err = b.ensureColumns(input, ins.InputCols, nil, ins.ProvidedPhysical().Ordering) + if err != nil { + return execPlan{}, err + } + + // Construct the Insert node. + tab := b.mem.Metadata().Table(ins.Table) + node, err := b.factory.ConstructInsert(input.root, tab, ins.NeedResults) + if err != nil { + return execPlan{}, err + } + + // If INSERT returns rows, they contain all non-mutation columns from the + // table, in the same order they're defined in the table. + ep := execPlan{root: node} + if ins.NeedResults { + for i, n := 0, tab.ColumnCount(); i < n; i++ { + ep.outputCols.Set(int(ins.InputCols[i]), i) + } + } + return ep, nil +} + // needProjection figures out what projection is needed on top of the input plan // to produce the given list of columns. If the input plan already produces // the columns (in the same order), returns needProj=false. 
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/insert b/pkg/sql/opt/exec/execbuilder/testdata/insert index 780fd815b52b..fd51122aeab7 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/insert +++ b/pkg/sql/opt/exec/execbuilder/testdata/insert @@ -298,20 +298,22 @@ SELECT tree, field, description FROM [ EXPLAIN (VERBOSE) INSERT INTO insert_t TABLE select_t ORDER BY v DESC ] ---- -count · · - └── insert · · - │ into insert_t(x, v, rowid) - │ default 0 NULL - │ default 1 NULL - │ default 2 unique_rowid() - └── sort · · - │ order -v - └── render · · - │ render 0 test.public.select_t.x - │ render 1 test.public.select_t.v - └── scan · · -· table select_t@primary -· spans ALL +count · · + └── insert · · + │ into insert_t(x, v, rowid) + └── render · · + │ render 0 x + │ render 1 v + │ render 2 column7 + └── sort · · + │ order -v + └── render · · + │ render 0 unique_rowid() + │ render 1 x + │ render 2 v + └── scan · · +· table select_t@primary +· spans ALL # Check that INSERT supports LIMIT (MySQL extension) query TTT @@ -319,72 +321,93 @@ SELECT tree, field, description FROM [ EXPLAIN (VERBOSE) INSERT INTO insert_t SELECT * FROM select_t LIMIT 1 ] ---- -count · · - └── insert · · - │ into insert_t(x, v, rowid) - │ default 0 NULL - │ default 1 NULL - │ default 2 unique_rowid() - └── limit · · - │ count 1 - └── render · · - │ render 0 test.public.select_t.x - │ render 1 test.public.select_t.v - └── scan · · -· table select_t@primary -· spans ALL -· limit 1 +count · · + └── insert · · + │ into insert_t(x, v, rowid) + └── render · · + │ render 0 x + │ render 1 v + │ render 2 unique_rowid() + └── scan · · +· table select_t@primary +· spans ALL +· limit 1 # Check the grouping of LIMIT and ORDER BY query TTT EXPLAIN (PLAN) INSERT INTO insert_t VALUES (1,1), (2,2) LIMIT 1 ---- -count · · - └── insert · · - │ into insert_t(x, v, rowid) - └── limit · · - │ count 1 - └── values · · -· size 2 columns, 2 rows +count · · + └── insert · · + │ into insert_t(x, v, rowid) + └── render · · + └── limit · · + │ count 1 + └── values · · +· size 2 columns, 2 rows query TTT EXPLAIN (PLAN) INSERT INTO insert_t VALUES (1,1), (2,2) ORDER BY 2 LIMIT 1 ---- -count · · - └── insert · · - │ into insert_t(x, v, rowid) - └── limit · · - │ count 1 - └── sort · · - │ order +column2 - │ strategy top 1 - └── values · · -· size 2 columns, 2 rows +count · · + └── insert · · + │ into insert_t(x, v, rowid) + └── render · · + └── limit · · + │ count 1 + └── sort · · + │ order +column2 + └── values · · +· size 2 columns, 2 rows query TTT EXPLAIN (PLAN) INSERT INTO insert_t (VALUES (1,1), (2,2) ORDER BY 2) LIMIT 1 ---- -count · · - └── insert · · - │ into insert_t(x, v, rowid) - └── limit · · - │ count 1 - └── sort · · - │ order +column2 - │ strategy top 1 - └── values · · -· size 2 columns, 2 rows +count · · + └── insert · · + │ into insert_t(x, v, rowid) + └── render · · + └── limit · · + │ count 1 + └── sort · · + │ order +column2 + └── values · · +· size 2 columns, 2 rows query TTT EXPLAIN (PLAN) INSERT INTO insert_t (VALUES (1,1), (2,2) ORDER BY 2 LIMIT 1) ---- -count · · - └── insert · · - │ into insert_t(x, v, rowid) - └── limit · · - │ count 1 - └── sort · · - │ order +column2 - │ strategy top 1 - └── values · · -· size 2 columns, 2 rows +count · · + └── insert · · + │ into insert_t(x, v, rowid) + └── render · · + └── limit · · + │ count 1 + └── sort · · + │ order +column2 + └── values · · +· size 2 columns, 2 rows + +# ORDER BY expression that's not inserted into table. 
+query TTTTT +EXPLAIN (VERBOSE) INSERT INTO insert_t (SELECT length(k), 2 FROM kv ORDER BY k || v) RETURNING x+v +---- +render · · ("?column?") · + │ render 0 x + v · · + └── run · · (x, v, rowid[hidden]) · + └── insert · · (x, v, rowid[hidden]) · + │ into insert_t(x, v, rowid) · · + └── render · · (length, "?column?", column9) · + │ render 0 length · · + │ render 1 "?column?" · · + │ render 2 column9 · · + └── sort · · (column9, length, "?column?", column8) +column8 + │ order +column8 · · + └── render · · (column9, length, "?column?", column8) · + │ render 0 unique_rowid() · · + │ render 1 length(k) · · + │ render 2 2 · · + │ render 3 k || v · · + └── scan · · (k, v) · +· table kv@primary · · +· spans ALL · · diff --git a/pkg/sql/opt/exec/execbuilder/testdata/orderby b/pkg/sql/opt/exec/execbuilder/testdata/orderby index bbe47e064fbf..31060eafe9b2 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/orderby +++ b/pkg/sql/opt/exec/execbuilder/testdata/orderby @@ -482,15 +482,16 @@ ordinality · · (x, "ordinality") · query TTTTT EXPLAIN (VERBOSE) INSERT INTO t(a, b) SELECT * FROM (SELECT 1 AS x, 2 AS y) ORDER BY x RETURNING b ---- -render · · (b) · - │ render 0 test.public.t.b · · - └── run · · (a, b, c) · - └── insert · · (a, b, c) · - │ into t(a, b) · · - └── render · · (x, y) x=CONST; y=CONST - │ render 0 1 · · - │ render 1 2 · · - └── emptyrow · · () · +render · · (b) · + │ render 0 b · · + └── run · · (a, b, c) · + └── insert · · (a, b, c) · + │ into t(a, b, c) · · + └── values · · (x, y, column6) · +· size 3 columns, 1 row · · +· row 0, expr 0 1 · · +· row 0, expr 1 2 · · +· row 0, expr 2 NULL · · query TTTTT EXPLAIN (VERBOSE) DELETE FROM t WHERE a = 3 RETURNING b diff --git a/pkg/sql/opt/exec/execbuilder/testdata/spool b/pkg/sql/opt/exec/execbuilder/testdata/spool index df0eee150b5f..aef0619d97e0 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/spool +++ b/pkg/sql/opt/exec/execbuilder/testdata/spool @@ -1,22 +1,26 @@ # LogicTest: local-opt statement ok -CREATE TABLE t(x INT PRIMARY KEY); INSERT INTO t VALUES(1); +CREATE TABLE t(x INT PRIMARY KEY) + +statement ok +CREATE TABLE t2(x INT PRIMARY KEY) # Check that if a mutation uses further processing, a spool is added. query TTT -EXPLAIN WITH a AS (INSERT INTO t VALUES (2) RETURNING x) +EXPLAIN WITH a AS (INSERT INTO t SELECT * FROM t2 RETURNING x) SELECT * FROM a LIMIT 1 ---- -limit · · - │ count 1 - └── spool · · - │ limit 1 - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +limit · · + │ count 1 + └── spool · · + │ limit 1 + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL query TTT EXPLAIN WITH a AS (DELETE FROM t RETURNING x) @@ -67,17 +71,18 @@ limit · · # Ditto all mutations, with the statement source syntax. query TTT -EXPLAIN SELECT * FROM [INSERT INTO t VALUES (2) RETURNING x] LIMIT 1 +EXPLAIN SELECT * FROM [INSERT INTO t SELECT * FROM t2 RETURNING x] LIMIT 1 ---- -limit · · - │ count 1 - └── spool · · - │ limit 1 - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +limit · · + │ count 1 + └── spool · · + │ limit 1 + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL query TTT EXPLAIN SELECT * FROM [DELETE FROM t RETURNING x] LIMIT 1 @@ -124,105 +129,116 @@ limit · · # Check that a spool is also inserted for other processings than LIMIT. 
query TTT -EXPLAIN SELECT * FROM [INSERT INTO t VALUES (2) RETURNING x] ORDER BY x +EXPLAIN SELECT count(*) FROM [INSERT INTO t SELECT * FROM t2 RETURNING x] ---- -sort · · - │ order +x - └── spool · · - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +group · · + │ aggregate 0 count_rows() + │ scalar · + └── spool · · + └── render · · + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL query TTT -EXPLAIN SELECT * FROM [INSERT INTO t VALUES (2) RETURNING x], t +EXPLAIN SELECT * FROM [INSERT INTO t SELECT * FROM t2 RETURNING x], t ---- -join · · - │ type cross - ├── spool · · - │ └── run · · - │ └── insert · · - │ │ into t(x) - │ └── values · · - │ size 1 column, 1 row - └── scan · · -· table t@primary -· spans ALL +join · · + │ type cross + ├── spool · · + │ └── run · · + │ └── insert · · + │ │ into t(x) + │ └── scan · · + │ table t2@primary + │ spans ALL + └── scan · · +· table t@primary +· spans ALL # Check that if a spool is already added at some level, then it is not added # again at levels below. +# TODO(andyk): This optimization is not part of CBO yet. query TTT -EXPLAIN WITH a AS (INSERT INTO t VALUES(2) RETURNING x), +EXPLAIN WITH a AS (INSERT INTO t SELECT * FROM t2 RETURNING x), b AS (INSERT INTO t SELECT x+1 FROM a RETURNING x) SELECT * FROM b LIMIT 1 ---- -limit · · - │ count 1 - └── spool · · - │ limit 1 - └── run · · - └── insert · · - │ into t(x) - └── render · · - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +limit · · + │ count 1 + └── spool · · + │ limit 1 + └── run · · + └── insert · · + │ into t(x) + └── spool · · + └── render · · + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL # Check that no spool is inserted if a top-level render is elided. query TTT -EXPLAIN SELECT * FROM [INSERT INTO t VALUES (2) RETURNING x] +EXPLAIN SELECT * FROM [INSERT INTO t SELECT * FROM t2 RETURNING x] ---- -run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL # Check that no spool is used for a top-level INSERT, but # sub-INSERTs still get a spool. query TTT -EXPLAIN INSERT INTO t SELECT x+1 FROM [INSERT INTO t VALUES(2) RETURNING x] +EXPLAIN INSERT INTO t SELECT x+1 FROM [INSERT INTO t SELECT * FROM t2 RETURNING x] ---- -count · · - └── insert · · - │ into t(x) - └── spool · · - └── render · · - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +count · · + └── insert · · + │ into t(x) + └── spool · · + └── render · · + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL # Check that simple computations using RETURNING get their spool pulled up. query TTT -EXPLAIN SELECT * FROM [INSERT INTO t VALUES (1) RETURNING x+10] WHERE @1 < 3 LIMIT 10 +EXPLAIN SELECT * FROM [INSERT INTO t SELECT * FROM t2 RETURNING x+10] WHERE @1 < 3 LIMIT 10 ---- -limit · · - │ count 10 - └── spool · · - │ limit 10 - └── filter · · - │ filter "?column?" < 3 - └── render · · - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +render · · + └── limit · · + │ count 10 + └── spool · · + │ limit 10 + └── filter · · + │ filter x < -7 + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL # Check that a pulled up spool gets elided at the top level. 
query TTT -EXPLAIN SELECT * FROM [INSERT INTO t VALUES (1) RETURNING x+10] WHERE @1 < 3 +EXPLAIN SELECT * FROM [INSERT INTO t SELECT * FROM t2 RETURNING x+10] WHERE @1 < 3 ---- -filter · · - │ filter "?column?" < 3 - └── render · · - └── run · · - └── insert · · - │ into t(x) - └── values · · -· size 1 column, 1 row +render · · + └── filter · · + │ filter x < -7 + └── run · · + └── insert · · + │ into t(x) + └── scan · · +· table t2@primary +· spans ALL diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index e17c134efaf9..84fed3115758 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -226,6 +226,12 @@ type Factory interface { // ConstructShowTrace returns a node that implements a SHOW TRACE // FOR SESSION statement. ConstructShowTrace(typ tree.ShowTraceType, compact bool) (Node, error) + + // ConstructInsert creates a node that implements an INSERT statement. Each + // tabCols parameter maps input columns into corresponding table columns, by + // ordinal position. The rowsNeeded parameter is true if a RETURNING clause + // needs the inserted row(s) as output. + ConstructInsert(input Node, table opt.Table, rowsNeeded bool) (Node, error) } // OutputOrdering indicates the required output ordering on a Node that is being diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 639336136e22..6bc45e9a45b2 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" @@ -160,6 +161,13 @@ func (ef *execFactory) ConstructFilter( f.ivarHelper = tree.MakeIndexedVarHelper(f, len(src.info.SourceColumns)) f.filter = f.ivarHelper.Rebind(filter, true /* alsoReset */, false /* normalizeToNonNil */) f.props.ordering = sqlbase.ColumnOrdering(reqOrdering) + + // If there's a spool, pull it up. + if spool, ok := f.source.plan.(*spoolNode); ok { + f.source.plan = spool.source + spool.source = f + return spool, nil + } return f, nil } @@ -190,26 +198,18 @@ func (ef *execFactory) ConstructSimpleProject( // We will need the names of the input columns. 
inputCols = planColumns(n.(planNode)) } - src := asDataSource(n) - r := &renderNode{ - source: src, - sourceInfo: sqlbase.MultiSourceInfo{src.info}, - render: make([]tree.TypedExpr, len(cols)), - columns: make([]sqlbase.ResultColumn, len(cols)), - } - r.ivarHelper = tree.MakeIndexedVarHelper(r, len(src.info.SourceColumns)) + + var rb renderBuilder + rb.init(n, reqOrdering, len(cols)) for i, col := range cols { - v := r.ivarHelper.IndexedVar(int(col)) - r.render[i] = v + v := rb.r.ivarHelper.IndexedVar(int(col)) if colNames == nil { - r.columns[i].Name = inputCols[col].Name + rb.addExpr(v, inputCols[col].Name) } else { - r.columns[i].Name = colNames[i] + rb.addExpr(v, colNames[i]) } - r.columns[i].Typ = v.ResolvedType() } - r.props.ordering = sqlbase.ColumnOrdering(reqOrdering) - return r, nil + return rb.res, nil } func hasDuplicates(cols []exec.ColumnOrdinal) bool { @@ -227,21 +227,13 @@ func hasDuplicates(cols []exec.ColumnOrdinal) bool { func (ef *execFactory) ConstructRender( n exec.Node, exprs tree.TypedExprs, colNames []string, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - src := asDataSource(n) - r := &renderNode{ - source: src, - sourceInfo: sqlbase.MultiSourceInfo{src.info}, - render: make([]tree.TypedExpr, len(exprs)), - columns: make([]sqlbase.ResultColumn, len(exprs)), - } - r.ivarHelper = tree.MakeIndexedVarHelper(r, len(src.info.SourceColumns)) + var rb renderBuilder + rb.init(n, reqOrdering, len(exprs)) for i, expr := range exprs { - expr = r.ivarHelper.Rebind(expr, false /* alsoReset */, true /* normalizeToNonNil */) - r.render[i] = expr - r.columns[i] = sqlbase.ResultColumn{Name: colNames[i], Typ: expr.ResolvedType()} + expr = rb.r.ivarHelper.Rebind(expr, false /* alsoReset */, true /* normalizeToNonNil */) + rb.addExpr(expr, colNames[i]) } - r.props.ordering = sqlbase.ColumnOrdering(reqOrdering) - return r, nil + return rb.res, nil } // RenameColumns is part of the exec.Factory interface. @@ -742,6 +734,12 @@ func (ef *execFactory) ConstructLimit( l.countExpr = limit return l, nil } + // If the input plan is a spoolNode, then propagate any constant limit to it. + if spool, ok := plan.(*spoolNode); ok { + if val, ok := limit.(*tree.DInt); ok { + spool.hardLimit = int64(*val) + } + } return &limitNode{ plan: plan, countExpr: limit, @@ -784,6 +782,16 @@ func (ef *execFactory) ConstructProjectSet( func (ef *execFactory) ConstructPlan( root exec.Node, subqueries []exec.Subquery, ) (exec.Plan, error) { + // Enable auto-commit if the planner setting allows it. + if ef.planner.autoCommit { + if ac, ok := root.(autoCommitNode); ok { + ac.enableAutoCommit() + } + } + // No need to spool at the root. + if spool, ok := root.(*spoolNode); ok { + root = spool.source + } res := &planTop{ plan: root.(planNode), auditEvents: ef.planner.curPlan.auditEvents, @@ -870,3 +878,110 @@ func (ef *execFactory) ConstructShowTrace(typ tree.ShowTraceType, compact bool) } return node, nil } + +func (ef *execFactory) ConstructInsert( + input exec.Node, table opt.Table, rowsNeeded bool, +) (exec.Node, error) { + // Derive insert table and column descriptors. + tabDesc := table.(*optTable).desc + colCount := len(tabDesc.Columns) + colDescs := make([]sqlbase.ColumnDescriptor, colCount+table.MutationColumnCount()) + copy(colDescs, tabDesc.Columns) + + // Append any mutation columns. + for i := colCount; i < len(colDescs); i++ { + colDescs[i] = *table.MutationColumn(i - colCount).(*sqlbase.ColumnDescriptor) + } + + // Determine the foreign key tables involved in the update. 
+ fkTables, err := row.TablesNeededForFKs( + ef.planner.extendedEvalCtx.Context, + *tabDesc, + row.CheckInserts, + ef.planner.LookupTableByID, + ef.planner.CheckPrivilege, + ef.planner.analyzeExpr, + ) + if err != nil { + return nil, err + } + + // Create the table insert, which does the bulk of the work. + ri, err := row.MakeInserter(ef.planner.txn, tabDesc, fkTables, colDescs, + row.CheckFKs, &ef.planner.alloc) + if err != nil { + return nil, err + } + + // Determine the relational type of the generated insert node. + // If rows are not needed, no columns are returned. + var returnCols sqlbase.ResultColumns + if rowsNeeded { + // Insert always returns all non-mutation columns, in the same order they + // are defined in the table. Note that the columns and order can be + // different than tabCols. + returnCols = sqlbase.ResultColumnsFromColDescs(tabDesc.Columns) + } + + // Regular path for INSERT. + ins := insertNodePool.Get().(*insertNode) + *ins = insertNode{ + source: input.(planNode), + columns: returnCols, + run: insertRun{ + ti: tableInserter{ri: ri}, + checkHelper: fkTables[tabDesc.ID].CheckHelper, + rowsNeeded: rowsNeeded, + iVarContainerForComputedCols: sqlbase.RowIndexedVarContainer{ + Cols: tabDesc.Columns, + Mapping: ri.InsertColIDtoRowIndex, + }, + insertCols: ri.InsertCols, + }, + } + + // serialize the data-modifying plan to ensure that no data is + // observed that hasn't been validated first. See the comments + // on BatchedNext() in plan_batch.go. + if rowsNeeded { + return &spoolNode{source: &serializeNode{source: ins}}, nil + } + + // We could use serializeNode here, but using rowCountNode is an + // optimization that saves on calls to Next() by the caller. + return &rowCountNode{source: ins}, nil +} + +// renderBuilder encapsulates the code to build a renderNode. +type renderBuilder struct { + r *renderNode + res planNode +} + +// init initializes the renderNode with render expressions. +func (rb *renderBuilder) init(n exec.Node, reqOrdering exec.OutputOrdering, cap int) { + src := asDataSource(n) + rb.r = &renderNode{ + source: src, + sourceInfo: sqlbase.MultiSourceInfo{src.info}, + render: make([]tree.TypedExpr, 0, cap), + columns: make([]sqlbase.ResultColumn, 0, cap), + } + rb.r.ivarHelper = tree.MakeIndexedVarHelper(rb.r, len(src.info.SourceColumns)) + rb.r.props.ordering = sqlbase.ColumnOrdering(reqOrdering) + + // If there's a spool, pull it up. + if spool, ok := rb.r.source.plan.(*spoolNode); ok { + rb.r.source.plan = spool.source + spool.source = rb.r + rb.res = spool + } else { + rb.res = rb.r + } +} + +// addExpr adds a new render expression with the given name. +func (rb *renderBuilder) addExpr(expr tree.TypedExpr, colName string) { + rb.r.render = append(rb.r.render, expr) + rb.r.columns = append(rb.r.columns, sqlbase.ResultColumn{Name: colName, Typ: expr.ResolvedType()}) +} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 79a753f79fb1..23868e462c53 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -367,7 +367,8 @@ func (p *planner) makeOptimizerPlan(ctx context.Context, stmt Statement) error { // Start with fast check to see if top-level statement is supported. switch stmt.AST.(type) { case *tree.ParenSelect, *tree.Select, *tree.SelectClause, - *tree.UnionClause, *tree.ValuesClause, *tree.Explain: + *tree.UnionClause, *tree.ValuesClause, *tree.Explain, + *tree.Insert: default: return pgerror.Unimplemented("statement", fmt.Sprintf("unsupported statement: %T", stmt.AST))
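
To make the RETURNING bookkeeping in the new `buildInsert` concrete: when `ins.NeedResults` is set, the insert node's output row contains all non-mutation table columns in table order, and the i-th output ordinal is keyed by the optimizer column ID `ins.InputCols[i]`. The following is a minimal, self-contained sketch of that mapping only; it is not CockroachDB code, and every type and name in it (`colMap`, `insertExpr`, `buildInsertOutputCols`) is a hypothetical stand-in for the real `execPlan.outputCols` / `memo.InsertExpr` machinery in the patch.

```go
// A toy sketch of the output-column mapping done in buildInsert when the
// INSERT has a RETURNING clause. All names here are illustrative stand-ins.
package main

import "fmt"

// colMap stands in for execPlan.outputCols: it maps an optimizer column ID
// to that column's ordinal position in the node's output row.
type colMap map[int]int

// insertExpr stands in for memo.InsertExpr, carrying only what the sketch needs.
type insertExpr struct {
	// InputCols lists, in table-column order, the optimizer column IDs of the
	// input columns being inserted.
	InputCols   []int
	NeedResults bool
}

// buildInsertOutputCols mimics the loop in buildInsert: if the INSERT returns
// rows, output ordinal i corresponds to the optimizer ID InputCols[i], for
// every non-mutation table column.
func buildInsertOutputCols(ins insertExpr, tableColCount int) colMap {
	m := colMap{}
	if !ins.NeedResults {
		return m
	}
	for i := 0; i < tableColCount; i++ {
		m[ins.InputCols[i]] = i
	}
	return m
}

func main() {
	// Hypothetical example: a 3-column table (x, v, rowid) whose input
	// columns were assigned optimizer column IDs 7, 8, and 9.
	ins := insertExpr{InputCols: []int{7, 8, 9}, NeedResults: true}
	fmt.Println(buildInsertOutputCols(ins, 3)) // map[7:0 8:1 9:2]
}
```

Under these assumptions, a downstream render over the RETURNING columns can resolve an optimizer column ID to an output ordinal with a plain map lookup, which is the same shape of lookup the presentation/render step performs on `execPlan.outputCols` in the patch.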