Skip to content

Commit

Permalink
Merge #40871
Browse files Browse the repository at this point in the history
40871: opt: move autocommit logic to the execbuilder r=RaduBerinde a=RaduBerinde

Auto-commit refers to the optimization of committing the txn as part
of the last batch of the mutation. This is only correct to do in
certain cases. Currently this logic is inside the execution layer and
is very simplistic (the root node must implement `enableAutoCommit`).

This change moves the auto-commit logic to the execbuilder and
improves the logic to allow RETURNING statements (as long as
projections have no side effects). Whether we auto-commit is then sent
down via the exec factory.

Fixes #34504.

Release note (performance improvement): Mutation statements with
RETURNING and not inside an explicit transaction are faster in some
cases.

Release justification: Low risk, high benefit changes to existing
functionality (we know of at least one customer workload that
benefits).


Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Sep 19, 2019
2 parents 8ef3bbb + 60a2ee6 commit f875f74
Show file tree
Hide file tree
Showing 18 changed files with 440 additions and 107 deletions.
5 changes: 0 additions & 5 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,6 @@ func (n *createTableNode) startExec(params runParams) error {
return nil
}

// enableAutoCommit is part of the autoCommitNode interface.
func (n *createTableNode) enableAutoCommit() {
n.run.autoCommit = autoCommitEnabled
}

func (*createTableNode) Next(runParams) (bool, error) { return false, nil }
func (*createTableNode) Values() tree.Datums { return tree.Datums{} }

Expand Down
10 changes: 0 additions & 10 deletions pkg/sql/delayed.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type delayedNode struct {
plan planNode
}

// delayedNode implements the autoCommitNode interface.
var _ autoCommitNode = &delayedNode{}

type nodeConstructor func(context.Context, *planner) (planNode, error)

func (d *delayedNode) Next(params runParams) (bool, error) { return d.plan.Next(params) }
Expand All @@ -42,13 +39,6 @@ func (d *delayedNode) Close(ctx context.Context) {
}
}

// enableAutoCommit is part of the autoCommitNode interface.
func (d *delayedNode) enableAutoCommit() {
if ac, ok := d.plan.(autoCommitNode); ok {
ac.enableAutoCommit()
}
}

