Skip to content

Commit

Permalink
sql: plumb locking durability into Get/Scan/ReverseScan requests
Browse files Browse the repository at this point in the history
Building on top of cockroachdb#110201, plumb locking durability from the optimizer
to creation of Get/Scan/ReverseScan requests in txnKVFetcher and
txnKVStreamer.

Fixes: cockroachdb#100194

Release note: None
  • Loading branch information
michae2 committed Sep 27, 2023
1 parent d0c46fe commit 71ea81a
Show file tree
Hide file tree
Showing 25 changed files with 181 additions and 83 deletions.
29 changes: 17 additions & 12 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,12 @@ type Streamer struct {
distSender *kvcoord.DistSender
stopper *stop.Stopper

mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget
keyLocking lock.Strength
mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget
lockStrength lock.Strength
lockDurability lock.Durability

streamerStatistics

Expand Down Expand Up @@ -370,16 +371,18 @@ func NewStreamer(
acc *mon.BoundAccount,
kvPairsRead *int64,
batchRequestsIssued *int64,
keyLocking lock.Strength,
lockStrength lock.Strength,
lockDurability lock.Durability,
) *Streamer {
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
}
s := &Streamer{
distSender: distSender,
stopper: stopper,
budget: newBudget(acc, limitBytes),
keyLocking: keyLocking,
distSender: distSender,
stopper: stopper,
budget: newBudget(acc, limitBytes),
lockStrength: lockStrength,
lockDurability: lockDurability,
}

if kvPairsRead == nil {
Expand Down Expand Up @@ -1744,7 +1747,8 @@ func buildResumeSingleRangeBatch(
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLockingStrength = s.keyLocking
newGet.req.KeyLockingStrength = s.lockStrength
newGet.req.KeyLockingDurability = s.lockDurability
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down Expand Up @@ -1772,7 +1776,8 @@ func buildResumeSingleRangeBatch(
scans = scans[1:]
newScan.req.SetSpan(*scan.ResumeSpan)
newScan.req.ScanFormat = kvpb.BATCH_RESPONSE
newScan.req.KeyLockingStrength = s.keyLocking
newScan.req.KeyLockingStrength = s.lockStrength
newScan.req.KeyLockingDurability = s.lockDurability
newScan.union.Scan = &newScan.req
resumeReq.reqs[resumeReqIdx].Value = &newScan.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
nil, /* kvPairsRead */
nil, /* batchRequestsIssued */
lock.None,
lock.Unreplicated,
)
s.Init(OutOfOrder, Hints{UniqueRequests: true, SingleRowLookup: singleRowLookup}, 1 /* maxKeysPerRow */, nil /* diskBuffer */)
return s
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/catalog/descpb/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,28 @@ func ToScanLockingWaitPolicy(wp tree.LockingWaitPolicy) ScanLockingWaitPolicy {
panic(errors.AssertionFailedf("unknown locking wait policy %s", wp))
}
}

// PrettyString returns the locking durability as a user-readable string.
func (s ScanLockingDurability) PrettyString() string {
	if s == ScanLockingDurability_BEST_EFFORT {
		return "best effort"
	}
	if s == ScanLockingDurability_GUARANTEED {
		return "guaranteed"
	}
	// Any other value indicates a programming error upstream.
	panic(errors.AssertionFailedf("unexpected durability %s", s))
}

// ToScanLockingDurability converts a tree.LockingDurability to its
// corresponding ScanLockingDurability.
func ToScanLockingDurability(s tree.LockingDurability) ScanLockingDurability {
	if s == tree.LockDurabilityBestEffort {
		return ScanLockingDurability_BEST_EFFORT
	}
	if s == tree.LockDurabilityGuaranteed {
		return ScanLockingDurability_GUARANTEED
	}
	// An unmapped durability indicates a programming error upstream.
	panic(errors.AssertionFailedf("unknown locking durability %d", s))
}
6 changes: 6 additions & 0 deletions pkg/sql/catalog/descpb/locking.proto
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,9 @@ enum ScanLockingWaitPolicy {
// ERROR represents NOWAIT - raise an error if a row cannot be locked.
ERROR = 2;
}

// ScanLockingDurability controls the durability of row-level locks acquired
// by a scan.
enum ScanLockingDurability {
// BEST_EFFORT is the zero value, so it is the durability used when none is
// explicitly specified.
BEST_EFFORT = 0;
// GUARANTEED requests guaranteed-durable locks.
GUARANTEED = 1;
}
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_direct_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func NewColBatchDirectScan(
spec.Reverse,
spec.LockingStrength,
spec.LockingWaitPolicy,
spec.LockingDurability,
flowCtx.EvalCtx.SessionData().LockTimeout,
kvFetcherMemAcc,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func NewColBatchScan(
spec.Reverse,
spec.LockingStrength,
spec.LockingWaitPolicy,
spec.LockingDurability,
flowCtx.EvalCtx.SessionData().LockTimeout,
kvFetcherMemAcc,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ func NewColIndexJoin(
flowCtx.EvalCtx.Settings,
spec.LockingWaitPolicy,
spec.LockingStrength,
spec.LockingDurability,
streamerBudgetLimit,
streamerBudgetAcc,
spec.MaintainOrdering,
Expand All @@ -580,6 +581,7 @@ func NewColIndexJoin(
false, /* reverse */
spec.LockingStrength,
spec.LockingWaitPolicy,
spec.LockingDurability,
flowCtx.EvalCtx.SessionData().LockTimeout,
kvFetcherMemAcc,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,7 @@ func initTableReaderSpecTemplate(
TableDescriptorModificationTime: n.desc.GetModificationTime(),
LockingStrength: n.lockingStrength,
LockingWaitPolicy: n.lockingWaitPolicy,
LockingDurability: n.lockingDurability,
}
if err := rowenc.InitIndexFetchSpec(&s.FetchSpec, codec, n.desc, n.index, colIDs); err != nil {
return nil, execinfrapb.PostProcessSpec{}, err
Expand Down Expand Up @@ -2740,6 +2741,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
Type: descpb.InnerJoin,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
LockingDurability: n.table.lockingDurability,
MaintainOrdering: len(n.reqOrdering) > 0,
LimitHint: n.limitHint,
}
Expand Down Expand Up @@ -2817,6 +2819,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
LockingDurability: n.table.lockingDurability,
// TODO(sumeer): specifying ordering here using isFirstJoinInPairedJoiner
// is late in the sense that the cost of this has not been taken into
// account. Make this decision earlier in CustomFuncs.GenerateLookupJoins.
Expand Down Expand Up @@ -2949,6 +2952,7 @@ func (dsp *DistSQLPlanner) createPlanForInvertedJoin(
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
LockingDurability: n.table.lockingDurability,
}

fetchColIDs := make([]descpb.ColumnID, len(n.table.cols))
Expand Down Expand Up @@ -3045,6 +3049,7 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
fixedValues: valuesSpec,
lockingStrength: side.scan.lockingStrength,
lockingWaitPolicy: side.scan.lockingWaitPolicy,
lockingDurability: side.scan.lockingDurability,
}
}

Expand All @@ -3064,6 +3069,7 @@ type zigzagPlanningSide struct {
fixedValues *execinfrapb.ValuesCoreSpec
lockingStrength descpb.ScanLockingStrength
lockingWaitPolicy descpb.ScanLockingWaitPolicy
lockingDurability descpb.ScanLockingDurability
}

type zigzagPlanningInfo struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
}
trSpec.LockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength)
trSpec.LockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy)
trSpec.LockingDurability = descpb.ToScanLockingDurability(params.Locking.Durability)
if trSpec.LockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Scans that are performing row-level locking cannot currently be
// distributed because their locks would not be propagated back to
Expand Down Expand Up @@ -736,6 +737,7 @@ func (e *distSQLSpecExecFactory) constructZigzagJoinSide(
fixedValues: valuesSpec,
lockingStrength: descpb.ToScanLockingStrength(locking.Strength),
lockingWaitPolicy: descpb.ToScanLockingWaitPolicy(locking.WaitPolicy),
lockingDurability: descpb.ToScanLockingDurability(locking.Durability),
}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ message TableReaderSpec {
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 11 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 23 [(gogoproto.nullable) = false];

// Indicates that misplanned ranges metadata should not be sent back to the
// DistSQLReceiver. This will be set to true for the scan with a hard limit
// (in which case we create a single processor that is placed at the
Expand Down Expand Up @@ -357,6 +360,9 @@ message JoinReaderSpec {
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 10 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 24 [(gogoproto.nullable) = false];

// Indicates that the join reader should maintain the ordering of the input
// stream. This is applicable to both lookup joins and index joins. For lookup
// joins, maintaining order is expensive because it requires buffering. For
Expand Down Expand Up @@ -499,6 +505,9 @@ message ZigzagJoinerSpec {
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 5 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 8 [(gogoproto.nullable) = false];
}

repeated Side sides = 7 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -747,6 +756,9 @@ message InvertedJoinerSpec {
// held by other active transactions when attempting to lock rows. Always set
// to BLOCK when locking_strength is FOR_NONE.
optional sqlbase.ScanLockingWaitPolicy locking_wait_policy = 13 [(gogoproto.nullable) = false];

// Indicates the row-level locking durability to be used by the scan.
optional sqlbase.ScanLockingDurability locking_durability = 14 [(gogoproto.nullable) = false];
}

// InvertedFiltererSpec is the specification of a processor that does filtering
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/insert_fast_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (r *insertFastPathRun) addFKChecks(
log.VEventf(ctx, 2, "FKScan %s", span)
}
lockStrength := row.GetKeyLockingStrength(descpb.ToScanLockingStrength(c.Locking.Strength))
lockDurability := row.GetKeyLockingDurability(descpb.ToScanLockingDurability(c.Locking.Durability))
lockWaitPolicy := row.GetWaitPolicy(descpb.ToScanLockingWaitPolicy(c.Locking.WaitPolicy))
if r.fkBatch.Header.WaitPolicy != lockWaitPolicy {
return errors.AssertionFailedf(
Expand All @@ -200,9 +201,9 @@ func (r *insertFastPathRun) addFKChecks(
reqIdx := len(r.fkBatch.Requests)
r.fkBatch.Requests = append(r.fkBatch.Requests, kvpb.RequestUnion{})
r.fkBatch.Requests[reqIdx].MustSetInner(&kvpb.ScanRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(span),
KeyLockingStrength: lockStrength,
// TODO(michae2): Once #100193 is finished, also include c.Locking.Durability.
RequestHeader: kvpb.RequestHeaderFromSpan(span),
KeyLockingStrength: lockStrength,
KeyLockingDurability: lockDurability,
})
r.fkSpanInfo = append(r.fkSpanInfo, insertFastPathFKSpanInfo{
check: c,
Expand Down
23 changes: 4 additions & 19 deletions pkg/sql/logictest/testdata/logic_test/fk_read_committed
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# LogicTest: !local-mixed-22.2-23.1

statement ok
SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = true

# Some foreign key checks are prohibited under weaker isolation levels until we
# improve locking. See #80683, #100156, #100193.
# statement ok
# SET CLUSTER SETTING sql.txn.read_committed_syntax.enabled = true

statement ok
CREATE TABLE jars (j INT PRIMARY KEY)
Expand All @@ -18,32 +15,20 @@ SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED
statement ok
INSERT INTO jars VALUES (1), (2)

# Foreign key checks of the parent require durable shared locking under weaker
# isolation levels, and are not yet supported.
query error pgcode 0A000 guaranteed-durable locking not yet implemented
INSERT INTO cookies VALUES (1, 1)

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE

statement ok
INSERT INTO cookies VALUES (1, 1)

statement ok
COMMIT

query error pgcode 0A000 guaranteed-durable locking not yet implemented
UPDATE cookies SET j = 2 WHERE c = 1

# Foreign key checks of the child do not require locking.
query error violates foreign key constraint
UPDATE jars SET j = j + 4

query error violates foreign key constraint
DELETE FROM jars WHERE j = 1
DELETE FROM jars WHERE j = 2

statement ok
DELETE FROM cookies WHERE c = 1

statement ok
DELETE FROM jars WHERE j = 1
DELETE FROM jars WHERE j = 2
Loading

0 comments on commit 71ea81a

Please sign in to comment.