From a5fc49c029977fec0a153912e91e728dfe8b1458 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Mon, 28 Aug 2023 19:36:57 -0400 Subject: [PATCH] sql: use the correct locking strength for FOR SHARE clause Previously, FOR SHARE and FOR KEY SHARE would use non-locking KV scans. Now that the lock table supports shared locks, we can use lock.Shared as the locking strength for KV scans. This patch does that, and in doing so, wires up SHARED locks end to end. Informs #91545 Release note: None --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + .../tests/3node-tenant/generated_test.go | 7 ++ pkg/kv/BUILD.bazel | 1 + pkg/kv/db.go | 2 +- pkg/sql/backfill/backfill.go | 2 + pkg/sql/colexec/colbuilder/execplan.go | 2 +- pkg/sql/colfetcher/colbatch_direct_scan.go | 2 + pkg/sql/colfetcher/colbatch_scan.go | 2 + pkg/sql/colfetcher/index_join.go | 3 + pkg/sql/insert_fast_path.go | 7 +- .../testdata/logic_test/select_for_share | 75 +++++++++++++++++++ .../tests/fakedist-disk/generated_test.go | 7 ++ .../tests/fakedist-vec-off/generated_test.go | 7 ++ .../tests/fakedist/generated_test.go | 7 ++ .../generated_test.go | 7 ++ .../tests/local-vec-off/generated_test.go | 7 ++ .../logictest/tests/local/generated_test.go | 7 ++ pkg/sql/row/BUILD.bazel | 1 + pkg/sql/row/fetcher.go | 5 +- pkg/sql/row/kv_batch_fetcher.go | 18 +++-- pkg/sql/row/kv_batch_streamer.go | 5 +- pkg/sql/row/kv_fetcher.go | 22 ++++-- pkg/sql/row/locking.go | 26 ++++++- pkg/sql/rowexec/inverted_joiner.go | 1 + pkg/sql/rowexec/joinreader.go | 2 + pkg/sql/rowexec/tablereader.go | 1 + pkg/sql/rowexec/zigzagjoiner.go | 1 + 28 files changed, 207 insertions(+), 22 deletions(-) create mode 100644 pkg/sql/logictest/testdata/logic_test/select_for_share diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 0da64253c148..1f775366d285 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -248,6 +248,7 @@ sql.insights.anomaly_detection.memory_limit byte size 1.0 MiB the maximum amount sql.insights.execution_insights_capacity integer 1000 the size of the per-node store of execution insights tenant-rw sql.insights.high_retry_count.threshold integer 10 the number of retries a slow statement must have undergone for its high retry count to be highlighted as a potential problem tenant-rw sql.insights.latency_threshold duration 100ms amount of time after which an executing statement is considered slow. Use 0 to disable. tenant-rw +sql.locking.enable_shared_locks boolean false enable shared locking strength when using FOR SHARE or FOR KEY SHARE modifiers tenant-ro sql.log.slow_query.experimental_full_table_scans.enabled boolean false when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect. tenant-rw sql.log.slow_query.internal_queries.enabled boolean false when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect. 
tenant-rw
 sql.log.slow_query.latency_threshold duration 0s when set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each node tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index b8a21c736fa0..44e0c6e2629d 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -195,6 +195,7 @@
 <tr><td><code>sql.insights.execution_insights_capacity</code></td><td>integer</td><td><code>1000</code></td><td>the size of the per-node store of execution insights</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>sql.insights.high_retry_count.threshold</code></td><td>integer</td><td><code>10</code></td><td>the number of retries a slow statement must have undergone for its high retry count to be highlighted as a potential problem</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>sql.insights.latency_threshold</code></td><td>duration</td><td><code>100ms</code></td><td>amount of time after which an executing statement is considered slow. Use 0 to disable.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><code>sql.locking.enable_shared_locks</code></td><td>boolean</td><td><code>false</code></td><td>enable shared locking strength when using FOR SHARE or FOR KEY SHARE modifiers</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
 <tr><td><code>sql.log.slow_query.experimental_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><code>sql.log.slow_query.internal_queries.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
sql.log.slow_query.latency_threshold
duration0swhen set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each nodeServerless/Dedicated/Self-Hosted diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go index 027c6485e9d4..5e239a045977 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go +++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go @@ -1668,6 +1668,13 @@ func TestTenantLogic_select( runLogicTest(t, "select") } +func TestTenantLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestTenantLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index d04671baf3b8..e6fb37eb5319 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/kv/kvserver/concurrency/isolation", "//pkg/roachpb", "//pkg/settings", + "//pkg/settings/cluster", "//pkg/sql/sessiondatapb", "//pkg/storage/enginepb", "//pkg/testutils", diff --git a/pkg/kv/db.go b/pkg/kv/db.go index d9e2c74f2679..3041bc6c65a5 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -266,7 +266,7 @@ type DB struct { // SQL code that all uses kv.DB. // // TODO(sumeer,irfansharif): Find a home for these in the SQL layer. - // Especially SettingsValue. + // Especially SettingsValues. SQLKVResponseAdmissionQ *admission.WorkQueue AdmissionPacerFactory admission.PacerFactory SettingsValues *settings.Values diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 256e7a18a23d..b84998ce79ab 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -172,6 +172,7 @@ func (cb *ColumnBackfiller) init( Spec: &spec, TraceKV: traceKV, ForceProductionKVBatchSize: cb.evalCtx.TestingKnobs.ForceProductionValues, + Settings: cb.evalCtx.Settings, }, ) } @@ -857,6 +858,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( Spec: &spec, TraceKV: traceKV, ForceProductionKVBatchSize: ib.evalCtx.TestingKnobs.ForceProductionValues, + Settings: ib.evalCtx.Settings, }, ); err != nil { return nil, nil, 0, err diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index e8d78fbd7b7c..925a9d07f87c 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -858,7 +858,7 @@ func NewColOperator( // would be to simply include all key columns into the set // of needed for the fetch and to project them away in the // ColBatchDirectScan. - if row.GetKeyLockingStrength(core.TableReader.LockingStrength) != lock.None || + if row.GetKeyLockingStrength(ctx, core.TableReader.LockingStrength, flowCtx.EvalCtx.Settings) != lock.None || core.TableReader.LockingWaitPolicy == descpb.ScanLockingWaitPolicy_SKIP_LOCKED { return false } diff --git a/pkg/sql/colfetcher/colbatch_direct_scan.go b/pkg/sql/colfetcher/colbatch_direct_scan.go index 058f72404671..bf70904a5a7c 100644 --- a/pkg/sql/colfetcher/colbatch_direct_scan.go +++ b/pkg/sql/colfetcher/colbatch_direct_scan.go @@ -209,6 +209,7 @@ func NewColBatchDirectScan( // should be able to modify the BatchRequest, but alas. 
fetchSpec := spec.FetchSpec fetcher := row.NewDirectKVBatchFetcher( + ctx, flowCtx.Txn, bsHeader, &fetchSpec, @@ -218,6 +219,7 @@ func NewColBatchDirectScan( flowCtx.EvalCtx.SessionData().LockTimeout, kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + flowCtx.EvalCtx.Settings, ) var hasDatumVec bool for _, t := range tableArgs.typs { diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 199eeb7c4c0f..14938981d10a 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -345,6 +345,7 @@ func NewColBatchScan( return nil, nil, err } kvFetcher := row.NewKVFetcher( + ctx, flowCtx.Txn, bsHeader, spec.Reverse, @@ -353,6 +354,7 @@ func NewColBatchScan( flowCtx.EvalCtx.SessionData().LockTimeout, kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + flowCtx.EvalCtx.Settings, ) fetcher := cFetcherPool.Get().(*cFetcher) fetcher.cFetcherArgs = cFetcherArgs{ diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index ea66b7f16f2d..e6e2af62e4df 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -557,6 +557,7 @@ func NewColIndexJoin( cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 16.0)) streamerBudgetLimit := 14 * cFetcherMemoryLimit kvFetcher = row.NewStreamingKVFetcher( + ctx, flowCtx.Cfg.DistSender, flowCtx.Stopper(), txn, @@ -575,6 +576,7 @@ func NewColIndexJoin( ) } else { kvFetcher = row.NewKVFetcher( + ctx, txn, nil, /* bsHeader */ false, /* reverse */ @@ -583,6 +585,7 @@ func NewColIndexJoin( flowCtx.EvalCtx.SessionData().LockTimeout, kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + flowCtx.EvalCtx.Settings, ) } diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 862837f3c3f7..9871aa867a83 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -153,7 +154,7 @@ func (r *insertFastPathRun) inputRow(rowIdx int) tree.Datums { // addFKChecks adds Requests to fkBatch and entries in fkSpanInfo / fkSpanMap as // needed for checking foreign keys for the given row. func (r *insertFastPathRun) addFKChecks( - ctx context.Context, rowIdx int, inputRow tree.Datums, + ctx context.Context, rowIdx int, inputRow tree.Datums, settings *cluster.Settings, ) error { for i := range r.fkChecks { c := &r.fkChecks[i] @@ -189,7 +190,7 @@ func (r *insertFastPathRun) addFKChecks( if r.traceKV { log.VEventf(ctx, 2, "FKScan %s", span) } - lockStrength := row.GetKeyLockingStrength(descpb.ToScanLockingStrength(c.Locking.Strength)) + lockStrength := row.GetKeyLockingStrength(ctx, descpb.ToScanLockingStrength(c.Locking.Strength), settings) lockWaitPolicy := row.GetWaitPolicy(descpb.ToScanLockingWaitPolicy(c.Locking.WaitPolicy)) if r.fkBatch.Header.WaitPolicy != lockWaitPolicy { return errors.AssertionFailedf( @@ -310,7 +311,7 @@ func (n *insertFastPathNode) BatchedNext(params runParams) (bool, error) { // Add FK existence checks. 
if len(n.run.fkChecks) > 0 { - if err := n.run.addFKChecks(params.ctx, rowIdx, inputRow); err != nil { + if err := n.run.addFKChecks(params.ctx, rowIdx, inputRow, params.ExecCfg().Settings); err != nil { return false, err } } diff --git a/pkg/sql/logictest/testdata/logic_test/select_for_share b/pkg/sql/logictest/testdata/logic_test/select_for_share new file mode 100644 index 000000000000..dd19172956d9 --- /dev/null +++ b/pkg/sql/logictest/testdata/logic_test/select_for_share @@ -0,0 +1,75 @@ +# LogicTest: !local-mixed-22.2-23.1 + +statement ok +SET CLUSTER SETTING sql.locking.enable_shared_locks = true + +statement ok +CREATE TABLE t(a INT PRIMARY KEY); +INSERT INTO t VALUES(1); +GRANT ALL ON t TO testuser; +CREATE USER testuser2 WITH VIEWACTIVITY; +GRANT ALL ON t TO testuser2; + +user testuser + +statement ok +BEGIN + +query I +SELECT * FROM t WHERE a = 1 FOR SHARE; +---- +1 + +# Start another transaction to show multiple transactions can acquire SHARED +# locks at the same time. + +user root + +statement ok +BEGIN + +query I +SELECT * FROM t WHERE a = 1 FOR SHARE; +---- +1 + +user testuser2 + +statement async writeReq count 1 +UPDATE t SET a = 2 WHERE a = 1 + +# TODO(arul): Until https://github.com/cockroachdb/cockroach/issues/107766 is +# addressed, we'll incorrectly report shared locks as having "Exclusive" lock +# strength; however, having this query in here is useful to make sure there are +# two locks on our key and setting the cluster setting above actually did +# something; otherwise, had we used non-locking reads, we'd have failed here. +query TTTTTTTBB colnames,retry,rowsort +SELECT database_name, schema_name, table_name, lock_key_pretty, lock_strength, durability, isolation_level, granted, contended FROM crdb_internal.cluster_locks +---- +database_name schema_name table_name lock_key_pretty lock_strength durability isolation_level granted contended +test public t /Table/106/1/1/0 Exclusive Unreplicated SERIALIZABLE true true +test public t /Table/106/1/1/0 Exclusive Unreplicated SERIALIZABLE false true + +# Commit the first transaction and rollback the second. + +user testuser + +statement ok +COMMIT + +user root + +statement ok +ROLLBACK + +user testuser2 + +# Now that both the transactions that issued shared lock reads have been +# finalized, the write should be able to proceed. 
+ +awaitstatement writeReq + +query I +SELECT * FROM t; +---- +2 diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index 88e18d338e3c..a91fb52c46f8 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -1646,6 +1646,13 @@ func TestLogic_select( runLogicTest(t, "select") } +func TestLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index 926db53b5f10..46f39ea75dd7 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -1646,6 +1646,13 @@ func TestLogic_select( runLogicTest(t, "select") } +func TestLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index f49d57d715cf..d550d4637344 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -1660,6 +1660,13 @@ func TestLogic_select( runLogicTest(t, "select") } +func TestLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index 69939d73d853..367c3db3f00b 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -1632,6 +1632,13 @@ func TestLogic_select( runLogicTest(t, "select") } +func TestLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 29fe95596431..f757a8da6084 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -1660,6 +1660,13 @@ func TestLogic_select( runLogicTest(t, "select") } +func TestLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index 523563e3db47..430d8cf08889 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -1807,6 +1807,13 @@ func TestLogic_select( runLogicTest(t, "select") } +func TestLogic_select_for_share( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "select_for_share") +} + func TestLogic_select_for_update( t *testing.T, ) { diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index dfa0dfdc31f8..00fb28404055 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -25,6 +25,7 @@ 
go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/row", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/col/coldata", "//pkg/jobs", "//pkg/jobs/jobspb", diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index b2ed310d3ee9..d6f974ff8e6d 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" @@ -316,6 +317,7 @@ type FetcherInitArgs struct { // row is being processed. In practice, this means that span IDs must be // passed in when SpansCanOverlap is true. SpansCanOverlap bool + Settings *cluster.Settings } // Init sets up a Fetcher for a given table and index. @@ -445,6 +447,7 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { forceProductionKVBatchSize: args.ForceProductionKVBatchSize, kvPairsRead: &kvPairsRead, batchRequestsIssued: &batchRequestsIssued, + settings: args.Settings, } if args.Txn != nil { fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued) @@ -453,7 +456,7 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues } - rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs)) + rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(ctx, fetcherArgs)) } return nil diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index ce49e7fe3ee8..cd349b3ad3f0 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb" @@ -292,11 +293,16 @@ type newTxnKVFetcherArgs struct { forceProductionKVBatchSize bool kvPairsRead *int64 batchRequestsIssued *int64 + settings *cluster.Settings admission struct { // groups AC-related fields - requestHeader kvpb.AdmissionHeader - responseQ *admission.WorkQueue - pacerFactory admission.PacerFactory + requestHeader kvpb.AdmissionHeader + responseQ *admission.WorkQueue + pacerFactory admission.PacerFactory + // TODO(arul): we're determining whether we're in a SQL pod or not based on + // if this field is set or not. This is quite annoying -- it means we can't + // just get rid of this field and grab settings.Values off the settings + // field above (which is always set). settingsValues *settings.Values } } @@ -306,13 +312,13 @@ type newTxnKVFetcherArgs struct { // The passed-in memory account is owned by the fetcher throughout its lifetime // but is **not** closed - it is the caller's responsibility to close acc if it // is non-nil. 
-func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher { +func newTxnKVFetcherInternal(ctx context.Context, args newTxnKVFetcherArgs) *txnKVFetcher { f := &txnKVFetcher{ sendFn: args.sendFn, // Default to BATCH_RESPONSE. The caller will override if needed. scanFormat: kvpb.BATCH_RESPONSE, reverse: args.reverse, - lockStrength: GetKeyLockingStrength(args.lockStrength), + lockStrength: GetKeyLockingStrength(ctx, args.lockStrength, args.settings), lockWaitPolicy: GetWaitPolicy(args.lockWaitPolicy), lockTimeout: args.lockTimeout, acc: args.acc, @@ -350,6 +356,8 @@ func (f *txnKVFetcher) maybeInitAdmissionPacer( if sv == nil { // Only nil in tests and in SQL pods (we don't have admission pacing in // the latter anyway). + // TODO(arul): The assumption that we're in a SQL pod if sv == nil is + // incredibly brittle. return } admissionPri := admissionpb.WorkPriority(admissionHeader.Priority) diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 7cf8965299db..f2696970f5dc 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -51,15 +52,17 @@ var _ KVBatchFetcher = &txnKVStreamer{} // newTxnKVStreamer creates a new txnKVStreamer. func newTxnKVStreamer( + ctx context.Context, streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength, acc *mon.BoundAccount, kvPairsRead *int64, batchRequestsIssued *int64, + st *cluster.Settings, ) KVBatchFetcher { f := &txnKVStreamer{ streamer: streamer, - keyLocking: GetKeyLockingStrength(lockStrength), + keyLocking: GetKeyLockingStrength(ctx, lockStrength, st), acc: acc, } f.kvBatchFetcherHelper.init(f.nextBatch, kvPairsRead, batchRequestsIssued) diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index dc762e2c7828..178e678c065e 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -51,6 +51,7 @@ var _ storage.NextKVer = &KVFetcher{} // the memory account can be shared by the caller with other components (as long // as there is no concurrency). func newTxnKVFetcher( + ctx context.Context, txn *kv.Txn, bsHeader *kvpb.BoundedStalenessHeader, reverse bool, @@ -59,6 +60,7 @@ func newTxnKVFetcher( lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, + st *cluster.Settings, ) *txnKVFetcher { var sendFn sendFunc var batchRequestsIssued int64 @@ -98,13 +100,14 @@ func newTxnKVFetcher( forceProductionKVBatchSize: forceProductionKVBatchSize, kvPairsRead: new(int64), batchRequestsIssued: &batchRequestsIssued, + settings: st, } fetcherArgs.admission.requestHeader = txn.AdmissionHeader() fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ fetcherArgs.admission.pacerFactory = txn.DB().AdmissionPacerFactory fetcherArgs.admission.settingsValues = txn.DB().SettingsValues - return newTxnKVFetcherInternal(fetcherArgs) + return newTxnKVFetcherInternal(ctx, fetcherArgs) } // NewDirectKVBatchFetcher creates a new KVBatchFetcher that uses the @@ -116,6 +119,7 @@ func newTxnKVFetcher( // the memory account can be shared by the caller with other components (as long // as there is no concurrency). 
func NewDirectKVBatchFetcher( + ctx context.Context, txn *kv.Txn, bsHeader *kvpb.BoundedStalenessHeader, spec *fetchpb.IndexFetchSpec, @@ -125,10 +129,11 @@ func NewDirectKVBatchFetcher( lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, + st *cluster.Settings, ) KVBatchFetcher { - f := newTxnKVFetcher( + f := newTxnKVFetcher(ctx, txn, bsHeader, reverse, lockStrength, lockWaitPolicy, - lockTimeout, acc, forceProductionKVBatchSize, + lockTimeout, acc, forceProductionKVBatchSize, st, ) f.scanFormat = kvpb.COL_BATCH_RESPONSE f.indexFetchSpec = spec @@ -142,6 +147,7 @@ func NewDirectKVBatchFetcher( // the memory account can be shared by the caller with other components (as long // as there is no concurrency). func NewKVFetcher( + ctx context.Context, txn *kv.Txn, bsHeader *kvpb.BoundedStalenessHeader, reverse bool, @@ -150,10 +156,11 @@ func NewKVFetcher( lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, + st *cluster.Settings, ) *KVFetcher { - return newKVFetcher(newTxnKVFetcher( + return newKVFetcher(newTxnKVFetcher(ctx, txn, bsHeader, reverse, lockStrength, lockWaitPolicy, - lockTimeout, acc, forceProductionKVBatchSize, + lockTimeout, acc, forceProductionKVBatchSize, st, )) } @@ -162,6 +169,7 @@ func NewKVFetcher( // // If maintainOrdering is true, then diskBuffer must be non-nil. func NewStreamingKVFetcher( + ctx context.Context, distSender *kvcoord.DistSender, stopper *stop.Stopper, txn *kv.Txn, @@ -188,7 +196,7 @@ func NewStreamingKVFetcher( streamerBudgetAcc, &kvPairsRead, &batchRequestsIssued, - GetKeyLockingStrength(lockStrength), + GetKeyLockingStrength(ctx, lockStrength, st), ) mode := kvstreamer.OutOfOrder if maintainOrdering { @@ -203,7 +211,7 @@ func NewStreamingKVFetcher( maxKeysPerRow, diskBuffer, ) - return newKVFetcher(newTxnKVStreamer(streamer, lockStrength, kvFetcherMemAcc, &kvPairsRead, &batchRequestsIssued)) + return newKVFetcher(newTxnKVStreamer(ctx, streamer, lockStrength, kvFetcherMemAcc, &kvPairsRead, &batchRequestsIssued, st)) } func newKVFetcher(batchFetcher KVBatchFetcher) *KVFetcher { diff --git a/pkg/sql/row/locking.go b/pkg/sql/row/locking.go index 88f79b052854..bf7d7ef1d86d 100644 --- a/pkg/sql/row/locking.go +++ b/pkg/sql/row/locking.go @@ -11,14 +11,33 @@ package row import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/errors" ) +// EnableSharedLockingStrength dictates whether to use SHARED locks when SQL +// statements use the FOR {,KEY} SHARE modifiers. If set to true, locking reads +// with strength lock.Shared are performed; otherwise, non-locking reads are +// performed. +var EnableSharedLockingStrength = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.locking.enable_shared_locks", + "enable shared locking strength when using FOR SHARE or FOR KEY SHARE modifiers", + false, + settings.WithPublic, +) + // GetKeyLockingStrength returns the configured per-key locking strength to use // for key-value scans. 
-func GetKeyLockingStrength(lockStrength descpb.ScanLockingStrength) lock.Strength { +func GetKeyLockingStrength( + ctx context.Context, lockStrength descpb.ScanLockingStrength, settings *cluster.Settings, +) lock.Strength { switch lockStrength { case descpb.ScanLockingStrength_FOR_NONE: return lock.None @@ -27,8 +46,9 @@ func GetKeyLockingStrength(lockStrength descpb.ScanLockingStrength) lock.Strengt // Promote to FOR_SHARE. fallthrough case descpb.ScanLockingStrength_FOR_SHARE: - // We currently perform no per-key locking when FOR_SHARE is used - // because Shared locks have not yet been implemented. + if settings.Version.IsActive(ctx, clusterversion.V23_2) && EnableSharedLockingStrength.Get(&settings.SV) { + return lock.Shared + } return lock.None case descpb.ScanLockingStrength_FOR_NO_KEY_UPDATE: diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 79e7057e0a76..28dba669682e 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -302,6 +302,7 @@ func newInvertedJoiner( Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + Settings: flowCtx.EvalCtx.Settings, }, ); err != nil { return nil, err diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index b71b18cb20b5..6a083c650293 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -530,6 +530,7 @@ func newJoinReader( } singleRowLookup := readerType == indexJoinReaderType || spec.LookupColumnsAreKey streamingKVFetcher = row.NewStreamingKVFetcher( + ctx, flowCtx.Cfg.DistSender, flowCtx.Stopper(), jr.txn, @@ -569,6 +570,7 @@ func newJoinReader( TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, SpansCanOverlap: jr.spansCanOverlap, + Settings: flowCtx.EvalCtx.Settings, }, ); err != nil { return nil, err diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index b9d82c65fb99..69b875cffaf7 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -159,6 +159,7 @@ func newTableReader( Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + Settings: flowCtx.EvalCtx.Settings, }, ); err != nil { return nil, err diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 9117a0b327e9..b0654e298f6c 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -476,6 +476,7 @@ func (z *zigzagJoiner) setupInfo( Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + Settings: flowCtx.EvalCtx.Settings, }, ); err != nil { return err
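
Not part of the patch: a minimal sketch of how the new gating in pkg/sql/row/locking.go behaves from Go, using only names introduced in the diff above. It assumes a test-style cluster.MakeTestingClusterSettings() (which activates the v23.2 version gate); with the cluster setting off, FOR SHARE still maps to a non-locking read, and with it on, it maps to lock.Shared.

// shared_locks_example.go -- illustrative sketch, not part of the patch.
// Demonstrates the gating added to GetKeyLockingStrength: the
// sql.locking.enable_shared_locks setting decides whether FOR SHARE scans
// acquire shared locks or remain non-locking.
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/row"
)

func main() {
	ctx := context.Background()
	// MakeTestingClusterSettings activates the latest cluster version, so
	// the clusterversion.V23_2 check inside GetKeyLockingStrength passes.
	st := cluster.MakeTestingClusterSettings()

	// Default: the cluster setting is false, so FOR SHARE stays non-locking.
	fmt.Println(row.GetKeyLockingStrength(ctx, descpb.ScanLockingStrength_FOR_SHARE, st)) // None

	// Enable shared locks; the same request now maps to lock.Shared.
	row.EnableSharedLockingStrength.Override(ctx, &st.SV, true)
	fmt.Println(row.GetKeyLockingStrength(ctx, descpb.ScanLockingStrength_FOR_SHARE, st)) // Shared
}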