Skip to content

Commit

Permalink
pkg: Moved AS query evaluation to SchemaChanger from user txn.
Browse files Browse the repository at this point in the history
Previously, the SELECT statement to backfill the newly created
table was executed within a user txn. This has been made
async by moving it to the SchemaChanger as a first step in
making the command more robust.

The CREATE command is no longer a FastPathNode as it can no
longer report the number of rows inserted at the time the user
txn is committed.

Release note: None
  • Loading branch information
adityamaru27 committed Jun 25, 2019
1 parent 479ef9d commit 8059716
Show file tree
Hide file tree
Showing 7 changed files with 626 additions and 273 deletions.
117 changes: 88 additions & 29 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func (p *planner) CreateTable(ctx context.Context, n *tree.CreateTable) (planNod
// createTableRun contains the run-time state of createTableNode
// during local execution.
type createTableRun struct {
autoCommit autoCommitOpt
rowsAffected int
autoCommit autoCommitOpt

// synthRowID indicates whether an input column needs to be synthesized to
// provide the default value for the hidden rowid column. The optimizer's
Expand Down Expand Up @@ -131,6 +130,14 @@ func (n *createTableNode) startExec(params runParams) error {
var affected map[sqlbase.ID]*sqlbase.MutableTableDescriptor
creationTime := params.p.txn.CommitTimestamp()
if n.n.As() {
// TODO(adityamaru): This planning step is only to populate db/schema
// details in the table names in-place, to later store in the table
// descriptor. Figure out a cleaner way to do this.
_, err = params.p.Select(params.ctx, n.n.AsSource, []*types.T{})
if err != nil {
return err
}

asCols = planColumns(n.sourcePlan)
if !n.run.synthRowID {
// rowID column is already present in the input as the last column, so
Expand All @@ -140,31 +147,40 @@ func (n *createTableNode) startExec(params runParams) error {
}
desc, err = makeTableDescIfAs(
n.n, n.dbDesc.ID, id, creationTime, asCols,
privs, &params.p.semaCtx)
privs, &params.p.semaCtx, params.p.EvalContext())
if err != nil {
return err
}

// If we have an implicit txn we want to run CTAS async, and consequently
// ensure it gets queued as a SchemaChange.
if params.p.ExtendedEvalContext().TxnImplicit {
desc.State = sqlbase.TableDescriptor_ADD
}
} else {
affected = make(map[sqlbase.ID]*sqlbase.MutableTableDescriptor)
desc, err = makeTableDesc(params, n.n, n.dbDesc.ID, id, creationTime, privs, affected)
}
if err != nil {
return err
}

if desc.Adding() {
// if this table and all its references are created in the same
// transaction it can be made PUBLIC.
refs, err := desc.FindAllReferences()
if err != nil {
return err
}
var foundExternalReference bool
for id := range refs {
if t := params.p.Tables().getUncommittedTableByID(id).MutableTableDescriptor; t == nil || !t.IsNewTable() {
foundExternalReference = true
break

if desc.Adding() {
// if this table and all its references are created in the same
// transaction it can be made PUBLIC.
refs, err := desc.FindAllReferences()
if err != nil {
return err
}
var foundExternalReference bool
for id := range refs {
if t := params.p.Tables().getUncommittedTableByID(id).MutableTableDescriptor; t == nil || !t.IsNewTable() {
foundExternalReference = true
break
}
}
if !foundExternalReference {
desc.State = sqlbase.TableDescriptor_PUBLIC
}
}
if !foundExternalReference {
desc.State = sqlbase.TableDescriptor_PUBLIC
}
}

Expand Down Expand Up @@ -209,7 +225,9 @@ func (n *createTableNode) startExec(params runParams) error {
return err
}

if n.n.As() {
// If we are in an explicit txn or the source has placeholders, we execute the
// CTAS query synchronously.
if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit {
// This is a very simplified version of the INSERT logic: no CHECK
// expressions, no FK checks, no arbitrary insertion order, no
// RETURNING, etc.
Expand Down Expand Up @@ -300,10 +318,15 @@ func (n *createTableNode) startExec(params runParams) error {
if err != nil {
return err
}
n.run.rowsAffected++
}
}

// The CREATE STATISTICS run for an async CTAS query is initiated by the
// SchemaChanger.
if n.n.As() && params.p.autoCommit {
return nil
}

// Initiate a run of CREATE STATISTICS. We use a large number
// for rowsAffected because we want to make sure that stats always get
// created/refreshed here.
Expand All @@ -326,13 +349,6 @@ func (n *createTableNode) Close(ctx context.Context) {
}
}

func (n *createTableNode) FastPathResults() (int, bool) {
if n.n.As() {
return n.run.rowsAffected, true
}
return 0, false
}

type indexMatch bool

const (
Expand Down Expand Up @@ -913,9 +929,49 @@ func InitTableDescriptor(
Version: 1,
ModificationTime: creationTime,
Privileges: privileges,
CreateAsOfTime: creationTime,
})
}

func getFinalSourceQuery(source *tree.Select, evalCtx *tree.EvalContext) string {
// Ensure that all the table names pretty-print as fully qualified, so we
// store that in the table descriptor.
//
// The traversal will update the TableNames in-place, so the changes are
// persisted in n.n.AsSource. We exploit the fact that planning step above
// has populated any missing db/schema details in the table names in-place.
// We use tree.FormatNode merely as a traversal method; its output buffer is
// discarded immediately after the traversal because it is not needed
// further.
f := tree.NewFmtCtx(tree.FmtParsable)
f.SetReformatTableNames(
func(_ *tree.FmtCtx, tn *tree.TableName) {
// Persist the database prefix expansion.
if tn.SchemaName != "" {
// All CTE or table aliases have no schema
// information. Those do not turn into explicit.
tn.ExplicitSchema = true
tn.ExplicitCatalog = true
}
},
)
f.FormatNode(source)
f.Close()

// Substitute placeholders with their values.
ctx := tree.NewFmtCtx(tree.FmtParsable)
ctx.SetPlaceholderFormat(func(ctx *tree.FmtCtx, placeholder *tree.Placeholder) {
d, err := placeholder.Eval(evalCtx)
if err != nil {
panic(fmt.Sprintf("failed to serialize placeholder: %s", err))
}
d.Format(ctx)
})
ctx.FormatNode(source)

return ctx.CloseAndGetString()
}

// makeTableDescIfAs is the MakeTableDesc method for when we have a table
// that is created with the CREATE AS format.
func makeTableDescIfAs(
Expand All @@ -925,8 +981,11 @@ func makeTableDescIfAs(
resultColumns []sqlbase.ResultColumn,
privileges *sqlbase.PrivilegeDescriptor,
semaCtx *tree.SemaContext,
evalContext *tree.EvalContext,
) (desc sqlbase.MutableTableDescriptor, err error) {
desc = InitTableDescriptor(id, parentID, p.Table.Table(), creationTime, privileges)
desc.CreateQuery = getFinalSourceQuery(p.AsSource, evalContext)

for i, colRes := range resultColumns {
columnTableDef := tree.ColumnTableDef{Name: tree.Name(colRes.Name), Type: colRes.Typ}
columnTableDef.Nullable.Nullability = tree.SilentNull
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,8 @@ func TestCreateStatementType(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if cmdTag != "SELECT 10" {
t.Fatal("expected SELECT 10, got", cmdTag)
if cmdTag != "CREATE TABLE AS" {
t.Fatal("expected CREATE TABLE AS, got", cmdTag)
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ var _ planNode = &zeroNode{}
var _ planNodeFastPath = &CreateUserNode{}
var _ planNodeFastPath = &DropUserNode{}
var _ planNodeFastPath = &alterUserSetPasswordNode{}
var _ planNodeFastPath = &createTableNode{}
var _ planNodeFastPath = &deleteRangeNode{}
var _ planNodeFastPath = &rowCountNode{}
var _ planNodeFastPath = &serializeNode{}
Expand Down
Loading

0 comments on commit 8059716

Please sign in to comment.