Skip to content

Commit

Permalink
Merge #38061
Browse files Browse the repository at this point in the history
38061: pkg: Split CTAS execution between user txn and SchemaChanger. r=adityamaru27 a=adityamaru27

As a first step in improving the performance and scalability of the `CTAS` command,
(issue #25828) we needed to split the responsibility of creating a new
`TableDescriptor`, and executing the `AS` query to backfill the new table.

While the user txn continues to create and store the new desc, the
execution of the `AS` query has been made async by moving the logic to the
SchemaChanger.

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru27 committed Jun 25, 2019
2 parents bd016ad + 8059716 commit c9ad321
Show file tree
Hide file tree
Showing 17 changed files with 737 additions and 333 deletions.
4 changes: 2 additions & 2 deletions pkg/acceptance/testdata/psql/test-psql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ echo 'Testing copy error'
output="$(psql -d testdb -c 'copy missing from stdin' 2>&1 || true)"
echo $output | grep 'relation "missing" does not exist'

# Test that CREATE TABLE AS returns tag SELECT, not CREATE (#20227).
psql -d testdb -c "CREATE TABLE ctas AS SELECT 1" | grep "SELECT"
# Test that CREATE TABLE AS returns tag CREATE TABLE AS, not CREATE (#20227).
psql -d testdb -c "CREATE TABLE ctas AS SELECT 1" | grep "CREATE TABLE AS"
2 changes: 1 addition & 1 deletion pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ func Example_sql() {
// sql -e create table t.g1 (x int)
// CREATE TABLE
// sql -e create table t.g2 as select * from generate_series(1,10)
// SELECT 10
// CREATE TABLE AS
// sql -d nonexistent -e select count(*) from "".information_schema.tables limit 0
// count
// sql -d nonexistent -e create database nonexistent; create table foo(x int); select * from foo
Expand Down
117 changes: 88 additions & 29 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func (p *planner) CreateTable(ctx context.Context, n *tree.CreateTable) (planNod
// createTableRun contains the run-time state of createTableNode
// during local execution.
type createTableRun struct {
autoCommit autoCommitOpt
rowsAffected int
autoCommit autoCommitOpt

// synthRowID indicates whether an input column needs to be synthesized to
// provide the default value for the hidden rowid column. The optimizer's
Expand Down Expand Up @@ -131,6 +130,14 @@ func (n *createTableNode) startExec(params runParams) error {
var affected map[sqlbase.ID]*sqlbase.MutableTableDescriptor
creationTime := params.p.txn.CommitTimestamp()
if n.n.As() {
// TODO(adityamaru): This planning step is only to populate db/schema
// details in the table names in-place, to later store in the table
// descriptor. Figure out a cleaner way to do this.
_, err = params.p.Select(params.ctx, n.n.AsSource, []*types.T{})
if err != nil {
return err
}

asCols = planColumns(n.sourcePlan)
if !n.run.synthRowID {
// rowID column is already present in the input as the last column, so
Expand All @@ -140,31 +147,40 @@ func (n *createTableNode) startExec(params runParams) error {
}
desc, err = makeTableDescIfAs(
n.n, n.dbDesc.ID, id, creationTime, asCols,
privs, &params.p.semaCtx)
privs, &params.p.semaCtx, params.p.EvalContext())
if err != nil {
return err
}

// If we have an implicit txn we want to run CTAS async, and consequently
// ensure it gets queued as a SchemaChange.
if params.p.ExtendedEvalContext().TxnImplicit {
desc.State = sqlbase.TableDescriptor_ADD
}
} else {
affected = make(map[sqlbase.ID]*sqlbase.MutableTableDescriptor)
desc, err = makeTableDesc(params, n.n, n.dbDesc.ID, id, creationTime, privs, affected)
}
if err != nil {
return err
}

if desc.Adding() {
// if this table and all its references are created in the same
// transaction it can be made PUBLIC.
refs, err := desc.FindAllReferences()
if err != nil {
return err
}
var foundExternalReference bool
for id := range refs {
if t := params.p.Tables().getUncommittedTableByID(id).MutableTableDescriptor; t == nil || !t.IsNewTable() {
foundExternalReference = true
break

if desc.Adding() {
// if this table and all its references are created in the same
// transaction it can be made PUBLIC.
refs, err := desc.FindAllReferences()
if err != nil {
return err
}
var foundExternalReference bool
for id := range refs {
if t := params.p.Tables().getUncommittedTableByID(id).MutableTableDescriptor; t == nil || !t.IsNewTable() {
foundExternalReference = true
break
}
}
if !foundExternalReference {
desc.State = sqlbase.TableDescriptor_PUBLIC
}
}
if !foundExternalReference {
desc.State = sqlbase.TableDescriptor_PUBLIC
}
}

Expand Down Expand Up @@ -209,7 +225,9 @@ func (n *createTableNode) startExec(params runParams) error {
return err
}

if n.n.As() {
// If we are in an explicit txn or the source has placeholders, we execute the
// CTAS query synchronously.
if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit {
// This is a very simplified version of the INSERT logic: no CHECK
// expressions, no FK checks, no arbitrary insertion order, no
// RETURNING, etc.
Expand Down Expand Up @@ -300,10 +318,15 @@ func (n *createTableNode) startExec(params runParams) error {
if err != nil {
return err
}
n.run.rowsAffected++
}
}

// The CREATE STATISTICS run for an async CTAS query is initiated by the
// SchemaChanger.
if n.n.As() && params.p.autoCommit {
return nil
}

// Initiate a run of CREATE STATISTICS. We use a large number
// for rowsAffected because we want to make sure that stats always get
// created/refreshed here.
Expand All @@ -326,13 +349,6 @@ func (n *createTableNode) Close(ctx context.Context) {
}
}

func (n *createTableNode) FastPathResults() (int, bool) {
if n.n.As() {
return n.run.rowsAffected, true
}
return 0, false
}

type indexMatch bool

const (
Expand Down Expand Up @@ -913,9 +929,49 @@ func InitTableDescriptor(
Version: 1,
ModificationTime: creationTime,
Privileges: privileges,
CreateAsOfTime: creationTime,
})
}

func getFinalSourceQuery(source *tree.Select, evalCtx *tree.EvalContext) string {
// Ensure that all the table names pretty-print as fully qualified, so we
// store that in the table descriptor.
//
// The traversal will update the TableNames in-place, so the changes are
// persisted in n.n.AsSource. We exploit the fact that planning step above
// has populated any missing db/schema details in the table names in-place.
// We use tree.FormatNode merely as a traversal method; its output buffer is
// discarded immediately after the traversal because it is not needed
// further.
f := tree.NewFmtCtx(tree.FmtParsable)
f.SetReformatTableNames(
func(_ *tree.FmtCtx, tn *tree.TableName) {
// Persist the database prefix expansion.
if tn.SchemaName != "" {
// All CTE or table aliases have no schema
// information. Those do not turn into explicit.
tn.ExplicitSchema = true
tn.ExplicitCatalog = true
}
},
)
f.FormatNode(source)
f.Close()

// Substitute placeholders with their values.
ctx := tree.NewFmtCtx(tree.FmtParsable)
ctx.SetPlaceholderFormat(func(ctx *tree.FmtCtx, placeholder *tree.Placeholder) {
d, err := placeholder.Eval(evalCtx)
if err != nil {
panic(fmt.Sprintf("failed to serialize placeholder: %s", err))
}
d.Format(ctx)
})
ctx.FormatNode(source)

return ctx.CloseAndGetString()
}

// makeTableDescIfAs is the MakeTableDesc method for when we have a table
// that is created with the CREATE AS format.
func makeTableDescIfAs(
Expand All @@ -925,8 +981,11 @@ func makeTableDescIfAs(
resultColumns []sqlbase.ResultColumn,
privileges *sqlbase.PrivilegeDescriptor,
semaCtx *tree.SemaContext,
evalContext *tree.EvalContext,
) (desc sqlbase.MutableTableDescriptor, err error) {
desc = InitTableDescriptor(id, parentID, p.Table.Table(), creationTime, privileges)
desc.CreateQuery = getFinalSourceQuery(p.AsSource, evalContext)

for i, colRes := range resultColumns {
columnTableDef := tree.ColumnTableDef{Name: tree.Name(colRes.Name), Type: colRes.Typ}
columnTableDef.Nullable.Nullability = tree.SilentNull
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,8 @@ func TestCreateStatementType(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if cmdTag != "SELECT 10" {
t.Fatal("expected SELECT 10, got", cmdTag)
if cmdTag != "CREATE TABLE AS" {
t.Fatal("expected CREATE TABLE AS, got", cmdTag)
}
}

Expand Down
34 changes: 26 additions & 8 deletions pkg/sql/logictest/testdata/logic_test/create_as
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
# LogicTest: local local-opt

statement count 3
statement ok
CREATE TABLE stock (item, quantity) AS VALUES ('cups', 10), ('plates', 15), ('forks', 30)

statement count 1
statement count 3
SELECT * FROM stock

statement ok
CREATE TABLE runningOut AS SELECT * FROM stock WHERE quantity < 12

statement count 1
SELECT * FROM runningOut

query TI
SELECT * FROM runningOut
----
cups 10

statement count 3
statement ok
CREATE TABLE itemColors (color) AS VALUES ('blue'), ('red'), ('green')

statement count 9
statement count 3
SELECT * FROM itemColors

statement ok
CREATE TABLE itemTypes AS (SELECT item, color FROM stock, itemColors)

statement count 9
SELECT * FROM itemTypes

query TT rowsort
SELECT * FROM itemTypes
----
Expand All @@ -39,9 +51,12 @@ CREATE TABLE t2 (col1, col2, col3) AS SELECT * FROM stock
statement error pgcode 42601 CREATE TABLE specifies 1 column name, but data source has 2 columns
CREATE TABLE t2 (col1) AS SELECT * FROM stock

statement count 5
statement ok
CREATE TABLE unionstock AS SELECT * FROM stock UNION VALUES ('spoons', 25), ('knives', 50)

statement count 5
SELECT * FROM unionstock

query TI
SELECT * FROM unionstock ORDER BY quantity
----
Expand All @@ -51,7 +66,7 @@ spoons 25
forks 30
knives 50

statement count 0
statement ok
CREATE TABLE IF NOT EXISTS unionstock AS VALUES ('foo', 'bar')

query TI
Expand All @@ -62,10 +77,13 @@ cups 10
statement ok
CREATE DATABASE smtng

statement count 3
statement ok
CREATE TABLE smtng.something AS SELECT * FROM stock

statement count 0
statement count 3
SELECT * FROM smtng.something;

statement ok
CREATE TABLE IF NOT EXISTS smtng.something AS SELECT * FROM stock

query TI
Expand Down
Loading

0 comments on commit c9ad321

Please sign in to comment.