// startExec constructs the wrapped planNode now that execution is underway.
func (d *delayedNode) startExec(params runParams) error {
if d.plan != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ type deleteNode struct {
run deleteRun
}

// deleteNode implements the autoCommitNode interface.
var _ autoCommitNode = &deleteNode{}

// Delete removes rows from a table.
// Privileges: DELETE and SELECT on table. We currently always use a SELECT statement.
// Notes: postgres requires DELETE. Also requires SELECT for "USING" and "WHERE" with tables.
Expand Down Expand Up @@ -449,7 +446,6 @@ func canDeleteFastInterleaved(table *ImmutableTableDescriptor, fkTables row.FkTa
return true
}

// enableAutoCommit is part of the autoCommitNode interface.
func (d *deleteNode) enableAutoCommit() {
d.run.td.enableAutoCommit()
}
12 changes: 4 additions & 8 deletions pkg/sql/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
// deleteRangeNode implements DELETE on a primary index satisfying certain
// conditions that permit the direct use of the DeleteRange kv operation,
// instead of many point deletes.
//
// Note: deleteRangeNode can't autocommit, because it has to delete in batches,
// and it won't know whether or not there is more work to do until after a batch
// is returned. This property precludes using auto commit.
type deleteRangeNode struct {
// interleavedFastPath is true if we can take the fast path despite operating
// on an interleaved table.
Expand All @@ -41,7 +45,6 @@ type deleteRangeNode struct {
rowCount int
}

var _ autoCommitNode = &deleteRangeNode{}
var _ planNode = &deleteRangeNode{}
var _ planNodeFastPath = &deleteRangeNode{}
var _ batchedPlanNode = &deleteRangeNode{}
Expand Down Expand Up @@ -220,10 +223,3 @@ func (*deleteRangeNode) Values() tree.Datums {

// Close implements the planNode interface.
func (*deleteRangeNode) Close(ctx context.Context) {}

// enableAutoCommit implements the autoCommitNode interface.
func (d *deleteRangeNode) enableAutoCommit() {
// DeleteRange can't autocommit, because it has to delete in batches, and it
// won't know whether or not there is more work to do until after a batch is
// returned. This property precludes using auto commit.
}
5 changes: 1 addition & 4 deletions pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ type insertNode struct {
run insertRun
}

// insertNode implements the autoCommitNode interface.
var _ autoCommitNode = &insertNode{}

// Insert inserts rows into the database.
// Privileges: INSERT on table. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
// Notes: postgres requires INSERT. No "on duplicate key update" option.
Expand Down Expand Up @@ -614,7 +611,7 @@ func (n *insertNode) Close(ctx context.Context) {
insertNodePool.Put(n)
}

// enableAutoCommit is part of the autoCommitNode interface.
// See planner.autoCommit.
func (n *insertNode) enableAutoCommit() {
n.run.ti.enableAutoCommit()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt/bench/stub_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func (f *stubFactory) ConstructInsert(
insertCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
allowAutoCommit bool,
skipFKChecks bool,
) (exec.Node, error) {
return struct{}{}, nil
Expand All @@ -241,6 +242,7 @@ func (f *stubFactory) ConstructUpdate(
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
passthrough sqlbase.ResultColumns,
allowAutoCommit bool,
skipFKChecks bool,
) (exec.Node, error) {
return struct{}{}, nil
Expand All @@ -255,6 +257,7 @@ func (f *stubFactory) ConstructUpsert(
updateCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
allowAutoCommit bool,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand All @@ -264,6 +267,7 @@ func (f *stubFactory) ConstructDelete(
table cat.Table,
fetchCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
allowAutoCommit bool,
skipFKChecks bool,
) (exec.Node, error) {
return struct{}{}, nil
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/opt/exec/execbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Builder struct {
// TODO(justin): set this up so that we can look them up by index lookups
// rather than scans.
withExprs []builtWithExpr

// autoCommit is passed through to factory methods for mutation operators. It
// allows execution to commit the transaction as part of the mutation itself.
// See canAutoCommit().
autoCommit bool
}

// New constructs an instance of the execution node builder using the
Expand Down Expand Up @@ -114,6 +119,9 @@ func (b *Builder) build(e opt.Expr) (exec.Node, error) {
if !ok {
return nil, errors.AssertionFailedf("building execution for non-relational operator %s", log.Safe(e.Op()))
}

b.autoCommit = b.canAutoCommit(rel)

plan, err := b.buildRelational(rel)
if err != nil {
return nil, err
Expand Down
56 changes: 56 additions & 0 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (b *Builder) buildInsert(ins *memo.InsertExpr) (execPlan, error) {
insertOrds,
returnOrds,
checkOrds,
b.autoCommit,
disableExecFKs,
)
if err != nil {
Expand Down Expand Up @@ -150,6 +151,7 @@ func (b *Builder) buildUpdate(upd *memo.UpdateExpr) (execPlan, error) {
returnColOrds,
checkOrds,
passthroughCols,
b.autoCommit,
disableExecFKs,
)
if err != nil {
Expand Down Expand Up @@ -222,6 +224,7 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
updateColOrds,
returnColOrds,
checkOrds,
b.autoCommit,
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -266,6 +269,7 @@ func (b *Builder) buildDelete(del *memo.DeleteExpr) (execPlan, error) {
tab,
fetchColOrds,
returnColOrds,
b.autoCommit,
disableExecFKs,
)
if err != nil {
Expand Down Expand Up @@ -480,3 +484,55 @@ func (b *Builder) buildFKChecks(checks memo.FKChecksExpr) error {
}
return nil
}

// canAutoCommit determines if it is safe to auto commit the mutation contained
// in the expression.
//
// Mutations can commit the transaction as part of the same KV request,
// potentially taking advantage of the 1PC optimization. This is not ok to do in
// general; a sufficient set of conditions is:
// 1. There is a single mutation in the query.
// 2. The mutation has no planned FK checks (which run after the mutation).
// 3. The mutation is the root operator, or it is directly under a Project
// with no side-effecting expressions.
//
// An example of why we can't allow side-effecting expressions: if the
// projection encounters a division-by-zero error, the mutation shouldn't have
// been committed.
//
// Note that there are other necessary conditions related to execution
// (specifically, that the transaction is implicit); it is up to the exec
// factory to take that into account as well.
func (b *Builder) canAutoCommit(rel memo.RelExpr) bool {
if !rel.Relational().CanMutate {
// No mutations in the expression.
return false
}

switch rel.Op() {
case opt.InsertOp, opt.UpsertOp, opt.UpdateOp, opt.DeleteOp:
// Check that there aren't any more mutations in the input.
// TODO(radu): this can go away when all mutations are under top-level
// With ops.
if rel.Child(0).(memo.RelExpr).Relational().CanMutate {
return false
}
// Check that there aren't any FK checks planned.
fkChecks := rel.Child(1).(*memo.FKChecksExpr)
return len(*fkChecks) == 0

case opt.ProjectOp:
// Allow Project on top, as long as the expressions are not side-effecting.
//
// TODO(radu): for now, we only allow passthrough projections because not all
// builtins that can error out are marked as side-effecting.
proj := rel.(*memo.ProjectExpr)
if len(proj.Projections) != 0 {
return false
}
return b.canAutoCommit(proj.Input)

default:
return false
}
}
8 changes: 4 additions & 4 deletions pkg/sql/opt/exec/execbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,12 @@ func (b *Builder) buildExistsSubquery(

// Build the execution plan for the subquery. Note that the subquery could
// have subqueries of its own which are added to b.subqueries.
root, err := b.build(exists.Input)
plan, err := b.buildRelational(exists.Input)
if err != nil {
return nil, err
}

return b.addSubquery(exec.SubqueryExists, types.Bool, root, exists.OriginalExpr), nil
return b.addSubquery(exec.SubqueryExists, types.Bool, plan.root, exists.OriginalExpr), nil
}

func (b *Builder) buildSubquery(
Expand All @@ -547,12 +547,12 @@ func (b *Builder) buildSubquery(

// Build the execution plan for the subquery. Note that the subquery could
// have subqueries of its own which are added to b.subqueries.
root, err := b.build(input)
plan, err := b.buildRelational(input)
if err != nil {
return nil, err
}

return b.addSubquery(exec.SubqueryOneRow, subquery.Typ, root, subquery.OriginalExpr), nil
return b.addSubquery(exec.SubqueryOneRow, subquery.Typ, plan.root, subquery.OriginalExpr), nil
}

// addSubquery adds an entry to b.subqueries and creates a tree.Subquery
Expand Down
Loading

0 comments on commit f875f74

Please sign in to comment.