diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 6424343190a3..cfbd8855e644 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -221,11 +221,12 @@ type Streamer struct { distSender *kvcoord.DistSender stopper *stop.Stopper - mode OperationMode - hints Hints - maxKeysPerRow int32 - budget *budget - keyLocking lock.Strength + mode OperationMode + hints Hints + maxKeysPerRow int32 + budget *budget + lockStrength lock.Strength + lockDurability lock.Durability streamerStatistics @@ -370,16 +371,18 @@ func NewStreamer( acc *mon.BoundAccount, kvPairsRead *int64, batchRequestsIssued *int64, - keyLocking lock.Strength, + lockStrength lock.Strength, + lockDurability lock.Durability, ) *Streamer { if txn.Type() != kv.LeafTxn { panic(errors.AssertionFailedf("RootTxn is given to the Streamer")) } s := &Streamer{ - distSender: distSender, - stopper: stopper, - budget: newBudget(acc, limitBytes), - keyLocking: keyLocking, + distSender: distSender, + stopper: stopper, + budget: newBudget(acc, limitBytes), + lockStrength: lockStrength, + lockDurability: lockDurability, } if kvPairsRead == nil { @@ -1744,7 +1747,8 @@ func buildResumeSingleRangeBatch( newGet := gets[0] gets = gets[1:] newGet.req.SetSpan(*get.ResumeSpan) - newGet.req.KeyLockingStrength = s.keyLocking + newGet.req.KeyLockingStrength = s.lockStrength + newGet.req.KeyLockingDurability = s.lockDurability newGet.union.Get = &newGet.req resumeReq.reqs[resumeReqIdx].Value = &newGet.union resumeReq.positions = append(resumeReq.positions, position) @@ -1772,7 +1776,8 @@ func buildResumeSingleRangeBatch( scans = scans[1:] newScan.req.SetSpan(*scan.ResumeSpan) newScan.req.ScanFormat = kvpb.BATCH_RESPONSE - newScan.req.KeyLockingStrength = s.keyLocking + newScan.req.KeyLockingStrength = s.lockStrength + newScan.req.KeyLockingDurability = s.lockDurability newScan.union.Scan = &newScan.req resumeReq.reqs[resumeReqIdx].Value = &newScan.union resumeReq.positions = append(resumeReq.positions, position) diff --git a/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go b/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go index c8caa524fd24..9b2bc2628679 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go @@ -101,6 +101,7 @@ func TestStreamerMemoryAccounting(t *testing.T) { nil, /* kvPairsRead */ nil, /* batchRequestsIssued */ lock.None, + lock.Unreplicated, ) s.Init(OutOfOrder, Hints{UniqueRequests: true, SingleRowLookup: singleRowLookup}, 1 /* maxKeysPerRow */, nil /* diskBuffer */) return s diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index d479e51297cf..187b360e9079 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -59,6 +59,7 @@ func getStreamer( nil, /* kvPairsRead */ nil, /* batchRequestsIssued */ lock.None, + lock.Unreplicated, ) } diff --git a/pkg/sql/catalog/descpb/locking.go b/pkg/sql/catalog/descpb/locking.go index 7545a0a52e92..47195a8707d4 100644 --- a/pkg/sql/catalog/descpb/locking.go +++ b/pkg/sql/catalog/descpb/locking.go @@ -80,3 +80,28 @@ func ToScanLockingWaitPolicy(wp tree.LockingWaitPolicy) ScanLockingWaitPolicy { panic(errors.AssertionFailedf("unknown locking wait policy %s", wp)) } } + +// PrettyString returns the locking durability as a user-readable string. 
+func (s ScanLockingDurability) PrettyString() string { + switch s { + case ScanLockingDurability_BEST_EFFORT: + return "best effort" + case ScanLockingDurability_GUARANTEED: + return "guaranteed" + default: + panic(errors.AssertionFailedf("unexpected durability %s", s)) + } +} + +// ToScanLockingDurability converts a tree.LockingDurability to its +// corresponding ScanLockingDurability. +func ToScanLockingDurability(s tree.LockingDurability) ScanLockingDurability { + switch s { + case tree.LockDurabilityBestEffort: + return ScanLockingDurability_BEST_EFFORT + case tree.LockDurabilityGuaranteed: + return ScanLockingDurability_GUARANTEED + default: + panic(errors.AssertionFailedf("unknown locking durability %d", s)) + } +} diff --git a/pkg/sql/catalog/descpb/locking.proto b/pkg/sql/catalog/descpb/locking.proto index 79ba3bd07536..03d1cf61444f 100644 --- a/pkg/sql/catalog/descpb/locking.proto +++ b/pkg/sql/catalog/descpb/locking.proto @@ -129,3 +129,9 @@ enum ScanLockingWaitPolicy { // ERROR represents NOWAIT - raise an error if a row cannot be locked. ERROR = 2; } + +// LockingDurability controls the durability of locks. +enum ScanLockingDurability { + BEST_EFFORT = 0; + GUARANTEED = 1; +} diff --git a/pkg/sql/colfetcher/colbatch_direct_scan.go b/pkg/sql/colfetcher/colbatch_direct_scan.go index 058f72404671..63066ad04cfa 100644 --- a/pkg/sql/colfetcher/colbatch_direct_scan.go +++ b/pkg/sql/colfetcher/colbatch_direct_scan.go @@ -215,6 +215,7 @@ func NewColBatchDirectScan( spec.Reverse, spec.LockingStrength, spec.LockingWaitPolicy, + spec.LockingDurability, flowCtx.EvalCtx.SessionData().LockTimeout, kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 199eeb7c4c0f..e6a726150e44 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -350,6 +350,7 @@ func NewColBatchScan( spec.Reverse, spec.LockingStrength, spec.LockingWaitPolicy, + spec.LockingDurability, flowCtx.EvalCtx.SessionData().LockTimeout, kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index ea66b7f16f2d..afd825f12bf0 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -563,6 +563,7 @@ func NewColIndexJoin( flowCtx.EvalCtx.Settings, spec.LockingWaitPolicy, spec.LockingStrength, + spec.LockingDurability, streamerBudgetLimit, streamerBudgetAcc, spec.MaintainOrdering, @@ -580,6 +581,7 @@ func NewColIndexJoin( false, /* reverse */ spec.LockingStrength, spec.LockingWaitPolicy, + spec.LockingDurability, flowCtx.EvalCtx.SessionData().LockTimeout, kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 6c9e5e1844c2..2f2925e4b08e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1665,6 +1665,7 @@ func initTableReaderSpecTemplate( TableDescriptorModificationTime: n.desc.GetModificationTime(), LockingStrength: n.lockingStrength, LockingWaitPolicy: n.lockingWaitPolicy, + LockingDurability: n.lockingDurability, } if err := rowenc.InitIndexFetchSpec(&s.FetchSpec, codec, n.desc, n.index, colIDs); err != nil { return nil, execinfrapb.PostProcessSpec{}, err @@ -2740,6 +2741,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin( Type: descpb.InnerJoin, LockingStrength: n.table.lockingStrength, LockingWaitPolicy: 
n.table.lockingWaitPolicy, + LockingDurability: n.table.lockingDurability, MaintainOrdering: len(n.reqOrdering) > 0, LimitHint: n.limitHint, } @@ -2817,6 +2819,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin( Type: n.joinType, LockingStrength: n.table.lockingStrength, LockingWaitPolicy: n.table.lockingWaitPolicy, + LockingDurability: n.table.lockingDurability, // TODO(sumeer): specifying ordering here using isFirstJoinInPairedJoiner // is late in the sense that the cost of this has not been taken into // account. Make this decision earlier in CustomFuncs.GenerateLookupJoins. @@ -2949,6 +2952,7 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin( OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner, LockingStrength: n.table.lockingStrength, LockingWaitPolicy: n.table.lockingWaitPolicy, + LockingDurability: n.table.lockingDurability, } fetchColIDs := make([]descpb.ColumnID, len(n.table.cols)) @@ -3045,6 +3049,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin( fixedValues: valuesSpec, lockingStrength: side.scan.lockingStrength, lockingWaitPolicy: side.scan.lockingWaitPolicy, + lockingDurability: side.scan.lockingDurability, } } @@ -3064,6 +3069,7 @@ type zigzagPlanningSide struct { fixedValues *execinfrapb.ValuesCoreSpec lockingStrength descpb.ScanLockingStrength lockingWaitPolicy descpb.ScanLockingWaitPolicy + lockingDurability descpb.ScanLockingDurability } type zigzagPlanningInfo struct { diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 41cc4fdf1fd2..aee191992469 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -256,6 +256,7 @@ func (e *distSQLSpecExecFactory) ConstructScan( } trSpec.LockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength) trSpec.LockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy) + trSpec.LockingDurability = descpb.ToScanLockingDurability(params.Locking.Durability) if trSpec.LockingStrength != descpb.ScanLockingStrength_FOR_NONE { // Scans that are performing row-level locking cannot currently be // distributed because their locks would not be propagated back to @@ -736,6 +737,7 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide( fixedValues: valuesSpec, lockingStrength: descpb.ToScanLockingStrength(locking.Strength), lockingWaitPolicy: descpb.ToScanLockingWaitPolicy(locking.WaitPolicy), + lockingDurability: descpb.ToScanLockingDurability(locking.Durability), }, nil } diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index 86a7e8c952d8..9ba8d3e646a2 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -121,6 +121,9 @@ message TableReaderSpec { // to BLOCK when locking_strength is FOR_NONE. optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 11 [(gogoproto.nullable) = false]; + // Indicates the row-level locking durability to be used by the scan. + optional sqlbase.ScanLockingDurability locking_durability = 23 [(gogoproto.nullable) = false]; + // Indicates that misplanned ranges metadata should not be sent back to the // DistSQLReceiver. This will be set to true for the scan with a hard limit // (in which case we create a single processor that is placed at the @@ -327,6 +330,9 @@ message JoinReaderSpec { // to BLOCK when locking_strength is FOR_NONE. 
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 10 [(gogoproto.nullable) = false]; + // Indicates the row-level locking durability to be used by the scan. + optional sqlbase.ScanLockingDurability locking_durability = 24 [(gogoproto.nullable) = false]; + // Indicates that the join reader should maintain the ordering of the input // stream. This is applicable to both lookup joins and index joins. For lookup // joins, maintaining order is expensive because it requires buffering. For @@ -469,6 +475,9 @@ message ZigzagJoinerSpec { // held by other active transactions when attempting to lock rows. Always set // to BLOCK when locking_strength is FOR_NONE. optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 5 [(gogoproto.nullable) = false]; + + // Indicates the row-level locking durability to be used by the scan. + optional sqlbase.ScanLockingDurability locking_durability = 8 [(gogoproto.nullable) = false]; } repeated Side sides = 7 [(gogoproto.nullable) = false]; @@ -717,6 +726,9 @@ message InvertedJoinerSpec { // held by other active transactions when attempting to lock rows. Always set // to BLOCK when locking_strength is FOR_NONE. optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 13 [(gogoproto.nullable) = false]; + + // Indicates the row-level locking durability to be used by the scan. + optional sqlbase.ScanLockingDurability locking_durability = 14 [(gogoproto.nullable) = false]; } // InvertedFiltererSpec is the specification of a processor that does filtering diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 862837f3c3f7..df99e9c2af2f 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -191,6 +191,7 @@ func (r *insertFastPathRun) addFKChecks( } lockStrength := row.GetKeyLockingStrength(descpb.ToScanLockingStrength(c.Locking.Strength)) lockWaitPolicy := row.GetWaitPolicy(descpb.ToScanLockingWaitPolicy(c.Locking.WaitPolicy)) + lockDurability := row.GetKeyLockingDurability(descpb.ToScanLockingDurability(c.Locking.Durability)) if r.fkBatch.Header.WaitPolicy != lockWaitPolicy { return errors.AssertionFailedf( "FK check lock wait policy %s did not match %s", @@ -200,9 +201,9 @@ func (r *insertFastPathRun) addFKChecks( reqIdx := len(r.fkBatch.Requests) r.fkBatch.Requests = append(r.fkBatch.Requests, kvpb.RequestUnion{}) r.fkBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span), - KeyLockingStrength: lockStrength, - // TODO(michae2): Once #100193 is finished, also include c.Locking.Durability. + RequestHeader: kvpb.RequestHeaderFromSpan(span), + KeyLockingStrength: lockStrength, + KeyLockingDurability: lockDurability, }) r.fkSpanInfo = append(r.fkSpanInfo, insertFastPathFKSpanInfo{ check: c, diff --git a/pkg/sql/logictest/testdata/logic_test/fk_read_committed b/pkg/sql/logictest/testdata/logic_test/fk_read_committed index a26d6b4367b0..93e5703a909c 100644 --- a/pkg/sql/logictest/testdata/logic_test/fk_read_committed +++ b/pkg/sql/logictest/testdata/logic_test/fk_read_committed @@ -3,9 +3,6 @@ statement ok SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = true -# Some foreign key checks are prohibited under weaker isolation levels until we -# improve locking. See #80683, #100156, #100193. 
- statement ok CREATE TABLE jars (j INT PRIMARY KEY) @@ -18,32 +15,20 @@ SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED statement ok INSERT INTO jars VALUES (1), (2) -# Foreign key checks of the parent require durable shared locking under weaker -# isolation levels, and are not yet supported. -query error pgcode 0A000 guaranteed-durable locking not yet implemented -INSERT INTO cookies VALUES (1, 1) - -statement ok -BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE - statement ok INSERT INTO cookies VALUES (1, 1) statement ok -COMMIT - -query error pgcode 0A000 guaranteed-durable locking not yet implemented UPDATE cookies SET j = 2 WHERE c = 1 -# Foreign key checks of the child do not require locking. query error violates foreign key constraint UPDATE jars SET j = j + 4 query error violates foreign key constraint -DELETE FROM jars WHERE j = 1 +DELETE FROM jars WHERE j = 2 statement ok DELETE FROM cookies WHERE c = 1 statement ok -DELETE FROM jars WHERE j = 1 +DELETE FROM jars WHERE j = 2 diff --git a/pkg/sql/logictest/testdata/logic_test/read_committed b/pkg/sql/logictest/testdata/logic_test/read_committed index bb34d002d6d2..25209292b33a 100644 --- a/pkg/sql/logictest/testdata/logic_test/read_committed +++ b/pkg/sql/logictest/testdata/logic_test/read_committed @@ -5,9 +5,6 @@ SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = true subtest select_for_update -# SELECT FOR UPDATE is prohibited under weaker isolation levels until we improve -# locking. See #57031, #75457, #100144, #100193. - statement ok CREATE TABLE supermarket ( person STRING PRIMARY KEY, @@ -22,7 +19,7 @@ statement ok INSERT INTO supermarket (person, aisle) VALUES ('abbie', 1), ('gideon', 2), ('matilda', 3), ('michael', 4) -# SELECT FOR UPDATE should still work under serializable isolation. +# Use SELECT FOR UPDATE under serializable isolation. statement ok BEGIN @@ -37,25 +34,29 @@ UPDATE supermarket SET aisle = 2 WHERE person = 'abbie' statement ok COMMIT -# It should fail under read committed isolation. +# Use SELECT FOR UPDATE under read committed isolation. statement ok BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED -query error pgcode 0A000 guaranteed-durable locking not yet implemented +query I SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE +---- +3 statement ok ROLLBACK -# It should also fail under snapshot isolation. +# Use SELECT FOR UPDATE under snapshot isolation. statement ok SET CLUSTER SETTING sql.txn.snapshot_isolation_syntax.enabled = true statement ok BEGIN TRANSACTION ISOLATION LEVEL SNAPSHOT -query error pgcode 0A000 guaranteed-durable locking not yet implemented +query I SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE +---- +3 statement ok ROLLBACK @@ -63,14 +64,14 @@ ROLLBACK statement ok RESET CLUSTER SETTING sql.txn.snapshot_isolation_syntax.enabled -# SELECT FOR UPDATE in a subquery should also fail under read committed. +# Use SELECT FOR UPDATE in a subquery under read committed isolation. statement ok BEGIN TRANSACTION statement ok SET TRANSACTION ISOLATION LEVEL READ COMMITTED; -query error pgcode 0A000 guaranteed-durable locking not yet implemented +statement ok UPDATE supermarket SET aisle = (SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE) WHERE person = 'michael' @@ -78,21 +79,24 @@ UPDATE supermarket statement ok ROLLBACK -# It should also fail in a CTE. +# Use SELECT FOR UPDATE in a CTE under read committed isolation. 
statement ok BEGIN TRANSACTION statement ok SET TRANSACTION ISOLATION LEVEL READ COMMITTED; -query error pgcode 0A000 guaranteed-durable locking not yet implemented +query I WITH s AS (SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE) SELECT aisle + 1 FROM s +---- +4 statement ok ROLLBACK +# Use SELECT FOR UPDATE in a UDF under read committed isolation. statement ok CREATE FUNCTION wrangle (name STRING) RETURNS INT LANGUAGE SQL AS $$ SELECT aisle FROM supermarket WHERE person = name FOR UPDATE @@ -101,8 +105,7 @@ $$ statement ok SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED -# But calling that function should fail. -query error pgcode 0A000 guaranteed-durable locking not yet implemented +statement ok INSERT INTO supermarket (person, aisle) VALUES ('grandma', wrangle('matilda')) statement ok @@ -114,42 +117,50 @@ DROP FUNCTION wrangle statement ok SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED -# Preparing a SELECT FOR UPDATE should succeed under read committed. +# Prepare and execute a SELECT FOR UPDATE under read committed isolation. statement ok PREPARE psa AS SELECT aisle FROM supermarket WHERE person = $1::STRING FOR UPDATE -# But execution should fail. -query error pgcode 0A000 guaranteed-durable locking not yet implemented +query I EXECUTE psa('matilda') +---- +3 statement ok DEALLOCATE psa -# SELECT FOR UPDATE using a lookup join should also fail. -query error pgcode 0A000 guaranteed-durable locking not yet implemented +# Use SELECT FOR UPDATE with a lookup join under read committed isolation. +query I WITH names AS MATERIALIZED (SELECT 'matilda' AS person) SELECT aisle FROM names NATURAL INNER LOOKUP JOIN supermarket FOR UPDATE +---- +3 -# SELECT FOR UPDATE using an index join should also fail. -query error pgcode 0A000 guaranteed-durable locking not yet implemented +# Use SELECT FOR UPDATE with an index join under read committed isolation. +query I rowsort SELECT aisle FROM supermarket@supermarket_starts_with_idx WHERE starts_with = 'm' FOR UPDATE +---- +3 +4 -# SELECT FOR UPDATE using a zigzag join should also fail. +# Use SELECT FOR UPDATE with a zigzag join under read committed isolation. 
statement ok SET enable_zigzag_join = true -query error pgcode 0A000 guaranteed-durable locking not yet implemented +query I SELECT aisle FROM supermarket@{FORCE_ZIGZAG} WHERE starts_with = 'm' AND ends_with = 'lda' FOR UPDATE +---- +3 statement ok RESET enable_zigzag_join diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index b1e2007e95f5..34062832d67a 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -2916,11 +2916,6 @@ func (b *Builder) buildLocking(locking opt.Locking) (opt.Locking, error) { 110873, "explicit unique checks are not yet supported under read committed isolation", ) } - if locking.Durability == tree.LockDurabilityGuaranteed { - return opt.Locking{}, unimplemented.NewWithIssuef( - 100193, "guaranteed-durable locking not yet implemented", - ) - } b.ContainsNonDefaultKeyLocking = true } return locking, nil diff --git a/pkg/sql/opt/exec/execbuilder/testdata/fk_read_committed b/pkg/sql/opt/exec/execbuilder/testdata/fk_read_committed index 7f2634b3bb2d..64a42ffa2ede 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/fk_read_committed +++ b/pkg/sql/opt/exec/execbuilder/testdata/fk_read_committed @@ -28,6 +28,24 @@ insert cookies ├── with-scan &1 └── filters (true) +query T +EXPLAIN (VERBOSE) INSERT INTO cookies VALUES (1, 1) +---- +distribution: local +vectorized: true +· +• insert fast path + columns: () + estimated row count: 0 (missing stats) + into: cookies(c, j) + auto commit + FK check: jars@jars_pkey + FK check locking strength: for share + FK check locking durability: guaranteed + size: 2 columns, 1 row + row 0, expr 0: 1 + row 0, expr 1: 1 + # Under serializable isolation, locking is not required, unless # enable_implicit_fk_locking_for_serializable is true. 
statement ok @@ -63,6 +81,43 @@ insert cookies ├── with-scan &1 └── filters (true) +statement ok +SET enable_durable_locking_for_serializable = true + +query T +EXPLAIN (OPT) INSERT INTO cookies VALUES (1, 1) +---- +insert cookies + ├── values + │ └── (1, 1) + └── f-k-checks + └── f-k-checks-item: cookies(j) -> jars(j) + └── anti-join (lookup jars) + ├── lookup columns are key + ├── locking: for-share,durability-guaranteed + ├── with-scan &1 + └── filters (true) + +query T +EXPLAIN (VERBOSE) INSERT INTO cookies VALUES (1, 1) +---- +distribution: local +vectorized: true +· +• insert fast path + columns: () + estimated row count: 0 (missing stats) + into: cookies(c, j) + FK check: jars@jars_pkey + FK check locking strength: for share + FK check locking durability: guaranteed + size: 2 columns, 1 row + row 0, expr 0: 1 + row 0, expr 1: 1 + +statement ok +RESET enable_durable_locking_for_serializable + statement ok RESET enable_implicit_fk_locking_for_serializable @@ -86,6 +141,60 @@ update cookies ├── with-scan &1 └── filters (true) +query T +EXPLAIN (VERBOSE) UPDATE cookies SET j = 2 WHERE c = 1 +---- +distribution: local +vectorized: true +· +• root +│ columns: () +│ +├── • update +│ │ columns: () +│ │ estimated row count: 0 (missing stats) +│ │ table: cookies +│ │ set: j +│ │ +│ └── • buffer +│ │ columns: (c, j, j_new) +│ │ label: buffer 1 +│ │ +│ └── • render +│ │ columns: (c, j, j_new) +│ │ render j_new: 2 +│ │ render c: c +│ │ render j: j +│ │ +│ └── • scan +│ columns: (c, j) +│ estimated row count: 1 (missing stats) +│ table: cookies@cookies_pkey +│ spans: /1/0 +│ locking strength: for update +│ +└── • constraint-check + │ + └── • error if rows + │ columns: () + │ + └── • lookup join (anti) + │ columns: (j_new) + │ estimated row count: 0 (missing stats) + │ table: jars@jars_pkey + │ equality: (j_new) = (j) + │ equality cols are key + │ locking strength: for share + │ locking durability: guaranteed + │ + └── • project + │ columns: (j_new) + │ + └── • scan buffer + columns: (c, j, j_new) + estimated row count: 1 (missing stats) + label: buffer 1 + # Foreign key checks of the child do not require locking. 
query T EXPLAIN (OPT) UPDATE jars SET j = j + 4 @@ -107,6 +216,83 @@ update jars └── filters └── j = cookies.j +query T +EXPLAIN (VERBOSE) UPDATE jars SET j = j + 4 +---- +distribution: local +vectorized: true +· +• root +│ columns: () +│ +├── • update +│ │ columns: () +│ │ estimated row count: 0 (missing stats) +│ │ table: jars +│ │ set: j +│ │ +│ └── • buffer +│ │ columns: (j, j_new) +│ │ label: buffer 1 +│ │ +│ └── • render +│ │ columns: (j, j_new) +│ │ render j_new: j + 4 +│ │ render j: j +│ │ +│ └── • scan +│ columns: (j) +│ estimated row count: 1,000 (missing stats) +│ table: jars@jars_pkey +│ spans: FULL SCAN +│ locking strength: for update +│ +└── • constraint-check + │ + └── • error if rows + │ columns: () + │ + └── • project + │ columns: (j) + │ + └── • hash join (inner) + │ columns: (j, j) + │ estimated row count: 99 (missing stats) + │ equality: (j) = (j) + │ left cols are key + │ right cols are key + │ + ├── • except all + │ │ columns: (j) + │ │ estimated row count: 1,000 (missing stats) + │ │ + │ ├── • project + │ │ │ columns: (j) + │ │ │ + │ │ └── • scan buffer + │ │ columns: (j, j_new) + │ │ estimated row count: 1,000 (missing stats) + │ │ label: buffer 1 + │ │ + │ └── • project + │ │ columns: (j_new) + │ │ + │ └── • scan buffer + │ columns: (j, j_new) + │ estimated row count: 1,000 (missing stats) + │ label: buffer 1 + │ + └── • distinct + │ columns: (j) + │ estimated row count: 100 (missing stats) + │ distinct on: j + │ + └── • scan + columns: (j) + estimated row count: 1,000 (missing stats) + table: cookies@cookies_pkey + spans: FULL SCAN + query T EXPLAIN (OPT) DELETE FROM jars WHERE j = 1 ---- @@ -120,3 +306,49 @@ delete jars ├── scan cookies └── filters └── j = cookies.j + +query T +EXPLAIN (VERBOSE) DELETE FROM jars WHERE j = 1 +---- +distribution: local +vectorized: true +· +• root +│ columns: () +│ +├── • delete +│ │ columns: () +│ │ estimated row count: 0 (missing stats) +│ │ from: jars +│ │ +│ └── • buffer +│ │ columns: (j) +│ │ label: buffer 1 +│ │ +│ └── • scan +│ columns: (j) +│ estimated row count: 1 (missing stats) +│ table: jars@jars_pkey +│ spans: /1/0 +│ +└── • constraint-check + │ + └── • error if rows + │ columns: () + │ + └── • hash join (right semi) + │ columns: (j) + │ estimated row count: 1 (missing stats) + │ equality: (j) = (j) + │ right cols are key + │ + ├── • scan + │ columns: (j) + │ estimated row count: 1,000 (missing stats) + │ table: cookies@cookies_pkey + │ spans: FULL SCAN + │ + └── • scan buffer + columns: (j) + estimated row count: 1 (missing stats) + label: buffer 1 diff --git a/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed b/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed index abf1d2a98410..6d5a174c2076 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed +++ b/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed @@ -28,6 +28,23 @@ project ├── constraint: /1: [/'matilda' - /'matilda'] └── locking: for-update,durability-guaranteed +query T +EXPLAIN (VERBOSE) SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE +---- +distribution: local +vectorized: true +· +• project +│ columns: (aisle) +│ +└── • scan + columns: (person, aisle) + estimated row count: 1 (missing stats) + table: supermarket@supermarket_pkey + spans: /"matilda"/0 + locking strength: for update + locking durability: guaranteed + query T EXPLAIN (OPT) UPDATE supermarket @@ -45,6 +62,55 @@ update supermarket ├── constraint: /13: [/'matilda' - /'matilda'] └── 
locking: for-update,durability-guaranteed +query T +EXPLAIN (VERBOSE) +UPDATE supermarket + SET aisle = (SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE) + WHERE person = 'michael' +---- +distribution: local +vectorized: true +· +• root +│ columns: () +│ +├── • update +│ │ columns: () +│ │ estimated row count: 0 (missing stats) +│ │ table: supermarket +│ │ set: aisle +│ │ auto commit +│ │ +│ └── • render +│ │ columns: (person, aisle, ends_with, aisle_new) +│ │ render aisle_new: @S1 +│ │ render person: person +│ │ render aisle: aisle +│ │ render ends_with: ends_with +│ │ +│ └── • scan +│ columns: (person, aisle, ends_with) +│ estimated row count: 1 (missing stats) +│ table: supermarket@supermarket_pkey +│ spans: /"michael"/0 +│ locking strength: for update +│ +└── • subquery + │ id: @S1 + │ original sql: (SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE) + │ exec mode: one row + │ + └── • project + │ columns: (aisle) + │ + └── • scan + columns: (person, aisle) + estimated row count: 1 (missing stats) + table: supermarket@supermarket_pkey + spans: /"matilda"/0 + locking strength: for update + locking durability: guaranteed + query T EXPLAIN (OPT) WITH s AS @@ -61,6 +127,47 @@ with &1 (s) └── projections └── aisle + 1 +query T +EXPLAIN (VERBOSE) +WITH s AS + (SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE) +SELECT aisle + 1 FROM s +---- +distribution: local +vectorized: true +· +• root +│ columns: ("?column?") +│ +├── • render +│ │ columns: ("?column?") +│ │ render ?column?: aisle + 1 +│ │ +│ └── • scan buffer +│ columns: (aisle) +│ estimated row count: 1 (missing stats) +│ label: buffer 1 (s) +│ +└── • subquery + │ id: @S1 + │ original sql: SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE + │ exec mode: all rows + │ + └── • buffer + │ columns: (aisle) + │ label: buffer 1 (s) + │ + └── • project + │ columns: (aisle) + │ + └── • scan + columns: (person, aisle) + estimated row count: 1 (missing stats) + table: supermarket@supermarket_pkey + spans: /"matilda"/0 + locking strength: for update + locking durability: guaranteed + query T EXPLAIN (OPT) WITH names AS MATERIALIZED @@ -82,6 +189,52 @@ with &1 (names) ├── with-scan &1 (names) └── filters (true) +query T +EXPLAIN (VERBOSE) +WITH names AS MATERIALIZED + (SELECT 'matilda' AS person) +SELECT aisle + FROM names + NATURAL INNER LOOKUP JOIN supermarket + FOR UPDATE +---- +distribution: local +vectorized: true +· +• root +│ columns: (aisle) +│ +├── • project +│ │ columns: (aisle) +│ │ +│ └── • lookup join (inner) +│ │ columns: (person, person, aisle) +│ │ estimated row count: 1 (missing stats) +│ │ table: supermarket@supermarket_pkey +│ │ equality: (person) = (person) +│ │ equality cols are key +│ │ locking strength: for update +│ │ locking durability: guaranteed +│ │ +│ └── • scan buffer +│ columns: (person) +│ estimated row count: 1 +│ label: buffer 1 (names) +│ +└── • subquery + │ id: @S1 + │ original sql: SELECT 'matilda' AS person + │ exec mode: all rows + │ + └── • buffer + │ columns: (person) + │ label: buffer 1 (names) + │ + └── • values + columns: (person) + size: 1 column, 1 row + row 0, expr 0: 'matilda' + query T EXPLAIN (OPT) SELECT aisle @@ -97,6 +250,35 @@ project ├── flags: force-index=supermarket_starts_with_idx └── locking: for-update,durability-guaranteed +query T +EXPLAIN (VERBOSE) +SELECT aisle + FROM supermarket@supermarket_starts_with_idx + WHERE starts_with = 'm' + FOR UPDATE +---- +distribution: local +vectorized: true +· +• project +│ columns: 
(aisle) +│ +└── • index join + │ columns: (aisle, starts_with) + │ estimated row count: 10 (missing stats) + │ table: supermarket@supermarket_pkey + │ key columns: person + │ locking strength: for update + │ locking durability: guaranteed + │ + └── • scan + columns: (person, starts_with) + estimated row count: 10 (missing stats) + table: supermarket@supermarket_starts_with_idx + spans: /"m"-/"m"/PrefixEnd + locking strength: for update + locking durability: guaranteed + statement ok SET enable_zigzag_join = true @@ -119,8 +301,48 @@ project │ └── ends_with = 'lda' └── filters (true) -statement ok -RESET enable_zigzag_join +query T +EXPLAIN (VERBOSE) +SELECT aisle + FROM supermarket@{FORCE_ZIGZAG} + WHERE starts_with = 'm' AND ends_with = 'lda' + FOR UPDATE +---- +distribution: local +vectorized: true +· +• project +│ columns: (aisle) +│ +└── • project + │ columns: (aisle, starts_with, ends_with) + │ + └── • lookup join (inner) + │ columns: (person, starts_with, ends_with, aisle) + │ estimated row count: 1 (missing stats) + │ table: supermarket@supermarket_pkey + │ equality: (person) = (person) + │ equality cols are key + │ locking strength: for update + │ locking durability: guaranteed + │ + └── • project + │ columns: (person, starts_with, ends_with) + │ + └── • zigzag join + columns: (person, starts_with, person, ends_with) + estimated row count: 1 (missing stats) + pred: (starts_with = 'm') AND (ends_with = 'lda') + left table: supermarket@supermarket_starts_with_idx + left columns: (person, starts_with) + left fixed values: 1 column + left locking strength: for update + left locking durability: guaranteed + right table: supermarket@supermarket_ends_with_idx + right columns: (person, ends_with) + right fixed values: 1 column + right locking strength: for update + right locking durability: guaranteed statement ok -SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE +RESET enable_zigzag_join diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 6aa9d0d8ac01..ae6ce49ac53e 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -169,6 +169,7 @@ func (ef *execFactory) ConstructScan( scan.estimatedRowCount = uint64(params.EstimatedRowCount) scan.lockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength) scan.lockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy) + scan.lockingDurability = descpb.ToScanLockingDurability(params.Locking.Durability) scan.localityOptimized = params.LocalityOptimized if !ef.isExplain && !ef.planner.SessionData().Internal { idxUsageKey := roachpb.IndexUsageKey{ @@ -665,6 +666,7 @@ func (ef *execFactory) ConstructIndexJoin( tableScan.disableBatchLimit() tableScan.lockingStrength = descpb.ToScanLockingStrength(locking.Strength) tableScan.lockingWaitPolicy = descpb.ToScanLockingWaitPolicy(locking.WaitPolicy) + tableScan.lockingDurability = descpb.ToScanLockingDurability(locking.Durability) if !ef.isExplain && !ef.planner.SessionData().Internal { idxUsageKey := roachpb.IndexUsageKey{ @@ -725,6 +727,7 @@ func (ef *execFactory) ConstructLookupJoin( tableScan.index = idx tableScan.lockingStrength = descpb.ToScanLockingStrength(locking.Strength) tableScan.lockingWaitPolicy = descpb.ToScanLockingWaitPolicy(locking.WaitPolicy) + tableScan.lockingDurability = descpb.ToScanLockingDurability(locking.Durability) if !ef.isExplain && !ef.planner.SessionData().Internal { idxUsageKey := roachpb.IndexUsageKey{ @@ -865,6 +868,7 @@ func (ef *execFactory) ConstructInvertedJoin( 
tableScan.index = idx tableScan.lockingStrength = descpb.ToScanLockingStrength(locking.Strength) tableScan.lockingWaitPolicy = descpb.ToScanLockingWaitPolicy(locking.WaitPolicy) + tableScan.lockingDurability = descpb.ToScanLockingDurability(locking.Durability) if !ef.isExplain && !ef.planner.SessionData().Internal { idxUsageKey := roachpb.IndexUsageKey{ @@ -945,6 +949,7 @@ func (ef *execFactory) constructScanForZigzag( scan.index = idxDesc scan.lockingStrength = descpb.ToScanLockingStrength(locking.Strength) scan.lockingWaitPolicy = descpb.ToScanLockingWaitPolicy(locking.WaitPolicy) + scan.lockingDurability = descpb.ToScanLockingDurability(locking.Durability) return scan, eqColOrdinals, nil } diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index b2ed310d3ee9..eff9e8d8723d 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -299,6 +299,8 @@ type FetcherInitArgs struct { // LockWaitPolicy represents the policy to be used for handling conflicting // locks held by other active transactions. LockWaitPolicy descpb.ScanLockingWaitPolicy + // LockDurability represents the row-level locking durability to use. + LockDurability descpb.ScanLockingDurability // LockTimeout specifies the maximum amount of time that the fetcher will // wait while attempting to acquire a lock on a key or while blocking on an // existing lock in order to perform a non-locking read on a key. @@ -440,6 +442,7 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { reverse: args.Reverse, lockStrength: args.LockStrength, lockWaitPolicy: args.LockWaitPolicy, + lockDurability: args.LockDurability, lockTimeout: args.LockTimeout, acc: rf.kvFetcherMemAcc, forceProductionKVBatchSize: args.ForceProductionKVBatchSize, diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index ce49e7fe3ee8..a1a991c1f550 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -180,6 +180,8 @@ type txnKVFetcher struct { // lockWaitPolicy represents the policy to be used for handling conflicting // locks held by other active transactions. lockWaitPolicy lock.WaitPolicy + // lockDurability represents the locking durability to use. + lockDurability lock.Durability // lockTimeout specifies the maximum amount of time that the fetcher will // wait while attempting to acquire a lock on a key or while blocking on an // existing lock in order to perform a non-locking read on a key. 
@@ -287,6 +289,7 @@ type newTxnKVFetcherArgs struct { reverse bool lockStrength descpb.ScanLockingStrength lockWaitPolicy descpb.ScanLockingWaitPolicy + lockDurability descpb.ScanLockingDurability lockTimeout time.Duration acc *mon.BoundAccount forceProductionKVBatchSize bool @@ -314,6 +317,7 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher { reverse: args.reverse, lockStrength: GetKeyLockingStrength(args.lockStrength), lockWaitPolicy: GetWaitPolicy(args.lockWaitPolicy), + lockDurability: GetKeyLockingDurability(args.lockDurability), lockTimeout: args.lockTimeout, acc: args.acc, forceProductionKVBatchSize: args.forceProductionKVBatchSize, @@ -544,7 +548,9 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { ba.Header.WholeRowsOfSize = int32(f.indexFetchSpec.MaxKeysPerRow) } ba.AdmissionHeader = f.requestAdmissionHeader - ba.Requests = spansToRequests(f.spans.Spans, f.scanFormat, f.reverse, f.lockStrength, f.reqsScratch) + ba.Requests = spansToRequests( + f.spans.Spans, f.scanFormat, f.reverse, f.lockStrength, f.lockDurability, f.reqsScratch, + ) if log.ExpensiveLogEnabled(ctx, 2) { log.VEventf(ctx, 2, "Scan %s", f.spans) @@ -889,7 +895,8 @@ func spansToRequests( spans roachpb.Spans, scanFormat kvpb.ScanFormat, reverse bool, - keyLocking lock.Strength, + lockStrength lock.Strength, + lockDurability lock.Durability, reqsScratch []kvpb.RequestUnion, ) []kvpb.RequestUnion { var reqs []kvpb.RequestUnion @@ -923,8 +930,8 @@ func spansToRequests( // A span without an EndKey indicates that the caller is requesting a // single key fetch, which can be served using a GetRequest. gets[curGet].req.Key = spans[i].Key - gets[curGet].req.KeyLockingStrength = keyLocking - // TODO(michae2): Once #100193 is finished, also include locking durability. + gets[curGet].req.KeyLockingStrength = lockStrength + gets[curGet].req.KeyLockingDurability = lockDurability gets[curGet].union.Get = &gets[curGet].req reqs[i].Value = &gets[curGet].union curGet++ @@ -933,8 +940,8 @@ func spansToRequests( curScan := i - curGet scans[curScan].req.SetSpan(spans[i]) scans[curScan].req.ScanFormat = scanFormat - scans[curScan].req.KeyLockingStrength = keyLocking - // TODO(michae2): Once #100193 is finished, also include locking durability. + scans[curScan].req.KeyLockingStrength = lockStrength + scans[curScan].req.KeyLockingDurability = lockDurability scans[curScan].union.ReverseScan = &scans[curScan].req reqs[i].Value = &scans[curScan].union } @@ -948,8 +955,8 @@ func spansToRequests( // A span without an EndKey indicates that the caller is requesting a // single key fetch, which can be served using a GetRequest. gets[curGet].req.Key = spans[i].Key - gets[curGet].req.KeyLockingStrength = keyLocking - // TODO(michae2): Once #100193 is finished, also include locking durability. + gets[curGet].req.KeyLockingStrength = lockStrength + gets[curGet].req.KeyLockingDurability = lockDurability gets[curGet].union.Get = &gets[curGet].req reqs[i].Value = &gets[curGet].union curGet++ @@ -958,8 +965,8 @@ func spansToRequests( curScan := i - curGet scans[curScan].req.SetSpan(spans[i]) scans[curScan].req.ScanFormat = scanFormat - scans[curScan].req.KeyLockingStrength = keyLocking - // TODO(michae2): Once #100193 is finished, also include locking durability. 
+ scans[curScan].req.KeyLockingStrength = lockStrength + scans[curScan].req.KeyLockingDurability = lockDurability scans[curScan].union.Scan = &scans[curScan].req reqs[i].Value = &scans[curScan].union } diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 7cf8965299db..f181ef217c70 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -27,8 +27,9 @@ import ( // txnKVStreamer handles retrieval of key/values. type txnKVStreamer struct { kvBatchFetcherHelper - streamer *kvstreamer.Streamer - keyLocking lock.Strength + streamer *kvstreamer.Streamer + lockStrength lock.Strength + lockDurability lock.Durability spans roachpb.Spans spanIDs []int @@ -53,14 +54,16 @@ var _ KVBatchFetcher = &txnKVStreamer{} func newTxnKVStreamer( streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength, + lockDurability descpb.ScanLockingDurability, acc *mon.BoundAccount, kvPairsRead *int64, batchRequestsIssued *int64, ) KVBatchFetcher { f := &txnKVStreamer{ - streamer: streamer, - keyLocking: GetKeyLockingStrength(lockStrength), - acc: acc, + streamer: streamer, + lockStrength: GetKeyLockingStrength(lockStrength), + lockDurability: GetKeyLockingDurability(lockDurability), + acc: acc, } f.kvBatchFetcherHelper.init(f.nextBatch, kvPairsRead, batchRequestsIssued) return f @@ -99,7 +102,7 @@ func (f *txnKVStreamer) SetupNextFetch( reqsScratch[i] = kvpb.RequestUnion{} } // TODO(yuzefovich): consider supporting COL_BATCH_RESPONSE scan format. - reqs := spansToRequests(spans, kvpb.BATCH_RESPONSE, false /* reverse */, f.keyLocking, reqsScratch) + reqs := spansToRequests(spans, kvpb.BATCH_RESPONSE, false /* reverse */, f.lockStrength, f.lockDurability, reqsScratch) if err := f.streamer.Enqueue(ctx, reqs); err != nil { return err } diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index dc762e2c7828..c3bc38b9658c 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -56,6 +56,7 @@ func newTxnKVFetcher( reverse bool, lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, + lockDurability descpb.ScanLockingDurability, lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, @@ -93,6 +94,7 @@ func newTxnKVFetcher( reverse: reverse, lockStrength: lockStrength, lockWaitPolicy: lockWaitPolicy, + lockDurability: lockDurability, lockTimeout: lockTimeout, acc: acc, forceProductionKVBatchSize: forceProductionKVBatchSize, @@ -122,12 +124,13 @@ func NewDirectKVBatchFetcher( reverse bool, lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, + lockDurability descpb.ScanLockingDurability, lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, ) KVBatchFetcher { f := newTxnKVFetcher( - txn, bsHeader, reverse, lockStrength, lockWaitPolicy, + txn, bsHeader, reverse, lockStrength, lockWaitPolicy, lockDurability, lockTimeout, acc, forceProductionKVBatchSize, ) f.scanFormat = kvpb.COL_BATCH_RESPONSE @@ -147,12 +150,13 @@ func NewKVFetcher( reverse bool, lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, + lockDurability descpb.ScanLockingDurability, lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, ) *KVFetcher { return newKVFetcher(newTxnKVFetcher( - txn, bsHeader, reverse, lockStrength, lockWaitPolicy, + txn, bsHeader, reverse, lockStrength, lockWaitPolicy, lockDurability, lockTimeout, acc, forceProductionKVBatchSize, )) } @@ -168,6 +172,7 @@ 
func NewStreamingKVFetcher( st *cluster.Settings, lockWaitPolicy descpb.ScanLockingWaitPolicy, lockStrength descpb.ScanLockingStrength, + lockDurability descpb.ScanLockingDurability, streamerBudgetLimit int64, streamerBudgetAcc *mon.BoundAccount, maintainOrdering bool, @@ -189,6 +194,7 @@ func NewStreamingKVFetcher( &kvPairsRead, &batchRequestsIssued, GetKeyLockingStrength(lockStrength), + GetKeyLockingDurability(lockDurability), ) mode := kvstreamer.OutOfOrder if maintainOrdering { @@ -203,7 +209,7 @@ func NewStreamingKVFetcher( maxKeysPerRow, diskBuffer, ) - return newKVFetcher(newTxnKVStreamer(streamer, lockStrength, kvFetcherMemAcc, &kvPairsRead, &batchRequestsIssued)) + return newKVFetcher(newTxnKVStreamer(streamer, lockStrength, lockDurability, kvFetcherMemAcc, &kvPairsRead, &batchRequestsIssued)) } func newKVFetcher(batchFetcher KVBatchFetcher) *KVFetcher { diff --git a/pkg/sql/row/locking.go b/pkg/sql/row/locking.go index 88f79b052854..145971efe359 100644 --- a/pkg/sql/row/locking.go +++ b/pkg/sql/row/locking.go @@ -61,3 +61,18 @@ func GetWaitPolicy(lockWaitPolicy descpb.ScanLockingWaitPolicy) lock.WaitPolicy panic(errors.AssertionFailedf("unknown wait policy %s", lockWaitPolicy)) } } + +// GetKeyLockingDurability returns the configured lock durability to use for +// key-value scans. +func GetKeyLockingDurability(lockDurability descpb.ScanLockingDurability) lock.Durability { + switch lockDurability { + case descpb.ScanLockingDurability_BEST_EFFORT: + return lock.Unreplicated + + case descpb.ScanLockingDurability_GUARANTEED: + return lock.Replicated + + default: + panic(errors.AssertionFailedf("unknown lock durability %s", lockDurability)) + } +} diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 79e7057e0a76..b8f14c7a67fc 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -296,6 +296,7 @@ func newInvertedJoiner( Txn: flowCtx.Txn, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, + LockDurability: spec.LockingDurability, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &ij.alloc, MemMonitor: flowCtx.Mon, diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index b71b18cb20b5..aea487233714 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -536,6 +536,7 @@ func newJoinReader( flowCtx.EvalCtx.Settings, spec.LockingWaitPolicy, spec.LockingStrength, + spec.LockingDurability, streamerBudgetLimit, &jr.streamerInfo.budgetAcc, jr.streamerInfo.maintainOrdering, @@ -562,6 +563,7 @@ func newJoinReader( Txn: jr.txn, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, + LockDurability: spec.LockingDurability, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &jr.alloc, MemMonitor: flowCtx.Mon, diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index b9d82c65fb99..4e05efb339a0 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -153,6 +153,7 @@ func newTableReader( Reverse: spec.Reverse, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, + LockDurability: spec.LockingDurability, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &tr.alloc, MemMonitor: flowCtx.Mon, diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 9117a0b327e9..521e25d8fa95 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -470,6 +470,7 @@ func (z 
*zigzagJoiner) setupInfo( Txn: flowCtx.Txn, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, + LockDurability: spec.LockingDurability, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, Alloc: &info.alloc, MemMonitor: flowCtx.Mon, diff --git a/pkg/sql/scan.go b/pkg/sql/scan.go index 3a2b345adcfc..c8a981fa6432 100644 --- a/pkg/sql/scan.go +++ b/pkg/sql/scan.go @@ -88,10 +88,11 @@ type scanNode struct { // set to zero. estimatedRowCount uint64 - // lockingStrength and lockingWaitPolicy represent the row-level locking - // mode of the Scan. + // lockingStrength, lockingWaitPolicy, and lockingDurability represent the + // row-level locking mode of the Scan. lockingStrength descpb.ScanLockingStrength lockingWaitPolicy descpb.ScanLockingWaitPolicy + lockingDurability descpb.ScanLockingDurability // containsSystemColumns holds whether or not this scan is expected to // produce any system columns.
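
Reviewer note, appended after the diff: the change threads a locking-durability setting from the optimizer's locking clause (tree.LockingDurability → the new descpb.ScanLockingDurability field on the processor specs) down to the row fetchers and the streamer, which now stamp KeyLockingDurability on every Get/Scan request, so durability-guaranteed (replicated) locks can back SELECT FOR UPDATE and FK checks under READ COMMITTED and SNAPSHOT. The sketch below is a minimal, self-contained illustration of that mapping only; the type and function names are local stand-ins mirroring the identifiers touched in the diff (descpb.ScanLockingDurability, lock.Durability, row.GetKeyLockingDurability), not the real packages.

```go
// Illustrative sketch, not part of the diff. Local types mirror the new
// ScanLockingDurability enum in descpb/locking.proto and lock.Durability in
// the KV concurrency package; the switch mirrors row.GetKeyLockingDurability
// added in pkg/sql/row/locking.go.
package main

import "fmt"

// ScanLockingDurability mirrors the plan-level enum added in locking.proto.
type ScanLockingDurability int

const (
	BestEffort ScanLockingDurability = iota // BEST_EFFORT
	Guaranteed                              // GUARANTEED
)

// Durability mirrors the KV-level lock.Durability.
type Durability int

const (
	Unreplicated Durability = iota // in-memory lock, may be lost on lease transfer
	Replicated                     // lock written through Raft, guaranteed to hold
)

// toKeyLockingDurability mirrors row.GetKeyLockingDurability: the plan-level
// durability maps onto the lock durability carried on each Get/Scan request
// via the KeyLockingDurability field set in spansToRequests and the streamer.
func toKeyLockingDurability(d ScanLockingDurability) Durability {
	switch d {
	case BestEffort:
		return Unreplicated
	case Guaranteed:
		return Replicated
	default:
		panic(fmt.Sprintf("unknown lock durability %d", d))
	}
}

func main() {
	// Under READ COMMITTED or SNAPSHOT, the optimizer requests
	// durability-guaranteed locks for SELECT FOR UPDATE and FK checks,
	// which the fetchers translate into replicated KV locks.
	fmt.Println(toKeyLockingDurability(Guaranteed))  // 1 (Replicated)
	fmt.Println(toKeyLockingDurability(BestEffort)) // 0 (Unreplicated)
}
```

In the diff itself the same conversion happens once per fetcher via row.GetKeyLockingDurability, and the kvstreamer keeps the resulting lock.Durability alongside lock.Strength so resume Get/Scan requests are rebuilt with the same durability.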