Skip to content

Commit

Permalink
sql/opt/exec: add implicit SELECT FOR UPDATE support for UPDATE state…
Browse files Browse the repository at this point in the history
…ments

This commit adds support for implicitly applying FOR UPDATE row-level
locking modes to the initial row scan of UPDATE statements.

Conceptually, if we picture an UPDATE statement as the composition of a
SELECT statement and an INSERT statement (with loosened semantics around
existing rows) then this change performs the following transformation:

```
UPDATE t = SELECT FROM t + INSERT INTO t
=>
UPDATE t = SELECT FROM t FOR UPDATE + INSERT INTO t
```

The transformation is conditional on the UPDATE expression tree matching a
pattern. Specifically, the FOR UPDATE locking mode is only used during the
initial row scan when all row filters have been pushed into the ScanExpr. If
the statement includes any filters that cannot be pushed into the scan then
no row-level locking mode is applied. The rationale here is that FOR UPDATE
locking is not necessary for correctness due to serializable isolation, so it
is strictly a performance optimization for contended writes. Therefore, it is
not worth risking the transformation being a pessimization, so it is only
applied when doing so does not risk creating artificial contention.

The change introduces a new `enable_implicit_select_for_update` session variable
that controls whether this transformation is applied to mutation statements. It
also introduces a `sql.defaults.implicit_select_for_update.enabled` cluster setting
to serve as the default value for the session variable.

