From eb2380ddd8fd637cd11a4a039f18b29778f55684 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Fri, 28 Apr 2023 15:21:59 -0400 Subject: [PATCH 1/4] lockspanset: cleanup some loops Followup from https://github.com/cockroachdb/cockroach/pull/102396. This patch converts some loops to be more idiomatic for go that that patch mistakenly didn't. Epic: none Release note: None --- pkg/kv/kvserver/concurrency/lock/locking.go | 7 +++++++ pkg/kv/kvserver/lockspanset/BUILD.bazel | 1 + pkg/kv/kvserver/lockspanset/lockspanset.go | 14 ++++++-------- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/lock/locking.go b/pkg/kv/kvserver/concurrency/lock/locking.go index 0747620ccfa8..6d477a82d06e 100644 --- a/pkg/kv/kvserver/concurrency/lock/locking.go +++ b/pkg/kv/kvserver/concurrency/lock/locking.go @@ -53,6 +53,9 @@ var ExclusiveLocksBlockNonLockingReads = settings.RegisterBoolSetting( // MaxStrength is the maximum value in the Strength enum. const MaxStrength = Intent +// NumLockStrength is the total number of lock strengths in the Strength enum. +const NumLockStrength = MaxStrength + 1 + // MaxDurability is the maximum value in the Durability enum. const MaxDurability = Unreplicated @@ -62,6 +65,10 @@ func init() { panic(fmt.Sprintf("Strength (%s) with value larger than MaxDurability", st)) } } + if int(NumLockStrength) != len(Strength_name) { + panic(fmt.Sprintf("mismatched numer of lock strengths: NumLockStrength %d, lock strengths %d", + int(NumLockStrength), len(Strength_name))) + } for v := range Durability_name { if d := Durability(v); d > MaxDurability { panic(fmt.Sprintf("Durability (%s) with value larger than MaxDurability", d)) diff --git a/pkg/kv/kvserver/lockspanset/BUILD.bazel b/pkg/kv/kvserver/lockspanset/BUILD.bazel index 65aa3c825c65..b9d70f4e2f48 100644 --- a/pkg/kv/kvserver/lockspanset/BUILD.bazel +++ b/pkg/kv/kvserver/lockspanset/BUILD.bazel @@ -18,6 +18,7 @@ go_test( args = ["-test.timeout=295s"], embed = [":lockspanset"], deps = [ + "//pkg/keys", "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/lockspanset/lockspanset.go b/pkg/kv/kvserver/lockspanset/lockspanset.go index 4fe1b2e9fa2d..5f09b4970d1a 100644 --- a/pkg/kv/kvserver/lockspanset/lockspanset.go +++ b/pkg/kv/kvserver/lockspanset/lockspanset.go @@ -19,10 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" ) -const NumLockStrength = lock.MaxStrength + 1 - type LockSpanSet struct { - spans [NumLockStrength][]roachpb.Span + spans [lock.NumLockStrength][]roachpb.Span } var lockSpanSetPool = sync.Pool{ @@ -78,10 +76,10 @@ func (l *LockSpanSet) Empty() bool { // String prints a string representation of the LockSpanSet. func (l *LockSpanSet) String() string { var buf strings.Builder - for st := lock.Strength(0); st < NumLockStrength; st++ { - for _, span := range l.GetSpans(st) { + for st, spans := range l.spans { + for _, span := range spans { fmt.Fprintf(&buf, "%s: %s\n", - st, span) + lock.Strength(st), span) } } return buf.String() @@ -90,8 +88,8 @@ func (l *LockSpanSet) String() string { // Len returns the total number of spans tracked across all strengths. func (l *LockSpanSet) Len() int { var count int - for st := lock.Strength(0); st < NumLockStrength; st++ { - count += len(l.GetSpans(st)) + for _, spans := range l.spans { + count += len(spans) } return count } From 5418acd2abef7dfd102afcbe06f20f0bcfc21f5e Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Fri, 28 Apr 2023 18:35:59 -0400 Subject: [PATCH 2/4] kv: use LockSpanSets to declare lock spans Prior to this patch, requests would use the same mechanism (`spanset.SpanSets`) to declare both latch and lock spans. This patch changes the latter to use `lockspanset.LockSpanSets` instead. This allows us to declare lock spans for a request with its intended lock strengths in the future. This will in-turn enable us to support multiple locking strengths in the concurrency package. This patch does not change any functionality. To that end, requests that would have previously declared access for `spanset.SpanReadOnly` use `lock.None`; requests that would have previously declared `spanset.SpanReadWrite` use `lock.Intent`. The `concurrency` package validates lock spans with no other lock strength are supplied to it. It does so both in the lock table waiter and when scanning the lock table. We will relax these assertions as we build towards shared locks. One thing to note is that I decided not to change the lock table tests in this patch, to reduce test churn. We do so by translating lock strength back to SpanAccess when printing out the guard state. This will get fixed in a followup, along with the datadriven input. Informs #102008 Release note: None --- pkg/kv/kvserver/BUILD.bazel | 2 + pkg/kv/kvserver/batcheval/BUILD.bazel | 2 + pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 4 +- pkg/kv/kvserver/batcheval/cmd_barrier.go | 4 +- pkg/kv/kvserver/batcheval/cmd_clear_range.go | 4 +- .../batcheval/cmd_clear_range_test.go | 4 +- .../batcheval/cmd_compute_checksum.go | 4 +- .../kvserver/batcheval/cmd_conditional_put.go | 4 +- pkg/kv/kvserver/batcheval/cmd_delete_range.go | 4 +- .../batcheval/cmd_delete_range_test.go | 4 +- .../kvserver/batcheval/cmd_end_transaction.go | 4 +- pkg/kv/kvserver/batcheval/cmd_export.go | 4 +- pkg/kv/kvserver/batcheval/cmd_gc.go | 4 +- .../kvserver/batcheval/cmd_heartbeat_txn.go | 4 +- pkg/kv/kvserver/batcheval/cmd_lease_info.go | 4 +- .../kvserver/batcheval/cmd_lease_request.go | 4 +- .../kvserver/batcheval/cmd_lease_transfer.go | 4 +- pkg/kv/kvserver/batcheval/cmd_migrate.go | 4 +- pkg/kv/kvserver/batcheval/cmd_probe.go | 8 +- pkg/kv/kvserver/batcheval/cmd_push_txn.go | 4 +- pkg/kv/kvserver/batcheval/cmd_put.go | 4 +- pkg/kv/kvserver/batcheval/cmd_query_intent.go | 4 +- pkg/kv/kvserver/batcheval/cmd_query_locks.go | 4 +- pkg/kv/kvserver/batcheval/cmd_query_txn.go | 4 +- pkg/kv/kvserver/batcheval/cmd_range_stats.go | 4 +- .../kvserver/batcheval/cmd_recompute_stats.go | 4 +- pkg/kv/kvserver/batcheval/cmd_recover_txn.go | 4 +- .../kvserver/batcheval/cmd_resolve_intent.go | 4 +- .../batcheval/cmd_resolve_intent_range.go | 4 +- .../batcheval/cmd_resolve_intent_test.go | 4 +- pkg/kv/kvserver/batcheval/cmd_revert_range.go | 4 +- pkg/kv/kvserver/batcheval/cmd_subsume.go | 4 +- pkg/kv/kvserver/batcheval/cmd_truncate_log.go | 4 +- pkg/kv/kvserver/batcheval/command.go | 4 +- pkg/kv/kvserver/batcheval/declare.go | 13 +- pkg/kv/kvserver/batcheval/declare_test.go | 4 +- pkg/kv/kvserver/concurrency/BUILD.bazel | 3 +- .../concurrency/concurrency_control.go | 24 +-- .../concurrency/concurrency_manager.go | 17 +- .../concurrency/concurrency_manager_test.go | 19 +- pkg/kv/kvserver/concurrency/lock_table.go | 198 ++++++++++-------- .../kvserver/concurrency/lock_table_test.go | 106 +++++++--- .../kvserver/concurrency/lock_table_waiter.go | 9 +- .../concurrency/lock_table_waiter_test.go | 50 ++--- pkg/kv/kvserver/lockspanset/BUILD.bazel | 2 +- pkg/kv/kvserver/lockspanset/lockspanset.go | 24 +++ pkg/kv/kvserver/replica_read.go | 3 +- pkg/kv/kvserver/replica_send.go | 38 ++-- pkg/kv/kvserver/replica_test.go | 3 +- 49 files changed, 417 insertions(+), 232 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 8004670b6a96..e6b6c449c91b 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -141,6 +141,7 @@ go_library( "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/load", + "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/logstore", "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", @@ -376,6 +377,7 @@ go_test( "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/load", + "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/logstore", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 6c985c5e6fa3..b5da0fdb8cb9 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -67,6 +67,7 @@ go_library( "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", "//pkg/kv/kvserver/readsummary/rspb", @@ -143,6 +144,7 @@ go_test( "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/readsummary", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/kv/kvserver/spanset", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 20e3e9c0e54c..0da4f97b5faf 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -45,7 +46,8 @@ func declareKeysAddSSTable( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { args := req.(*kvpb.AddSSTableRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go index 4b4b5e7b1e2c..40bacd038fac 100644 --- a/pkg/kv/kvserver/batcheval/cmd_barrier.go +++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/storage" ) @@ -28,7 +29,8 @@ func declareKeysBarrier( _ ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // Barrier is special-cased in the concurrency manager to *not* actually diff --git a/pkg/kv/kvserver/batcheval/cmd_clear_range.go b/pkg/kv/kvserver/batcheval/cmd_clear_range.go index 1c570ffb46c5..dfdbdcb0a76f 100644 --- a/pkg/kv/kvserver/batcheval/cmd_clear_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_clear_range.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -42,7 +43,8 @@ func declareKeysClearRange( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset) diff --git a/pkg/kv/kvserver/batcheval/cmd_clear_range_test.go b/pkg/kv/kvserver/batcheval/cmd_clear_range_test.go index 43a310a8ddf2..91260dea23b4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_clear_range_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_clear_range_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -186,7 +187,8 @@ func TestCmdClearRange(t *testing.T) { // particular, to test the additional seeks necessary to peek for // adjacent range keys that we may truncate (for stats purposes) which // should not cross the range bounds. - var latchSpans, lockSpans spanset.SpanSet + var latchSpans spanset.SpanSet + var lockSpans lockspanset.LockSpanSet declareKeysClearRange(&desc, &cArgs.Header, cArgs.Args, &latchSpans, &lockSpans, 0) batch := &wrappedBatch{Batch: spanset.NewBatchAt(eng.NewBatch(), &latchSpans, cArgs.Header.Timestamp)} defer batch.Close() diff --git a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go index 131db73f7607..d0dba475d072 100644 --- a/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go +++ b/pkg/kv/kvserver/batcheval/cmd_compute_checksum.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -32,7 +33,8 @@ func declareKeysComputeChecksum( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // The correctness of range merges depends on the lease applied index of a diff --git a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go index 75bd4e1ec1bd..7cf71f6d3dd7 100644 --- a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -29,7 +30,8 @@ func declareKeysConditionalPut( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { args := req.(*kvpb.ConditionalPutRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index c3b9df8b50fe..c34dbaf5cd60 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -34,7 +35,8 @@ func declareKeysDeleteRange( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { args := req.(*kvpb.DeleteRangeRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go b/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go index 35132497c5de..43060106f022 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -252,7 +253,8 @@ func TestDeleteRangeTombstone(t *testing.T) { // the additional seeks necessary to check for adjacent range keys that we // may merge with (for stats purposes) which should not cross the range // bounds. - var latchSpans, lockSpans spanset.SpanSet + var latchSpans spanset.SpanSet + var lockSpans lockspanset.LockSpanSet declareKeysDeleteRange(evalCtx.Desc, &h, req, &latchSpans, &lockSpans, 0) batch := spanset.NewBatchAt(engine.NewBatch(), &latchSpans, h.Timestamp) defer batch.Close() diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index d97a526127e4..ac48a1c20670 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -60,7 +61,8 @@ func declareKeysEndTxn( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { et := req.(*kvpb.EndTxnRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index a435e4e3d941..bb004fbc49ab 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -72,7 +73,8 @@ func declareKeysExport( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset) diff --git a/pkg/kv/kvserver/batcheval/cmd_gc.go b/pkg/kv/kvserver/batcheval/cmd_gc.go index 2635ca53baaa..ed6ede15c162 100644 --- a/pkg/kv/kvserver/batcheval/cmd_gc.go +++ b/pkg/kv/kvserver/batcheval/cmd_gc.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -34,7 +35,8 @@ func declareKeysGC( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { gcr := req.(*kvpb.GCRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go b/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go index c0f9dc8d9680..a0cb38db7f01 100644 --- a/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -32,7 +33,8 @@ func declareKeysHeartbeatTransaction( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { declareKeysWriteTransaction(rs, header, req, latchSpans) diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_info.go b/pkg/kv/kvserver/batcheval/cmd_lease_info.go index 5829dad900e1..3cca944d3339 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_info.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_info.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -30,7 +31,8 @@ func declareKeysLeaseInfo( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(rs.GetRangeID())}) diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_request.go b/pkg/kv/kvserver/batcheval/cmd_lease_request.go index f800ce98e2bb..9979397ddcd3 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_request.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_request.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -31,7 +32,8 @@ func declareKeysRequestLease( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // NOTE: RequestLease is run on replicas that do not hold the lease, so diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go index 8f4cccff2621..7d053d41c9c6 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -31,7 +32,8 @@ func declareKeysTransferLease( _ ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // TransferLease must not run concurrently with any other request so it uses diff --git a/pkg/kv/kvserver/batcheval/cmd_migrate.go b/pkg/kv/kvserver/batcheval/cmd_migrate.go index 979ef49be728..a0fc7d03ab59 100644 --- a/pkg/kv/kvserver/batcheval/cmd_migrate.go +++ b/pkg/kv/kvserver/batcheval/cmd_migrate.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -32,7 +33,8 @@ func declareKeysMigrate( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // TODO(irfansharif): This will eventually grow to capture the super set of diff --git a/pkg/kv/kvserver/batcheval/cmd_probe.go b/pkg/kv/kvserver/batcheval/cmd_probe.go index 739b59a05269..f5fca4b8559e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_probe.go +++ b/pkg/kv/kvserver/batcheval/cmd_probe.go @@ -17,12 +17,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/storage" ) func declareKeysProbe( - _ ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, _, _ *spanset.SpanSet, _ time.Duration, + _ ImmutableRangeState, + _ *kvpb.Header, + _ kvpb.Request, + _ *spanset.SpanSet, + _ *lockspanset.LockSpanSet, + _ time.Duration, ) { // Declare no keys. This means that we're not even serializing with splits // (i.e. a probe could be directed at a key that will become the right-hand diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index 56d76aaf05d3..25bac62c7f80 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -37,7 +38,8 @@ func declareKeysPushTransaction( rs ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { pr := req.(*kvpb.PushTxnRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_put.go b/pkg/kv/kvserver/batcheval/cmd_put.go index df0074119039..8388ec5baeeb 100644 --- a/pkg/kv/kvserver/batcheval/cmd_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_put.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -29,7 +30,8 @@ func declareKeysPut( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { args := req.(*kvpb.PutRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent.go b/pkg/kv/kvserver/batcheval/cmd_query_intent.go index 2da4994ac937..f998bcd9ca46 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_intent.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,7 +31,8 @@ func declareKeysQueryIntent( _ ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // QueryIntent requests read the specified keys at the maximum timestamp in diff --git a/pkg/kv/kvserver/batcheval/cmd_query_locks.go b/pkg/kv/kvserver/batcheval/cmd_query_locks.go index a586eda1969b..45ef8b6d2472 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_locks.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_locks.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -31,7 +32,8 @@ func declareKeysQueryLocks( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // Latch on the range descriptor during evaluation of query locks. diff --git a/pkg/kv/kvserver/batcheval/cmd_query_txn.go b/pkg/kv/kvserver/batcheval/cmd_query_txn.go index 11db94c923d2..778e57b19a19 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_txn.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -32,7 +33,8 @@ func declareKeysQueryTransaction( _ ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { qr := req.(*kvpb.QueryTxnRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_range_stats.go b/pkg/kv/kvserver/batcheval/cmd_range_stats.go index bd28e6ac5081..5d91904dd230 100644 --- a/pkg/kv/kvserver/batcheval/cmd_range_stats.go +++ b/pkg/kv/kvserver/batcheval/cmd_range_stats.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -30,7 +31,8 @@ func declareKeysRangeStats( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans, maxOffset) diff --git a/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go b/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go index c9f16242665b..6688118d88ce 100644 --- a/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go +++ b/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -34,7 +35,8 @@ func declareKeysRecomputeStats( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // We don't declare any user key in the range. This is OK since all we're doing is computing a diff --git a/pkg/kv/kvserver/batcheval/cmd_recover_txn.go b/pkg/kv/kvserver/batcheval/cmd_recover_txn.go index e6900c4cbfe2..57021077f471 100644 --- a/pkg/kv/kvserver/batcheval/cmd_recover_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_recover_txn.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -32,7 +33,8 @@ func declareKeysRecoverTransaction( rs ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { rr := req.(*kvpb.RecoverTxnRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go b/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go index 29353a86703e..aff655cfd18b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go +++ b/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -56,7 +57,8 @@ func declareKeysResolveIntent( rs ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { declareKeysResolveIntentCombined(rs, req, latchSpans) diff --git a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go index 64cc8c73e442..715ccd3d8073 100644 --- a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -29,7 +30,8 @@ func declareKeysResolveIntentRange( rs ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { declareKeysResolveIntentCombined(rs, req, latchSpans) diff --git a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go index 9f5e3a0b235d..9b127b775882 100644 --- a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -99,7 +100,8 @@ func TestDeclareKeysResolveIntent(t *testing.T) { as := abortspan.New(desc.RangeID) - var latchSpans, lockSpans spanset.SpanSet + var latchSpans spanset.SpanSet + var lockSpans lockspanset.LockSpanSet var h kvpb.Header h.RangeID = desc.RangeID diff --git a/pkg/kv/kvserver/batcheval/cmd_revert_range.go b/pkg/kv/kvserver/batcheval/cmd_revert_range.go index 9f1d09b26717..d9169f211ac5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_revert_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_revert_range.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -39,7 +40,8 @@ func declareKeysRevertRange( rs ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { args := req.(*kvpb.RevertRangeRequest) diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index fbcc45106811..6926f66a469c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/storage" @@ -33,7 +34,8 @@ func declareKeysSubsume( _ ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { // Subsume must not run concurrently with any other command. It declares a diff --git a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go index 430ef914d9a6..3c982aa6c5f2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go +++ b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -33,7 +34,8 @@ func declareKeysTruncateLog( rs ImmutableRangeState, _ *kvpb.Header, _ kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { prefix := keys.RaftLogPrefix(rs.GetRangeID()) diff --git a/pkg/kv/kvserver/batcheval/command.go b/pkg/kv/kvserver/batcheval/command.go index 205e83d167ea..0a05cb93a579 100644 --- a/pkg/kv/kvserver/batcheval/command.go +++ b/pkg/kv/kvserver/batcheval/command.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -29,7 +30,8 @@ type DeclareKeysFunc func( rs ImmutableRangeState, header *kvpb.Header, request kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index cb28a5c5e26e..ad1f3039273f 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -17,7 +17,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -30,7 +32,8 @@ func DefaultDeclareKeys( _ ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, _ *spanset.SpanSet, + latchSpans *spanset.SpanSet, + _ *lockspanset.LockSpanSet, _ time.Duration, ) { access := spanset.SpanReadWrite @@ -49,13 +52,17 @@ func DefaultDeclareIsolatedKeys( _ ImmutableRangeState, header *kvpb.Header, req kvpb.Request, - latchSpans, lockSpans *spanset.SpanSet, + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { access := spanset.SpanReadWrite + // TODO(arul): pass in the correct lock strength here based on the request. + str := lock.Intent timestamp := header.Timestamp if kvpb.IsReadOnly(req) && !kvpb.IsLocking(req) { access = spanset.SpanReadOnly + str = lock.None // For non-locking reads, acquire read latches all the way up to the // request's worst-case (i.e. global) uncertainty limit, because reads may @@ -81,7 +88,7 @@ func DefaultDeclareIsolatedKeys( timestamp.Forward(in.GlobalLimit) } latchSpans.AddMVCC(access, req.Header().Span(), timestamp) - lockSpans.AddNonMVCC(access, req.Header().Span()) + lockSpans.Add(str, req.Header().Span()) } // DeclareKeysForBatch adds all keys that the batch with the provided header diff --git a/pkg/kv/kvserver/batcheval/declare_test.go b/pkg/kv/kvserver/batcheval/declare_test.go index 0f2b5a1c24b1..d126286ba93f 100644 --- a/pkg/kv/kvserver/batcheval/declare_test.go +++ b/pkg/kv/kvserver/batcheval/declare_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -43,7 +44,8 @@ func TestRequestsSerializeWithAllKeys(t *testing.T) { continue } t.Run(method.String(), func(t *testing.T) { - var otherLatchSpans, otherLockSpans spanset.SpanSet + var otherLatchSpans spanset.SpanSet + var otherLockSpans lockspanset.LockSpanSet startKey := []byte(`a`) endKey := []byte(`b`) diff --git a/pkg/kv/kvserver/concurrency/BUILD.bazel b/pkg/kv/kvserver/concurrency/BUILD.bazel index 78b7da19e877..a472d4e790b9 100644 --- a/pkg/kv/kvserver/concurrency/BUILD.bazel +++ b/pkg/kv/kvserver/concurrency/BUILD.bazel @@ -16,12 +16,12 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency", visibility = ["//visibility:public"], deps = [ - "//pkg/keys", "//pkg/kv", "//pkg/kv/kvpb", "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/intentresolver", + "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/txnwait", @@ -66,6 +66,7 @@ go_test( "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/intentresolver", + "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/txnwait", diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index bbbcdae0c26a..dfce491acc0e 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -418,22 +419,15 @@ type Request struct { // not also passed an exiting Guard. LatchSpans *spanset.SpanSet - // The maximal set of spans within which the request expects to have - // isolation from conflicting transactions. Conflicting locks within - // these spans will be queued on and conditionally pushed. + // The maximal set of spans within which the request expects to have isolation + // from conflicting transactions. The level of isolation for a span is + // dictated by its corresponding lock Strength. Conflicting locks within these + // spans will be queued on and conditionally pushed. // - // Note that unlike LatchSpans, the timestamps that these spans are - // declared at are NOT consulted. All read spans are considered to take - // place at the transaction's read timestamp (Txn.ReadTimestamp) and all - // write spans are considered to take place the transaction's write - // timestamp (Txn.WriteTimestamp). If the request is non-transactional - // (Txn == nil), all reads and writes are considered to take place at - // Timestamp. - // - // Note: ownership of the SpanSet is assumed by the Request once it is + // Note: ownership of the LockSpanSet is assumed by the Request once it is // passed to SequenceReq. Only supplied to SequenceReq if the method is // not also passed an exiting Guard. - LockSpans *spanset.SpanSet + LockSpans *lockspanset.LockSpanSet } // Guard is returned from Manager.SequenceReq. The guard is passed back in to @@ -758,14 +752,14 @@ type lockTableGuard interface { // the discovered locks have been added. ResolveBeforeScanning() []roachpb.LockUpdate - // CheckOptimisticNoConflicts uses the SpanSet representing the spans that + // CheckOptimisticNoConflicts uses the LockSpanSet representing the spans that // were actually read, to check for conflicting locks, after an optimistic // evaluation. It returns true if there were no conflicts. See // lockTable.ScanOptimistic for context. Note that the evaluation has // already seen any intents (replicated single-key locks) that conflicted, // so this checking is practically only going to find unreplicated locks // that conflict. - CheckOptimisticNoConflicts(*spanset.SpanSet) (ok bool) + CheckOptimisticNoConflicts(*lockspanset.LockSpanSet) (ok bool) // IsKeyLockedByConflictingTxn returns whether the specified key is locked or // reserved (see lockTable "reservations") by a conflicting transaction in the diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index ec90fb971340..6d2c2b1bc2b6 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" @@ -670,7 +671,7 @@ func (g *Guard) LatchSpans() *spanset.SpanSet { // SpanSets to the caller, ensuring that the SpanSets are not destroyed with the // Guard. The method is only safe if called immediately before passing the Guard // to FinishReq. -func (g *Guard) TakeSpanSets() (*spanset.SpanSet, *spanset.SpanSet) { +func (g *Guard) TakeSpanSets() (*spanset.SpanSet, *lockspanset.LockSpanSet) { la, lo := g.Req.LatchSpans, g.Req.LockSpans g.Req.LatchSpans, g.Req.LockSpans = nil, nil return la, lo @@ -708,18 +709,18 @@ func (g *Guard) IsolatedAtLaterTimestamps() bool { // unprotected timestamp. We only look at global latch spans because local // latch spans always use unbounded (NonMVCC) timestamps. return len(g.Req.LatchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 && - // Similarly, if the request declared any global or local read lock spans - // then it can not trivially bump its timestamp without dropping its - // lockTableGuard and re-scanning the lockTable. Doing so could allow the - // request to conflict with locks that it previously did not conflict with. - len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 && - len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanLocal)) == 0 + // Similarly, if the request intends to perform any non-locking reads, it + // cannot trivially bump its timestamp and expect to be isolated at the + // higher timestamp. Bumping its timestamp could cause the request to + // conflict with locks that it previously did not conflict with. It must + // drop its lockTableGuard and re-scan the lockTable. + len(g.Req.LockSpans.GetSpans(lock.None)) == 0 } // CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not // have a conflicting latch, lock. func (g *Guard) CheckOptimisticNoConflicts( - latchSpansRead *spanset.SpanSet, lockSpansRead *spanset.SpanSet, + latchSpansRead *spanset.SpanSet, lockSpansRead *lockspanset.LockSpanSet, ) (ok bool) { if g.EvalKind != OptimisticEval { panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind)) diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index d088cf69dab9..d1635306b0df 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -999,8 +1000,8 @@ func (c *cluster) resetNamespace() { // Its logic mirrors that in Replica.collectSpans. func (c *cluster) collectSpans( t *testing.T, txn *roachpb.Transaction, ts hlc.Timestamp, reqs []kvpb.Request, -) (latchSpans, lockSpans *spanset.SpanSet) { - latchSpans, lockSpans = &spanset.SpanSet{}, &spanset.SpanSet{} +) (latchSpans *spanset.SpanSet, lockSpans *lockspanset.LockSpanSet) { + latchSpans, lockSpans = &spanset.SpanSet{}, &lockspanset.LockSpanSet{} h := kvpb.Header{Txn: txn, Timestamp: ts} for _, req := range reqs { if cmd, ok := batcheval.LookupCommand(req.Method()); ok { @@ -1011,12 +1012,14 @@ func (c *cluster) collectSpans( } // Commands may create a large number of duplicate spans. De-duplicate - // them to reduce the number of spans we pass to the spanlatch manager. - for _, s := range [...]*spanset.SpanSet{latchSpans, lockSpans} { - s.SortAndDedup() - if err := s.Validate(); err != nil { - t.Fatal(err) - } + // them to reduce the number of spans we pass to the {spanlatch,Lock}Manager. + latchSpans.SortAndDedup() + lockSpans.SortAndDeDup() + if err := latchSpans.Validate(); err != nil { + t.Fatal(err) + } + if err := lockSpans.Validate(); err != nil { + t.Fatal(err) } return latchSpans, lockSpans } diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index 79027d59f067..81033d625109 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -18,9 +18,9 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -95,9 +95,10 @@ type waitingState struct { queuedWriters int // how many writers are waiting? queuedReaders int // how many readers are waiting? - // Represents the action that the request was trying to perform when - // it hit the conflict. E.g. was it trying to read or write? - guardAccess spanset.SpanAccess + // Represents the lock strength of the action that the request was trying to + // perform when it hit the conflict. E.g. was it trying to perform a (possibly + // locking) read or write an Intent? + guardStrength lock.Strength } // String implements the fmt.Stringer interface. @@ -361,7 +362,7 @@ type lockTableGuardImpl struct { // Information about this request. txn *enginepb.TxnMeta ts hlc.Timestamp - spans *spanset.SpanSet + spans *lockspanset.LockSpanSet waitPolicy lock.WaitPolicy maxWaitQueueLength int @@ -412,9 +413,8 @@ type lockTableGuardImpl struct { // The key for the lockState. key roachpb.Key // The key for the lockState is contained in the Span specified by - // spans[sa][ss][index]. - ss spanset.SpanScope - sa spanset.SpanAccess // Iterates from stronger to weaker strength + // spans[str][index]. + str lock.Strength // Iterates from strongest to weakest lock strength index int mu struct { @@ -534,17 +534,18 @@ func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) { g.mu.state = newState } -func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) { +func (g *lockTableGuardImpl) CheckOptimisticNoConflicts( + lockSpanSet *lockspanset.LockSpanSet, +) (ok bool) { if g.waitPolicy == lock.WaitPolicy_SkipLocked { // If the request is using a SkipLocked wait policy, lock conflicts are // handled during evaluation. return true } - // Temporarily replace the SpanSet in the guard. + // Temporarily replace the LockSpanSet in the guard. originalSpanSet := g.spans - g.spans = spanSet - g.sa = spanset.NumSpanAccess - 1 - g.ss = spanset.SpanScope(0) + g.spans = lockSpanSet + g.str = lock.MaxStrength g.index = -1 defer func() { g.spans = originalSpanSet @@ -556,7 +557,7 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet ltRange := &lockState{key: startKey, endKey: span.EndKey} for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) { l := iter.Cur() - if !l.isNonConflictingLock(g, g.sa) { + if !l.isNonConflictingLock(g, g.str) { return false } } @@ -646,8 +647,8 @@ func (g *lockTableGuardImpl) isSameTxnAsReservation(ws waitingState) bool { // accumulate intents to resolve. // Acquires g.mu. func (g *lockTableGuardImpl) findNextLockAfter(notify bool) { - spans := g.spans.GetSpans(g.sa, g.ss) - var span *spanset.Span + spans := g.spans.GetSpans(g.str) + var span *roachpb.Span resumingInSameSpan := false if g.index == -1 || len(spans[g.index].EndKey) == 0 { span = stepToNextSpan(g) @@ -687,7 +688,7 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) { // Else, past the lock where it stopped waiting. We may not // encounter that lock since it may have been garbage collected. } - wait, transitionedToFree := l.tryActiveWait(g, g.sa, notify, g.lt.clock) + wait, transitionedToFree := l.tryActiveWait(g, g.str, notify, g.lt.clock) if transitionedToFree { locksToGC = append(locksToGC, l) } @@ -1276,7 +1277,6 @@ func (l *lockState) informActiveWaiters() { for e := l.waitingReaders.Front(); e != nil; e = e.Next() { state := waitForState - state.guardAccess = spanset.SpanReadOnly // Since there are waiting readers we could not have transitioned out of // or into a state with a reservation, since readers do not wait for // reservations. @@ -1285,6 +1285,7 @@ func (l *lockState) informActiveWaiters() { l.distinguishedWaiter = g findDistinguished = false } + state.guardStrength = g.str g.mu.Lock() g.updateStateLocked(state) if l.distinguishedWaiter == g { @@ -1303,7 +1304,7 @@ func (l *lockState) informActiveWaiters() { if g.isSameTxnAsReservation(state) { state.kind = waitSelf } else { - state.guardAccess = spanset.SpanReadWrite + state.guardStrength = qg.guard.str if findDistinguished { l.distinguishedWaiter = g findDistinguished = false @@ -1499,13 +1500,14 @@ func (l *lockState) clearLockHolder() { } } -// Decides whether the request g with access sa should actively wait at this -// lock and if yes, adjusts the data-structures appropriately. The notify -// parameter is true iff the request's new state channel should be notified -- -// it is set to false when the call to tryActiveWait is happening due to an -// event for a different request or transaction (like a lock release) since in -// that case the channel is notified first and the call to tryActiveWait() -// happens later in lockTableGuard.CurState(). +// tryActiveWait decides whether the request g, with locking strength str, +// should actively wait at this lock or not. It adjusts the data-structures +// appropriately if the request needs to wait. The notify parameter is true iff +// the request's new state channel should be notified -- it is set to false when +// the call to tryActiveWait is happening due to an event for a different +// request or transaction (like a lock release) since in that case the channel +// is notified first and the call to tryActiveWait() happens later in +// lockTableGuard.CurState(). // // It uses the finalizedTxnCache to decide that the caller does not need to // wait on a lock of a transaction that is already finalized. @@ -1559,11 +1561,17 @@ func (l *lockState) clearLockHolder() { // The return value is true iff it is actively waiting. // Acquires l.mu, g.mu. func (l *lockState) tryActiveWait( - g *lockTableGuardImpl, sa spanset.SpanAccess, notify bool, clock *hlc.Clock, + g *lockTableGuardImpl, str lock.Strength, notify bool, clock *hlc.Clock, ) (wait bool, transitionedToFree bool) { l.mu.Lock() defer l.mu.Unlock() + switch str { + case lock.None, lock.Intent: + default: + panic(errors.AssertionFailedf("unexpected lock strength %s", str)) + } + // It is possible that this lock is empty and has not yet been deleted. if l.isEmptyLock() { return false, false @@ -1596,32 +1604,52 @@ func (l *lockState) tryActiveWait( } } - if sa == spanset.SpanReadOnly { + if str == lock.None { if lockHolderTxn == nil { - // Reads only care about locker, not a reservation. + // Non locking reads only care about locks, not reservations. return false, false } // Locked by some other txn. + // TODO(arul): this will need to change once we start supporting different + // lock strengths. if g.ts.Less(lockHolderTS) { return false, false } g.mu.Lock() - _, alsoHasStrongerAccess := g.mu.locks[l] + _, alsoLocksWithHigherStrength := g.mu.locks[l] g.mu.Unlock() // If the request already has this lock in its locks map, it must also be - // writing to this key and must be either a reservation holder or inactive - // waiter at this lock. The former has already been handled above. For the - // latter, it must have had its reservation broken. Since this is a weaker - // access we defer to the stronger access and don't wait here. + // acquiring this lock at a higher strength. It must either be a reservation + // holder or an inactive waiter at this lock. The former has already been + // handled above. For the latter to be possible, the request must have had + // its reservation broken. Since this is a weaker lock strength, we defer to + // the stronger lock strength and continuing with our scan. + // + // NB: If we were not defer to the stronger lock strength and start waiting + // here, we would end up doing so in the wrong wait queue (queuedReaders vs. + // queuedWriters). + // + // TODO(arul): the queued{Readers,Writers} names are going to change, as + // described in the Shared locks RFC. Reword this comment when that happens. + // + // Non-transactional requests cannot make reservations or acquire locks. + // They can only perform reads or writes, which means they can only have + // lock spans with strength {None,Intent}. However, because they cannot make + // reservations, we can not detect a key is being accessed with both None + // and Intent locking strengths, like we can for transactional requests. In + // some rare cases, the lock is now held at a timestamp that is not + // compatible with this request and it will wait here -- there's no + // correctness issue in doing so. // - // For non-transactional requests that have the key specified as both - // SpanReadOnly and SpanReadWrite, the request never acquires a - // reservation, so using the locks map to detect this duplication of the - // key is not possible. In the rare case, the lock is now held at a - // timestamp that is not compatible with this request and it will wait - // here -- there is no correctness issue with doing that. - if alsoHasStrongerAccess { + // TODO(arul): It seems like the above paragraph is implying a writing + // non-transactional request will end up waiting in the same queue as + // non-locking readers, and that's fine. We should revisit if we want to + // store non-transactional writers with other locking requests, as described + // in the shared locks RFC -- non-transactional requests race with readers + // and reservation holders anyway, so I'm not entirely sure what we get by + // storing them in the same queue as locking requests. + if alsoLocksWithHigherStrength { return false, false } } @@ -1631,7 +1659,7 @@ func (l *lockState) tryActiveWait( key: l.key, queuedWriters: l.queuedWriters.Len(), queuedReaders: l.waitingReaders.Len(), - guardAccess: sa, + guardStrength: str, } if lockHolderTxn != nil { waitForState.txn = lockHolderTxn @@ -1655,7 +1683,7 @@ func (l *lockState) tryActiveWait( // Incompatible with whoever is holding lock or reservation. - if l.reservation != nil && sa == spanset.SpanReadWrite && l.tryBreakReservation(g.seqNum) { + if l.reservation != nil && str == lock.Intent && l.tryBreakReservation(g.seqNum) { l.reservation = g g.mu.Lock() g.mu.locks[l] = struct{}{} @@ -1671,7 +1699,7 @@ func (l *lockState) tryActiveWait( wait = true g.mu.Lock() defer g.mu.Unlock() - if sa == spanset.SpanReadWrite { + if str == lock.Intent { var qg *queuedGuard if _, inQueue := g.mu.locks[l]; inQueue { // Already in queue and must be in the right position, so mark as active @@ -1775,7 +1803,7 @@ func (l *lockState) tryActiveWait( return true, false } -func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, sa spanset.SpanAccess) bool { +func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strength) bool { l.mu.Lock() defer l.mu.Unlock() @@ -1807,7 +1835,7 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, sa spanset.SpanA // path. A conflict with a finalized txn will be noticed when retrying // pessimistically. - if sa == spanset.SpanReadOnly && g.ts.Less(lockHolderTS) { + if str == lock.None && g.ts.Less(lockHolderTS) { return true } // Conflicts. @@ -1951,13 +1979,13 @@ func (l *lockState) acquireLock( } // A replicated lock held by txn with timestamp ts was discovered by guard g -// where g is trying to access this key with access sa. +// where g is trying to access this key with strength accessStrength. // Acquires l.mu. func (l *lockState) discoveredLock( txn *enginepb.TxnMeta, ts hlc.Timestamp, g *lockTableGuardImpl, - sa spanset.SpanAccess, + accessStrength lock.Strength, notRemovable bool, clock *hlc.Clock, ) error { @@ -1998,8 +2026,8 @@ func (l *lockState) discoveredLock( l.reservation = nil } - switch sa { - case spanset.SpanReadOnly: + switch accessStrength { + case lock.None: // Don't enter the lock's queuedReaders list, because all queued readers // are expected to be active. Instead, wait until the next scan. @@ -2011,7 +2039,7 @@ func (l *lockState) discoveredLock( return errors.AssertionFailedf("discovered non-conflicting lock") } - case spanset.SpanReadWrite: + case lock.Intent: // Immediately enter the lock's queuedWriters list. // NB: this inactive waiter can be non-transactional. g.mu.Lock() @@ -2042,6 +2070,8 @@ func (l *lockState) discoveredLock( l.queuedWriters.InsertBefore(qg, e) } } + default: + panic(errors.AssertionFailedf("unhandled lock strength %s", accessStrength)) } // If there are waiting requests from the same txn, they no longer need to wait. @@ -2078,11 +2108,10 @@ func (l *lockState) tryClearLock(force bool) bool { // Note that none of the current waiters can be requests from lockHolderTxn, // so they will never be told to waitElsewhere on themselves. waitState = waitingState{ - kind: waitElsewhere, - txn: lockHolderTxn, - key: l.key, - held: true, - guardAccess: spanset.SpanReadOnly, + kind: waitElsewhere, + txn: lockHolderTxn, + key: l.key, + held: true, } } else { // !replicatedHeld || force. Both are handled as doneWaiting since the @@ -2103,6 +2132,8 @@ func (l *lockState) tryClearLock(force bool) bool { // Clear waitingReaders. for e := l.waitingReaders.Front(); e != nil; { g := e.Value.(*lockTableGuardImpl) + waitState.guardStrength = g.str + curr := e e = e.Next() l.waitingReaders.Remove(curr) @@ -2115,9 +2146,10 @@ func (l *lockState) tryClearLock(force bool) bool { } // Clear queuedWriters. - waitState.guardAccess = spanset.SpanReadWrite for e := l.queuedWriters.Front(); e != nil; { qg := e.Value.(*queuedGuard) + waitState.guardStrength = qg.guard.str + curr := e e = e.Next() l.queuedWriters.Remove(curr) @@ -2492,8 +2524,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa } else { g = guard.(*lockTableGuardImpl) g.key = nil - g.sa = spanset.NumSpanAccess - 1 - g.ss = spanset.SpanScope(0) + g.str = lock.MaxStrength g.index = -1 g.mu.Lock() g.mu.startWait = false @@ -2530,7 +2561,7 @@ func (t *lockTableImpl) newGuardForReq(req Request) *lockTableGuardImpl { g.spans = req.LockSpans g.waitPolicy = req.WaitPolicy g.maxWaitQueueLength = req.MaxLockWaitQueueLength - g.sa = spanset.NumSpanAccess - 1 + g.str = lock.MaxStrength g.index = -1 return g } @@ -2622,7 +2653,7 @@ func (t *lockTableImpl) AddDiscoveredLock( } g := guard.(*lockTableGuardImpl) key := intent.Key - sa, _, err := findAccessInSpans(key, g.spans) + str, err := findHighestLockStrengthInSpans(key, g.spans) if err != nil { return false, err } @@ -2659,7 +2690,7 @@ func (t *lockTableImpl) AddDiscoveredLock( g.notRemovableLock = l notRemovableLock = true } - err = l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, sa, notRemovableLock, g.lt.clock) + err = l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, str, notRemovableLock, g.lt.clock) // Can't release tree.mu until call l.discoveredLock() since someone may // find an empty lock and remove it from the tree. t.locks.mu.Unlock() @@ -2784,27 +2815,24 @@ func (t *lockTableImpl) tryClearLocks(force bool, numToClear int) { } } -// Given the key must be in spans, returns the strongest access -// specified in the spans, along with the scope of the key. -func findAccessInSpans( - key roachpb.Key, spans *spanset.SpanSet, -) (spanset.SpanAccess, spanset.SpanScope, error) { - ss := spanset.SpanGlobal - if keys.IsLocal(key) { - ss = spanset.SpanLocal - } - for sa := spanset.NumSpanAccess - 1; sa >= 0; sa-- { - s := spans.GetSpans(sa, ss) +// findHighestLockStrengthInSpans returns the highest lock strength specified +// for the given key in the supplied spans. It is expected for the key to be +// present in the spans; an assertion failed error is returned otherwise. +func findHighestLockStrengthInSpans( + key roachpb.Key, spans *lockspanset.LockSpanSet, +) (lock.Strength, error) { + for str := lock.MaxStrength; str >= 0; str-- { + s := spans.GetSpans(str) // First span that starts after key i := sort.Search(len(s), func(i int) bool { return key.Compare(s[i].Key) < 0 }) if i > 0 && ((len(s[i-1].EndKey) > 0 && key.Compare(s[i-1].EndKey) < 0) || key.Equal(s[i-1].Key)) { - return sa, ss, nil + return str, nil } } - return 0, 0, errors.AssertionFailedf("could not find access in spans") + return 0, errors.AssertionFailedf("could not find access in spans") } // Tries to GC locks that were previously known to have become empty. @@ -2878,20 +2906,18 @@ func (t *lockTableImpl) updateLockInternal(up *roachpb.LockUpdate) (heldByTxn bo // Iteration helper for findNextLockAfter. Returns the next span to search // over, or nil if the iteration is done. // REQUIRES: g.mu is locked. -func stepToNextSpan(g *lockTableGuardImpl) *spanset.Span { +func stepToNextSpan(g *lockTableGuardImpl) *roachpb.Span { g.index++ - for ; g.ss < spanset.NumSpanScope; g.ss++ { - for ; g.sa >= 0; g.sa-- { - spans := g.spans.GetSpans(g.sa, g.ss) - if g.index < len(spans) { - span := &spans[g.index] - g.key = span.Key - return span - } - g.index = 0 + for ; g.str >= 0; g.str-- { + spans := g.spans.GetSpans(g.str) + if g.index < len(spans) { + span := &spans[g.index] + g.key = span.Key + return span } - g.sa = spanset.NumSpanAccess - 1 + g.index = 0 } + g.str = lock.MaxStrength return nil } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index d7ad7826cc34..a76a7a9b05f4 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -305,13 +306,13 @@ func TestLockTableBasic(t *testing.T) { if d.HasArg("max-lock-wait-queue-length") { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } - spans := scanSpans(t, d, ts) + latchSpans, lockSpans := scanSpans(t, d, ts) req := Request{ Timestamp: ts, WaitPolicy: waitPolicy, MaxLockWaitQueueLength: maxLockWaitQueueLength, - LatchSpans: spans, - LockSpans: spans, + LatchSpans: latchSpans, + LockSpans: lockSpans, } if txnMeta != nil { // Update the transaction's timestamp, if necessary. The transaction @@ -485,8 +486,8 @@ func TestLockTableBasic(t *testing.T) { if g == nil { d.Fatalf(t, "unknown guard: %s", reqName) } - spans := scanSpans(t, d, req.Timestamp) - return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(spans)) + _, lockSpans := scanSpans(t, d, req.Timestamp) + return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(lockSpans)) case "is-key-locked-by-conflicting-txn": var reqName string @@ -577,8 +578,21 @@ func TestLockTableBasic(t *testing.T) { if txnS == "" { txnS = fmt.Sprintf("unknown txn with ID: %v", state.txn.ID) } + // TODO(arul): We're translating the lock strength back to guardAccess + // for now to reduce test churn. A followup patch should teach these + // datadriven tests to use lock.Strength correctly -- both in its input + // and output. + var sa spanset.SpanAccess + switch state.guardStrength { + case lock.None: + sa = spanset.SpanReadOnly + case lock.Intent: + sa = spanset.SpanReadWrite + default: + t.Fatalf("unexpected guard strength %s", state.guardStrength) + } return fmt.Sprintf("%sstate=%s txn=%s key=%s held=%t guard-access=%s", - str, typeStr, txnS, state.key, state.held, state.guardAccess) + str, typeStr, txnS, state.key, state.held, sa) case "resolve-before-scanning": var reqName string @@ -689,8 +703,11 @@ func getSpan(t *testing.T, d *datadriven.TestData, str string) roachpb.Span { return span } -func scanSpans(t *testing.T, d *datadriven.TestData, ts hlc.Timestamp) *spanset.SpanSet { - spans := &spanset.SpanSet{} +func scanSpans( + t *testing.T, d *datadriven.TestData, ts hlc.Timestamp, +) (*spanset.SpanSet, *lockspanset.LockSpanSet) { + latchSpans := &spanset.SpanSet{} + lockSpans := &lockspanset.LockSpanSet{} var spansStr string d.ScanArgs(t, "spans", &spansStr) parts := strings.Split(spansStr, "+") @@ -701,17 +718,22 @@ func scanSpans(t *testing.T, d *datadriven.TestData, ts hlc.Timestamp) *spanset. c := p[0] p = p[2:] var sa spanset.SpanAccess + var str lock.Strength + // TODO(arul): Switch the datadriven input to use lock strengths instead. switch c { case 'r': sa = spanset.SpanReadOnly + str = lock.None case 'w': sa = spanset.SpanReadWrite + str = lock.Intent default: d.Fatalf(t, "incorrect span access: %c", c) } - spans.AddMVCC(sa, getSpan(t, d, p), ts) + latchSpans.AddMVCC(sa, getSpan(t, d, p), ts) + lockSpans.Add(str, getSpan(t, d, p)) } - return spans + return latchSpans, lockSpans } func ScanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength { @@ -758,16 +780,18 @@ func TestLockTableMaxLocks(t *testing.T) { // 10 requests, each with 10 discovered locks. Only 1 will be considered // notRemovable per request. for i := 0; i < 10; i++ { - spans := &spanset.SpanSet{} + latchSpans := &spanset.SpanSet{} + lockSpans := &lockspanset.LockSpanSet{} for j := 0; j < 20; j++ { k := roachpb.Key(fmt.Sprintf("%08d", i*20+j)) keys = append(keys, k) - spans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: k}, hlc.Timestamp{WallTime: 1}) + latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: k}, hlc.Timestamp{WallTime: 1}) + lockSpans.Add(lock.Intent, roachpb.Span{Key: k}) } req := Request{ Timestamp: hlc.Timestamp{WallTime: 1}, - LatchSpans: spans, - LockSpans: spans, + LatchSpans: latchSpans, + LockSpans: lockSpans, } reqs = append(reqs, req) ltg := lt.ScanAndEnqueue(req, nil) @@ -883,16 +907,18 @@ func TestLockTableMaxLocksWithMultipleNotRemovableRefs(t *testing.T) { var guards []lockTableGuard // 10 requests. Every pair of requests have the same span. for i := 0; i < 10; i++ { - spans := &spanset.SpanSet{} + latchSpans := &spanset.SpanSet{} + lockSpans := &lockspanset.LockSpanSet{} key := roachpb.Key(fmt.Sprintf("%08d", i/2)) if i%2 == 0 { keys = append(keys, key) } - spans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key}, hlc.Timestamp{WallTime: 1}) + latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key}, hlc.Timestamp{WallTime: 1}) + lockSpans.Add(lock.Intent, roachpb.Span{Key: key}) req := Request{ Timestamp: hlc.Timestamp{WallTime: 1}, - LatchSpans: spans, - LockSpans: spans, + LatchSpans: latchSpans, + LockSpans: lockSpans, } ltg := lt.ScanAndEnqueue(req, nil) require.Nil(t, ltg.ResolveBeforeScanning()) @@ -1346,11 +1372,17 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) { for i := 0; i < numRequests; i++ { ts := timestamps[rng.Intn(len(timestamps))] keysPerm := rng.Perm(len(keys)) - spans := &spanset.SpanSet{} + latchSpans := &spanset.SpanSet{} + lockSpans := &lockspanset.LockSpanSet{} for i := 0; i < numKeys; i++ { span := roachpb.Span{Key: keys[keysPerm[i]]} acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess))) - spans.AddMVCC(acc, span, ts) + str := lock.None + if acc == spanset.SpanReadWrite { + str = lock.Intent + } + latchSpans.AddMVCC(acc, span, ts) + lockSpans.Add(str, span) } var txn *roachpb.Transaction if rng.Intn(2) == 0 { @@ -1365,8 +1397,8 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) { request := &Request{ Txn: txn, Timestamp: ts, - LatchSpans: spans, - LockSpans: spans, + LatchSpans: latchSpans, + LockSpans: lockSpans, } items = append(items, workloadItem{request: request}) if txn != nil { @@ -1439,13 +1471,14 @@ func TestLockTableConcurrentRequests(t *testing.T) { ts = timestamps[rng.Intn(len(timestamps))] } keysPerm := rng.Perm(len(keys)) - spans := &spanset.SpanSet{} + latchSpans := &spanset.SpanSet{} + lockSpans := &lockspanset.LockSpanSet{} onlyReads := txnMeta == nil && rng.Intn(2) != 0 numKeys := rng.Intn(len(keys)-1) + 1 request := &Request{ Timestamp: ts, - LatchSpans: spans, - LockSpans: spans, + LatchSpans: latchSpans, + LockSpans: lockSpans, } if txnMeta != nil { request.Txn = &roachpb.Transaction{ @@ -1457,21 +1490,26 @@ func TestLockTableConcurrentRequests(t *testing.T) { for i := 0; i < numKeys; i++ { span := roachpb.Span{Key: keys[keysPerm[i]]} acc := spanset.SpanReadOnly + str := lock.None dupRead := false if !onlyReads { acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess))) if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 { // Acquire lock. wi.locksToAcquire = append(wi.locksToAcquire, span.Key) + str = lock.Intent } if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 { // Also include the key as read. dupRead = true + str = lock.Intent } } - spans.AddMVCC(acc, span, ts) + latchSpans.AddMVCC(acc, span, ts) + lockSpans.Add(str, span) if dupRead { - spans.AddMVCC(spanset.SpanReadOnly, span, ts) + latchSpans.AddMVCC(spanset.SpanReadOnly, span, ts) + lockSpans.Add(lock.None, span) } } items = append(items, wi) @@ -1579,20 +1617,24 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { // keys will be locked. func createRequests(index int, numOutstanding int, numKeys int, numReadKeys int) []benchWorkItem { ts := hlc.Timestamp{WallTime: 10} - spans := &spanset.SpanSet{} + latchSpans := &spanset.SpanSet{} + lockSpans := &lockspanset.LockSpanSet{} wi := benchWorkItem{ Request: Request{ Timestamp: ts, - LatchSpans: spans, - LockSpans: spans, + LatchSpans: latchSpans, + LockSpans: lockSpans, }, } for i := 0; i < numKeys; i++ { key := roachpb.Key(fmt.Sprintf("k%d.%d", index, i)) + span := roachpb.Span{Key: key} if i <= numReadKeys { - spans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{Key: key}, ts) + latchSpans.AddMVCC(spanset.SpanReadOnly, span, ts) + lockSpans.Add(lock.None, span) } else { - spans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key}, ts) + latchSpans.AddMVCC(spanset.SpanReadWrite, span, ts) + lockSpans.Add(lock.Intent, span) wi.locksToAcquire = append(wi.locksToAcquire, key) } } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 7d1de8cc2f45..1ff59bb79c87 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -476,8 +475,8 @@ func (w *lockTableWaiterImpl) pushLockTxn( // under the lock. For write-write conflicts, try to abort the lock // holder entirely so the write request can revoke and replace the lock // with its own lock. - switch ws.guardAccess { - case spanset.SpanReadOnly: + switch ws.guardStrength { + case lock.None: pushType = kvpb.PUSH_TIMESTAMP beforePushObs = roachpb.ObservedTimestamp{ NodeID: w.nodeDesc.NodeID, @@ -499,9 +498,11 @@ func (w *lockTableWaiterImpl) pushLockTxn( // round-trip and would lose the local timestamp if rewritten later. log.VEventf(ctx, 2, "pushing timestamp of txn %s above %s", ws.txn.ID.Short(), h.Timestamp) - case spanset.SpanReadWrite: + case lock.Intent: pushType = kvpb.PUSH_ABORT log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.ID.Short()) + default: + log.Fatalf(ctx, "unhandled lock strength %s", ws.guardStrength) } case lock.WaitPolicy_Error: diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index dc937f41a9c2..a58231f5c036 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -21,7 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -84,7 +84,7 @@ func (g *mockLockTableGuard) CurState() waitingState { func (g *mockLockTableGuard) ResolveBeforeScanning() []roachpb.LockUpdate { return g.toResolve } -func (g *mockLockTableGuard) CheckOptimisticNoConflicts(*spanset.SpanSet) (ok bool) { +func (g *mockLockTableGuard) CheckOptimisticNoConflicts(*lockspanset.LockSpanSet) (ok bool) { return true } func (g *mockLockTableGuard) IsKeyLockedByConflictingTxn( @@ -318,14 +318,14 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl req := makeReq() g.state = waitingState{ - kind: k, - txn: &pusheeTxn.TxnMeta, - key: keyA, - held: lockHeld, - guardAccess: spanset.SpanReadOnly, + kind: k, + txn: &pusheeTxn.TxnMeta, + key: keyA, + held: lockHeld, + guardStrength: lock.None, } if waitAsWrite { - g.state.guardAccess = spanset.SpanReadWrite + g.state.guardStrength = lock.Intent } g.notify() @@ -507,11 +507,11 @@ func testErrorWaitPush( req := makeReq() g.state = waitingState{ - kind: k, - txn: &pusheeTxn.TxnMeta, - key: keyA, - held: lockHeld, - guardAccess: spanset.SpanReadOnly, + kind: k, + txn: &pusheeTxn.TxnMeta, + key: keyA, + held: lockHeld, + guardStrength: lock.None, } g.notify() @@ -665,11 +665,11 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { req := makeReq() g.state = waitingState{ - kind: k, - txn: &pusheeTxn.TxnMeta, - key: keyA, - held: lockHeld, - guardAccess: spanset.SpanReadWrite, + kind: k, + txn: &pusheeTxn.TxnMeta, + key: keyA, + held: lockHeld, + guardStrength: lock.Intent, } g.notify() @@ -797,11 +797,11 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { pusheeTxn := makeTxnProto("pushee") lockHeld := sync g.state = waitingState{ - kind: waitForDistinguished, - txn: &pusheeTxn.TxnMeta, - key: keyA, - held: lockHeld, - guardAccess: spanset.SpanReadWrite, + kind: waitForDistinguished, + txn: &pusheeTxn.TxnMeta, + key: keyA, + held: lockHeld, + guardStrength: lock.Intent, } // Errors are propagated when observed while pushing transactions. @@ -852,8 +852,8 @@ func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) { pusheeTxn.Status = roachpb.ABORTED g.state = waitingState{ - kind: doneWaiting, - guardAccess: spanset.SpanReadWrite, + kind: doneWaiting, + guardStrength: lock.Intent, } g.toResolve = []roachpb.LockUpdate{ roachpb.MakeLockUpdate(&pusheeTxn, roachpb.Span{Key: keyA}), diff --git a/pkg/kv/kvserver/lockspanset/BUILD.bazel b/pkg/kv/kvserver/lockspanset/BUILD.bazel index b9d70f4e2f48..ac440d2ddd81 100644 --- a/pkg/kv/kvserver/lockspanset/BUILD.bazel +++ b/pkg/kv/kvserver/lockspanset/BUILD.bazel @@ -9,6 +9,7 @@ go_library( deps = [ "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", + "@com_github_cockroachdb_errors//:errors", ], ) @@ -18,7 +19,6 @@ go_test( args = ["-test.timeout=295s"], embed = [":lockspanset"], deps = [ - "//pkg/keys", "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/lockspanset/lockspanset.go b/pkg/kv/kvserver/lockspanset/lockspanset.go index 5f09b4970d1a..dcc75d65334a 100644 --- a/pkg/kv/kvserver/lockspanset/lockspanset.go +++ b/pkg/kv/kvserver/lockspanset/lockspanset.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" ) type LockSpanSet struct { @@ -102,3 +103,26 @@ func (l *LockSpanSet) Copy() *LockSpanSet { } return n } + +// Reserve space for N additional spans. +func (l *LockSpanSet) Reserve(str lock.Strength, n int) { + existing := l.spans[str] + if n <= cap(existing)-len(existing) { + return + } + l.spans[str] = make([]roachpb.Span, len(existing), n+len(existing)) + copy(l.spans[str], existing) +} + +// Validate returns an error if any spans that have been added to the set are +// invalid. +func (l *LockSpanSet) Validate() error { + for _, spans := range l.spans { + for _, span := range spans { + if !span.Valid() { + return errors.Errorf("invalid span %s %s", span.Key, span.EndKey) + } + } + } + return nil +} diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 04b86024d0b4..999b49da1772 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -522,7 +523,7 @@ func (r *Replica) handleReadOnlyLocalEvalResult( // and uses that to compute the latch and lock spans. func (r *Replica) collectSpansRead( ba *kvpb.BatchRequest, br *kvpb.BatchResponse, -) (latchSpans, lockSpans *spanset.SpanSet, _ error) { +) (latchSpans *spanset.SpanSet, lockSpans *lockspanset.LockSpanSet, _ error) { baCopy := *ba baCopy.Requests = make([]kvpb.RequestUnion, 0, len(ba.Requests)) for i := 0; i < len(ba.Requests); i++ { diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index e1d950ba3223..764199860c75 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" @@ -410,7 +411,8 @@ func (r *Replica) executeBatchWithConcurrencyRetries( ctx context.Context, ba *kvpb.BatchRequest, fn batchExecutionFn, ) (br *kvpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *kvpb.Error) { // Try to execute command; exit retry loop on success. - var latchSpans, lockSpans *spanset.SpanSet + var latchSpans *spanset.SpanSet + var lockSpans *lockspanset.LockSpanSet var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { @@ -1086,8 +1088,13 @@ func (r *Replica) checkBatchRequest(ba *kvpb.BatchRequest, isReadOnly bool) erro func (r *Replica) collectSpans( ba *kvpb.BatchRequest, -) (latchSpans, lockSpans *spanset.SpanSet, requestEvalKind concurrency.RequestEvalKind, _ error) { - latchSpans, lockSpans = spanset.New(), spanset.New() +) ( + latchSpans *spanset.SpanSet, + lockSpans *lockspanset.LockSpanSet, + requestEvalKind concurrency.RequestEvalKind, + _ error, +) { + latchSpans, lockSpans = spanset.New(), lockspanset.New() r.mu.RLock() desc := r.descRLocked() liveCount := r.mu.state.Stats.LiveCount @@ -1109,10 +1116,11 @@ func (r *Replica) collectSpans( latchGuess += len(et.(*kvpb.EndTxnRequest).LockSpans) - 1 } latchSpans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, latchGuess) - lockSpans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, len(ba.Requests)) + // TODO(arul): Use the correct locking strength here. + lockSpans.Reserve(lock.Intent, len(ba.Requests)) } else { latchSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests)) - lockSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests)) + lockSpans.Reserve(lock.None, len(ba.Requests)) } // Note that we are letting locking readers be considered for optimistic @@ -1176,15 +1184,17 @@ func (r *Replica) collectSpans( } // Commands may create a large number of duplicate spans. De-duplicate - // them to reduce the number of spans we pass to the spanlatch manager. - for _, s := range [...]*spanset.SpanSet{latchSpans, lockSpans} { - s.SortAndDedup() - - // If any command gave us spans that are invalid, bail out early - // (before passing them to the spanlatch manager, which may panic). - if err := s.Validate(); err != nil { - return nil, nil, concurrency.PessimisticEval, err - } + // them to reduce the number of spans we pass to the {spanlatch,Lock}Manager. + latchSpans.SortAndDedup() + lockSpans.SortAndDeDup() + + // If any command gave us spans that are invalid, bail out early + // (before passing them to the {spanlatch,Lock}Manager, which may panic). + if err := latchSpans.Validate(); err != nil { + return nil, nil, concurrency.PessimisticEval, err + } + if err := lockSpans.Validate(); err != nil { + return nil, nil, concurrency.PessimisticEval, err } optEvalForLimit := false diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index e62f22e7bc06..e35c927b1955 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -101,7 +102,7 @@ func allSpansGuard() *concurrency.Guard { return &concurrency.Guard{ Req: concurrency.Request{ LatchSpans: allSpans(), - LockSpans: spanset.New(), + LockSpans: lockspanset.New(), }, } } From fea3420d2992bd53be98e69386616651ddcb80a2 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Mon, 1 May 2023 19:43:33 -0400 Subject: [PATCH 3/4] concurrency: change how waitingState is updated Previously, whenever we were updating `waitingState` for a request, we would explicitly have to assign `guardStrength` at each of the callsites. The `guardStrength` on the `waitingState` represents the strength with which the request was trying to scan the lock table when it detected a conflict. This information is already present on the `lockTableGuardImpl`, so expecting callers to correctly copy it over is both redundant and possibly error-prone. To note is that `guardStrength` does not make sense when a request is done waiting, as there is no conflict to speak of. To that end, we split out updates to the `waitingState` into 2 kinds -- one for when a request is done waiting and another for when it is still waiting. We copy over the correct guardStrength in case of the latter. Release note: None --- pkg/kv/kvserver/concurrency/lock_table.go | 75 +++++++++++++---------- 1 file changed, 44 insertions(+), 31 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index 81033d625109..c2c259abf6bb 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -530,7 +530,23 @@ func (g *lockTableGuardImpl) CurState() waitingState { return g.mu.state } -func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) { +// updateStateToDoneWaitingLocked updates the request's waiting state to +// indicate that it is done waiting. +// REQUIRES: g.mu to be locked. +func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() { + g.mu.state = waitingState{kind: doneWaiting} +} + +// updateWaitingStateLocked updates the request's waiting state to indicate +// to the one supplied. The supplied waiting state must imply the request is +// still waiting. Typically, this function is called for the first time when +// the request discovers a conflict while scanning the lock table. +// REQUIRES: g.mu to be locked. +func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) { + if newState.kind == doneWaiting { + panic(errors.AssertionFailedf("unexpected waiting state kind: %d", newState.kind)) + } + newState.guardStrength = g.str // copy over the strength which caused the conflict g.mu.state = newState } @@ -717,7 +733,7 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) { } g.mu.Lock() defer g.mu.Unlock() - g.updateStateLocked(waitingState{kind: doneWaiting}) + g.updateStateToDoneWaitingLocked() // We are doneWaiting but may have some locks to resolve. There are // two cases: // - notify=false: the caller was already waiting and will look at this list @@ -1276,7 +1292,6 @@ func (l *lockState) informActiveWaiters() { } for e := l.waitingReaders.Front(); e != nil; e = e.Next() { - state := waitForState // Since there are waiting readers we could not have transitioned out of // or into a state with a reservation, since readers do not wait for // reservations. @@ -1285,9 +1300,8 @@ func (l *lockState) informActiveWaiters() { l.distinguishedWaiter = g findDistinguished = false } - state.guardStrength = g.str g.mu.Lock() - g.updateStateLocked(state) + g.updateWaitingStateLocked(waitForState) if l.distinguishedWaiter == g { g.mu.state.kind = waitForDistinguished } @@ -1304,7 +1318,6 @@ func (l *lockState) informActiveWaiters() { if g.isSameTxnAsReservation(state) { state.kind = waitSelf } else { - state.guardStrength = qg.guard.str if findDistinguished { l.distinguishedWaiter = g findDistinguished = false @@ -1314,7 +1327,7 @@ func (l *lockState) informActiveWaiters() { } } g.mu.Lock() - g.updateStateLocked(state) + g.updateWaitingStateLocked(state) g.notify() g.mu.Unlock() } @@ -1659,7 +1672,6 @@ func (l *lockState) tryActiveWait( key: l.key, queuedWriters: l.queuedWriters.Len(), queuedReaders: l.waitingReaders.Len(), - guardStrength: str, } if lockHolderTxn != nil { waitForState.txn = lockHolderTxn @@ -1735,7 +1747,7 @@ func (l *lockState) tryActiveWait( g.mu.startWait = true state := waitForState state.kind = waitQueueMaxLengthExceeded - g.updateStateLocked(state) + g.updateWaitingStateLocked(state) if notify { g.notify() } @@ -1788,14 +1800,14 @@ func (l *lockState) tryActiveWait( if g.isSameTxnAsReservation(waitForState) { state := waitForState state.kind = waitSelf - g.updateStateLocked(state) + g.updateWaitingStateLocked(state) } else { state := waitForState if l.distinguishedWaiter == nil { l.distinguishedWaiter = g state.kind = waitForDistinguished } - g.updateStateLocked(state) + g.updateWaitingStateLocked(state) } if notify { g.notify() @@ -2099,24 +2111,27 @@ func (l *lockState) tryClearLock(force bool) bool { return false } - // Clear lock holder. While doing so, determine which waitingState to - // transition waiters to. - var waitState waitingState + // Clear lock holder. While doing so, construct the closure used to transition + // waiters. + lockHolderTxn, _ := l.getLockHolder() // only needed if this is a replicated lock replicatedHeld := l.holder.locked && l.holder.holder[lock.Replicated].txn != nil - if replicatedHeld && !force { - lockHolderTxn, _ := l.getLockHolder() - // Note that none of the current waiters can be requests from lockHolderTxn, - // so they will never be told to waitElsewhere on themselves. - waitState = waitingState{ - kind: waitElsewhere, - txn: lockHolderTxn, - key: l.key, - held: true, + transitionWaiter := func(g *lockTableGuardImpl) { + if replicatedHeld && !force { + // Note that none of the current waiters can be requests from + // lockHolderTxn, so they will never be told to waitElsewhere on + // themselves. + waitState := waitingState{ + kind: waitElsewhere, + txn: lockHolderTxn, + key: l.key, + held: true, + } + g.updateWaitingStateLocked(waitState) + } else { + // !replicatedHeld || force. Both are handled as doneWaiting since the + // system is no longer tracking the lock that was possibly held. + g.updateStateToDoneWaitingLocked() } - } else { - // !replicatedHeld || force. Both are handled as doneWaiting since the - // system is no longer tracking the lock that was possibly held. - waitState = waitingState{kind: doneWaiting} } l.clearLockHolder() @@ -2132,14 +2147,13 @@ func (l *lockState) tryClearLock(force bool) bool { // Clear waitingReaders. for e := l.waitingReaders.Front(); e != nil; { g := e.Value.(*lockTableGuardImpl) - waitState.guardStrength = g.str curr := e e = e.Next() l.waitingReaders.Remove(curr) g.mu.Lock() - g.updateStateLocked(waitState) + transitionWaiter(g) g.notify() delete(g.mu.locks, l) g.mu.Unlock() @@ -2148,7 +2162,6 @@ func (l *lockState) tryClearLock(force bool) bool { // Clear queuedWriters. for e := l.queuedWriters.Front(); e != nil; { qg := e.Value.(*queuedGuard) - waitState.guardStrength = qg.guard.str curr := e e = e.Next() @@ -2157,7 +2170,7 @@ func (l *lockState) tryClearLock(force bool) bool { g := qg.guard g.mu.Lock() if qg.active { - g.updateStateLocked(waitState) + transitionWaiter(g) g.notify() } delete(g.mu.locks, l) From 6be6f46f47cc5d312ec834c6f9911ffe0b767bbd Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Mon, 1 May 2023 19:58:28 -0400 Subject: [PATCH 4/4] concurrency: improve ShouldWait logic Previously, when a request performed its initial scan and discovered intents that needed to be resolved before it could evaluate, `findLockAfter` would infer its call stack using the `notify` parameter. It would then use this to set `startWait`, which would then be consulted at the callsite using `ShouldWait`. This felt brittle, so in this patch, we improve this structure by teaching `ShouldWait` about any locks that need to be resolved before evaluation instead. Release note: None --- pkg/kv/kvserver/concurrency/lock_table.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index c2c259abf6bb..3aac7c72da67 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -502,7 +502,12 @@ func releaseLockTableGuardImpl(g *lockTableGuardImpl) { func (g *lockTableGuardImpl) ShouldWait() bool { g.mu.Lock() defer g.mu.Unlock() - return g.mu.startWait + // The request needs to drop latches and wait if: + // 1. The lock table indicated as such (e.g. the request ran into a + // conflicting lock). + // 2. OR the request successfully performed its scan but discovered replicated + // locks that need to be resolved before it can evaluate. + return g.mu.startWait || len(g.toResolve) > 0 } func (g *lockTableGuardImpl) ResolveBeforeScanning() []roachpb.LockUpdate { @@ -734,20 +739,7 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) { g.mu.Lock() defer g.mu.Unlock() g.updateStateToDoneWaitingLocked() - // We are doneWaiting but may have some locks to resolve. There are - // two cases: - // - notify=false: the caller was already waiting and will look at this list - // of locks. - // - notify=true: this is a scan initiated by the caller, and it is holding - // latches. We need to tell it to "wait", so that it does this resolution - // first. startWait is currently false. This is the case handled below. if notify { - if len(g.toResolve) > 0 { - // Force caller to release latches and resolve intents. The first - // state it will see after releasing latches is doneWaiting, which - // will cause it to resolve intents. - g.mu.startWait = true - } g.notify() } }