The locking mode is still ignored by the key-value layer, but that will change
in the next few days. Once that happens, we'll be able to gather performance
numbers (past what's already been posted in cockroachdb#43775) about the performance impact
this change has on uncontended and contended workloads.

Release note (sql change): UPDATE statements now acquire locks using the FOR
UPDATE locking mode during their initial row scan, which improves performance
for contended workloads. This behavior is configurable using the
`enable_implicit_select_for_update` session variable and the
`sql.defaults.implicit_select_for_update.enabled` cluster setting.
  • Loading branch information
nvanbenschoten committed Feb 26, 2020
1 parent 9b85a90 commit 435fa43
Show file tree
Hide file tree
Showing 15 changed files with 684 additions and 325 deletions.
10 changes: 10 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ var optDrivenFKClusterMode = settings.RegisterBoolSetting(
true,
)

var implicitSelectForUpdateClusterMode = settings.RegisterBoolSetting(
"sql.defaults.implicit_select_for_update.enabled",
"default value for enable_implicit_select_for_update session setting; enables FOR UPDATE locking during the row-fetch phase of mutation statements",
true,
)

var insertFastPathClusterMode = settings.RegisterBoolSetting(
"sql.defaults.insert_fast_path.enabled",
"default value for enable_insert_fast_path session setting; enables a specialized insert path",
Expand Down Expand Up @@ -1882,6 +1888,10 @@ func (m *sessionDataMutator) SetOptimizerFKs(val bool) {
m.data.OptimizerFKs = val
}

func (m *sessionDataMutator) SetImplicitSelectForUpdate(val bool) {
m.data.ImplicitSelectForUpdate = val
}

func (m *sessionDataMutator) SetInsertFastPath(val bool) {
m.data.InsertFastPath = val
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -1058,4 +1058,3 @@ ALTER TABLE t43092 ALTER COLUMN x DROP NOT NULL

statement ok
DROP TABLE t43092

3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,7 @@ default_tablespace · NULL NULL
default_transaction_isolation serializable NULL NULL NULL string
default_transaction_read_only off NULL NULL NULL string
distsql off NULL NULL NULL string
enable_implicit_select_for_update on NULL NULL NULL string
enable_insert_fast_path on NULL NULL NULL string
enable_zigzag_join on NULL NULL NULL string
experimental_enable_hash_sharded_indexes off NULL NULL NULL string
Expand Down Expand Up @@ -1628,6 +1629,7 @@ default_tablespace · NULL user NU
default_transaction_isolation serializable NULL user NULL default default
default_transaction_read_only off NULL user NULL off off
distsql off NULL user NULL off off
enable_implicit_select_for_update on NULL user NULL on on
enable_insert_fast_path on NULL user NULL on on
enable_zigzag_join on NULL user NULL on on
experimental_enable_hash_sharded_indexes off NULL user NULL off off
Expand Down Expand Up @@ -1683,6 +1685,7 @@ default_tablespace NULL NULL NULL NULL
default_transaction_isolation NULL NULL NULL NULL NULL
default_transaction_read_only NULL NULL NULL NULL NULL
distsql NULL NULL NULL NULL NULL
enable_implicit_select_for_update NULL NULL NULL NULL NULL
enable_insert_fast_path NULL NULL NULL NULL NULL
enable_zigzag_join NULL NULL NULL NULL NULL
experimental_enable_hash_sharded_indexes NULL NULL NULL NULL NULL
Expand Down
101 changes: 51 additions & 50 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,57 @@ SELECT *
FROM [SHOW ALL]
WHERE variable != 'optimizer' AND variable != 'crdb_version' AND variable != 'session_id'
----
variable value
application_name ·
bytea_output hex
client_encoding UTF8
client_min_messages notice
database test
datestyle ISO, MDY
default_int_size 8
default_tablespace ·
default_transaction_isolation serializable
default_transaction_read_only off
distsql off
enable_insert_fast_path on
enable_zigzag_join on
experimental_enable_hash_sharded_indexes off
experimental_enable_primary_key_changes off
experimental_enable_temp_tables off
experimental_optimizer_foreign_keys on
experimental_serial_normalization rowid
extra_float_digits 0
force_savepoint_restart off
idle_in_transaction_session_timeout 0
integer_datetimes on
intervalstyle postgres
locality region=test,dc=dc1
lock_timeout 0
max_identifier_length 128
max_index_keys 32
node_id 1
reorder_joins_limit 4
require_explicit_primary_keys off
results_buffer_size 16384
row_security off
search_path public
server_encoding UTF8
server_version 9.5.0
server_version_num 90500
session_user root
sql_safe_updates off
standard_conforming_strings on
statement_timeout 0
synchronize_seqscans on
timezone UTC
tracing off
transaction_isolation serializable
transaction_priority normal
transaction_read_only off
transaction_status NoTxn
vectorize auto
vectorize_row_count_threshold 0
variable value
application_name ·
bytea_output hex
client_encoding UTF8
client_min_messages notice
database test
datestyle ISO, MDY
default_int_size 8
default_tablespace ·
default_transaction_isolation serializable
default_transaction_read_only off
distsql off
enable_implicit_select_for_update on
enable_insert_fast_path on
enable_zigzag_join on
experimental_enable_hash_sharded_indexes off
experimental_enable_primary_key_changes off
experimental_enable_temp_tables off
experimental_optimizer_foreign_keys on
experimental_serial_normalization rowid
extra_float_digits 0
force_savepoint_restart off
idle_in_transaction_session_timeout 0
integer_datetimes on
intervalstyle postgres
locality region=test,dc=dc1
lock_timeout 0
max_identifier_length 128
max_index_keys 32
node_id 1
reorder_joins_limit 4
require_explicit_primary_keys off
results_buffer_size 16384
row_security off
search_path public
server_encoding UTF8
server_version 9.5.0
server_version_num 90500
session_user root
sql_safe_updates off
standard_conforming_strings on
statement_timeout 0
synchronize_seqscans on
timezone UTC
tracing off
transaction_isolation serializable
transaction_priority normal
transaction_read_only off
transaction_status NoTxn
vectorize auto
vectorize_row_count_threshold 0

query T colnames
SELECT * FROM [SHOW CLUSTER SETTING sql.defaults.distsql]
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/execbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type Builder struct {
allowAutoCommit bool

allowInsertFastPath bool

// forceForUpdateLocking is conditionally passed through to factory methods
// for scan operators that serve as the input for mutation operators. When
// set to true, it ensures that a FOR UPDATE row-level locking mode is used
// by scans. See forUpdateLocking.
forceForUpdateLocking bool
}

// New constructs an instance of the execution node builder using the
Expand Down
133 changes: 126 additions & 7 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ import (
)

func (b *Builder) buildMutationInput(
inputExpr memo.RelExpr, colList opt.ColList, p *memo.MutationPrivate,
mutExpr, inputExpr memo.RelExpr, colList opt.ColList, p *memo.MutationPrivate,
) (execPlan, error) {
if b.shouldApplyImplicitLockingToMutationInput(mutExpr) {
// Re-entrance is not possible because mutations are never nested.
b.forceForUpdateLocking = true
defer func() { b.forceForUpdateLocking = false }()
}

input, err := b.buildRelational(inputExpr)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -60,7 +66,7 @@ func (b *Builder) buildInsert(ins *memo.InsertExpr) (execPlan, error) {
colList := make(opt.ColList, 0, len(ins.InsertCols)+len(ins.CheckCols))
colList = appendColsWhenPresent(colList, ins.InsertCols)
colList = appendColsWhenPresent(colList, ins.CheckCols)
input, err := b.buildMutationInput(ins.Input, colList, &ins.MutationPrivate)
input, err := b.buildMutationInput(ins, ins.Input, colList, &ins.MutationPrivate)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -260,17 +266,16 @@ func (b *Builder) buildUpdate(upd *memo.UpdateExpr) (execPlan, error) {
colList := make(opt.ColList, 0, cnt)
colList = appendColsWhenPresent(colList, upd.FetchCols)
colList = appendColsWhenPresent(colList, upd.UpdateCols)

// The RETURNING clause of the Update can refer to the columns
// in any of the FROM tables. As a result, the Update may need
// to passthrough those columns so the projection above can use
// them.
if upd.NeedResults() {
colList = appendColsWhenPresent(colList, upd.PassthroughCols)
}

colList = appendColsWhenPresent(colList, upd.CheckCols)
input, err := b.buildMutationInput(upd.Input, colList, &upd.MutationPrivate)

input, err := b.buildMutationInput(upd, upd.Input, colList, &upd.MutationPrivate)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -348,7 +353,8 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
colList = append(colList, ups.CanaryCol)
}
colList = appendColsWhenPresent(colList, ups.CheckCols)
input, err := b.buildMutationInput(ups.Input, colList, &ups.MutationPrivate)

input, err := b.buildMutationInput(ups, ups.Input, colList, &ups.MutationPrivate)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -409,7 +415,8 @@ func (b *Builder) buildDelete(del *memo.DeleteExpr) (execPlan, error) {
// Upgrade execution engine to not require this.
colList := make(opt.ColList, 0, len(del.FetchCols))
colList = appendColsWhenPresent(colList, del.FetchCols)
input, err := b.buildMutationInput(del.Input, colList, &del.MutationPrivate)

input, err := b.buildMutationInput(del, del.Input, colList, &del.MutationPrivate)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -724,3 +731,115 @@ func (b *Builder) canAutoCommit(rel memo.RelExpr) bool {
return false
}
}

// forUpdateLocking is the row-level locking mode used by mutations during their
// initial row scan, when such locking is deemed desirable. The locking mode is
// equivalent that used by a SELECT ... FOR UPDATE statement.
var forUpdateLocking = &tree.LockingItem{Strength: tree.ForUpdate}

// shouldApplyImplicitLockingToMutationInput determines whether or not the
// builder should apply a FOR UPDATE row-level locking mode to the initial row
// scan of a mutation expression.
func (b *Builder) shouldApplyImplicitLockingToMutationInput(mutExpr memo.RelExpr) bool {
switch t := mutExpr.(type) {
case *memo.InsertExpr:
// Unlike with the other three mutation expressions, it never makes
// sense to apply implicit row-level locking to the input of an INSERT
// expression because any contention results in unique constraint
// violations.
return false

case *memo.UpdateExpr:
return b.shouldApplyImplicitLockingToUpdateInput(t)

case *memo.UpsertExpr:
return b.shouldApplyImplicitLockingToUpsertInput(t)

case *memo.DeleteExpr:
return b.shouldApplyImplicitLockingToDeleteInput(t)

default:
panic(errors.AssertionFailedf("unexpected mutation expression %T", t))
}
}

// shouldApplyImplicitLockingToUpdateInput determines whether or not the builder
// should apply a FOR UPDATE row-level locking mode to the initial row scan of
// an UPDATE statement.
//
// Conceptually, if we picture an UPDATE statement as the composition of a
// SELECT statement and an INSERT statement (with loosened semantics around
// existing rows) then this method determines whether the builder should perform
// the following transformation:
//
// UPDATE t = SELECT FROM t + INSERT INTO t
// =>
// UPDATE t = SELECT FROM t FOR UPDATE + INSERT INTO t
//
// The transformation is conditional on the UPDATE expression tree matching a
// pattern. Specifically, the FOR UPDATE locking mode is only used during the
// initial row scan when all row filters have been pushed into the ScanExpr. If
// the statement includes any filters that cannot be pushed into the scan then
// no row-level locking mode is applied. The rationale here is that FOR UPDATE
// locking is not necessary for correctness due to serializable isolation, so it
// is strictly a performance optimization for contended writes. Therefore, it is
// not worth risking the transformation being a pessimization, so it is only
// applied when doing so does not risk creating artificial contention.
func (b *Builder) shouldApplyImplicitLockingToUpdateInput(upd *memo.UpdateExpr) bool {
if !b.evalCtx.SessionData.ImplicitSelectForUpdate {
return false
}

// Try to match the pattern:
//
// (Update
// $input:(Scan $scanPrivate:*)
// $checks:*
// $mutationPrivate:*
// )
//
// Or
//
// (Update
// $input:(Project
// (Scan $scanPrivate:*)
// $projections:*
// $passthrough:*
// )
// $checks:*
// $mutationPrivate:*
// )
//
_, ok := upd.Input.(*memo.ScanExpr)
if !ok {
proj, ok := upd.Input.(*memo.ProjectExpr)
if !ok {
return false
}
_, ok = proj.Input.(*memo.ScanExpr)
if !ok {
return false
}
}
return true
}

// tryApplyImplicitLockingToUpsertInput determines whether or not the builder
// should apply a FOR UPDATE row-level locking mode to the initial row scan of
// an UPSERT statement.
//
// TODO(nvanbenschoten): implement this method to match on appropriate Upsert
// expression trees and apply a row-level locking mode.
func (b *Builder) shouldApplyImplicitLockingToUpsertInput(ups *memo.UpsertExpr) bool {
return false
}

// tryApplyImplicitLockingToDeleteInput determines whether or not the builder
// should apply a FOR UPDATE row-level locking mode to the initial row scan of
// an DELETE statement.
//
// TODO(nvanbenschoten): implement this method to match on appropriate Delete
// expression trees and apply a row-level locking mode.
func (b *Builder) shouldApplyImplicitLockingToDeleteInput(del *memo.DeleteExpr) bool {
return false
}
7 changes: 6 additions & 1 deletion pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) {
hardLimit = 0
}

locking := scan.Locking
if b.forceForUpdateLocking {
locking = forUpdateLocking
}

root, err := b.factory.ConstructScan(
tab,
tab.Index(scan.Index),
Expand All @@ -501,7 +506,7 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) {
b.indexConstraintMaxResults(scan),
res.reqOrdering(scan),
rowCount,
scan.Locking,
locking,
)
if err != nil {
return execPlan{}, err
Expand Down
Loading

0 comments on commit 435fa43

Please sign in to comment.