diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 79b6f72a4aef..1a88542f26e2 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -76,8 +76,8 @@ import ( // check-opt-no-conflicts req= // is-key-locked-by-conflicting-txn req= key= strength= // -// on-lock-acquired req= key= [seq=] [dur=r|u] [strength=] -// on-lock-updated req= txn= key= status=[committed|aborted|pending] [ts=[,]] +// on-lock-acquired req= key= [seq=] [dur=r|u] [str=] +// on-lock-updated req= txn= key= status=[committed|aborted|pending] [ts=[,]] [ignored-seqs=[-][,[-]] // on-txn-updated txn= status=[committed|aborted|pending] [ts=[,]] // // on-lease-updated leaseholder= lease-seq= @@ -377,6 +377,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { } var key string d.ScanArgs(t, "key", &key) + // TODO(nvanbenschoten): replace with scanLockStrength. strength := concurrency.ScanLockStrength(t, d) ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength) if err != nil { @@ -416,7 +417,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { } var str lock.Strength if d.HasArg("str") { - str = concurrency.ScanLockStrength(t, d) + str = scanLockStrength(t, d) } else { // If no lock strength is provided in the test, infer it from the // durability. @@ -478,6 +479,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { if d.HasArg("ts") { ts = scanTimestamp(t, d) } + ignoredSeqNums := scanIgnoredSeqNumbers(t, d) // Confirm that the request has a corresponding ResolveIntent. found := false @@ -502,6 +504,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { log.Eventf(ctx, "%s txn %s @ %s", verb, txn.Short(), key) span := roachpb.Span{Key: roachpb.Key(key)} up := roachpb.MakeLockUpdate(txnUpdate, span) + up.IgnoredSeqNums = ignoredSeqNums m.OnLockUpdated(ctx, &up) }) return c.waitAndCollect(t, mon) diff --git a/pkg/kv/kvserver/concurrency/datadriven_util_test.go b/pkg/kv/kvserver/concurrency/datadriven_util_test.go index b31e38b45e45..0ac44389272b 100644 --- a/pkg/kv/kvserver/concurrency/datadriven_util_test.go +++ b/pkg/kv/kvserver/concurrency/datadriven_util_test.go @@ -101,6 +101,12 @@ func getLockDurability(t *testing.T, d *datadriven.TestData, durS string) lock.D } } +func scanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength { + var strS string + d.ScanArgs(t, "str", &strS) + return concurrency.GetStrength(t, d, strS) +} + func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.WaitPolicy { const key = "wait-policy" if !required && !d.HasArg(key) { @@ -121,6 +127,13 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa } } +func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { + if !d.HasArg("ignored-seqs") { + return nil + } + return concurrency.ScanIgnoredSeqNumbers(t, d) +} + func scanPoisonPolicy(t *testing.T, d *datadriven.TestData) poison.Policy { const key = "poison-policy" if !d.HasArg(key) { diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index de5fe6ecd390..cb5121d2849f 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -3222,6 +3222,17 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo // The lock transitioned from held to unheld as a result of this lock // update. gc = kl.releaseWaitersOnKeyUnlocked() + } else { + // If we're in this branch, it must be the case that there are multiple + // shared locks held on this key, and as a result, releasing one of the + // locks hasn't transitioned the key to unlocked. However, the lock that + // we just released may have belonged to the claimant transaction -- the + // one that any waiters on this key were pushing. If this is the case, + // we'll need to inform these waiters about a new claimant they should be + // pushing instead. A call to informActiveWaiters will do exactly that. + // Note that if the lock that was cleared didn't belong to a transaction + // all waiters were pushing, the call to informActiveWaiters will no-op. + kl.informActiveWaiters() } return true, gc } @@ -3304,6 +3315,8 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo kl.clearLockHeldBy(txn.ID) if !kl.isLocked() { gc = kl.releaseWaitersOnKeyUnlocked() + } else { + kl.informActiveWaiters() } return true, gc } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 6afa206c7530..a19e8c565672 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -393,7 +393,7 @@ func TestLockTableBasic(t *testing.T) { acq := roachpb.MakeLockAcquisition(req.Txn, roachpb.Key(key), durability, strength) var ignored []enginepb.IgnoredSeqNumRange if d.HasArg("ignored-seqs") { - ignored = scanIgnoredSeqNumbers(t, d) + ignored = ScanIgnoredSeqNumbers(t, d) } acq.IgnoredSeqNums = ignored if err := lt.AcquireLock(&acq); err != nil { @@ -437,7 +437,7 @@ func TestLockTableBasic(t *testing.T) { span := getSpan(t, d, s) var ignored []enginepb.IgnoredSeqNumRange if d.HasArg("ignored-seqs") { - ignored = scanIgnoredSeqNumbers(t, d) + ignored = ScanIgnoredSeqNumbers(t, d) } // TODO(sbhola): also test STAGING. intent := &roachpb.LockUpdate{ @@ -803,7 +803,7 @@ func GetStrength(t *testing.T, d *datadriven.TestData, strS string) lock.Strengt } } -func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { +func ScanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { var ignored []enginepb.IgnoredSeqNumRange var seqsStr string d.ScanArgs(t, "ignored-seqs", &seqsStr) @@ -1561,9 +1561,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { for i := 0; i < 10; i++ { keys = append(keys, roachpb.Key(string(rune('a'+i)))) } - // TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed. - //strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent} - strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent} + strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent} rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) const numActiveTxns = 8 var activeTxns [numActiveTxns]*enginepb.TxnMeta diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks new file mode 100644 index 000000000000..b01c2bb5c272 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks @@ -0,0 +1,182 @@ +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-txn name=txn4 ts=10,1 epoch=0 +---- + +new-txn name=txn5 ts=10,1 epoch=0 +---- + +# ----------------------------------------------------------------------------- +# Ensure releasing the first of multiple shared lock holders results in correct +# pushes. +# ----------------------------------------------------------------------------- + +new-request name=req1 txn=txn1 ts=10,1 + get key=a str=shared +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +on-lock-acquired req=req1 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000001 @ ‹a› + +finish req=req1 +---- +[-] finish req1: finishing request + +new-request name=req2 txn=txn2 ts=10,1 + get key=a str=shared +---- + +sequence req=req2 +---- +[2] sequence req2: sequencing request +[2] sequence req2: acquiring latches +[2] sequence req2: scanning lock table for conflicting locks +[2] sequence req2: sequencing complete, returned guard + +on-lock-acquired req=req2 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000002 @ ‹a› + +finish req=req2 +---- +[-] finish req2: finishing request + +new-request name=req3 txn=txn3 ts=10,1 + get key=a str=shared +---- + +sequence req=req3 +---- +[3] sequence req3: sequencing request +[3] sequence req3: acquiring latches +[3] sequence req3: scanning lock table for conflicting locks +[3] sequence req3: sequencing complete, returned guard + +on-lock-acquired req=req3 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000003 @ ‹a› + +finish req=req3 +---- +[-] finish req3: finishing request + +new-request name=req4 txn=txn4 ts=10,1 + get key=a str=shared +---- + +sequence req=req4 +---- +[4] sequence req4: sequencing request +[4] sequence req4: acquiring latches +[4] sequence req4: scanning lock table for conflicting locks +[4] sequence req4: sequencing complete, returned guard + +on-lock-acquired req=req4 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000004 @ ‹a› + +finish req=req4 +---- +[-] finish req4: finishing request + +debug-lock-table +---- +num=1 + lock: "a" + holders: txn: 00000001-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + +# Setup complete. + +new-request name=req5 txn=txn5 ts=10,1 + get key=a str=exclusive +---- + +sequence req=req5 +---- +[5] sequence req5: sequencing request +[5] sequence req5: acquiring latches +[5] sequence req5: scanning lock table for conflicting locks +[5] sequence req5: waiting in lock wait-queues +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000001 to abort +[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction + +# Commit txn1 (the transaction req5 is pushing) to have it release the lock. req5 +# should start pushing txn2. +on-txn-updated txn=txn1 status=committed +---- +[-] update txn: committing txn1 +[5] sequence req5: resolving intent ‹"a"› for txn 00000001 with COMMITTED status +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000002 to abort +[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction + +# Abort txn2 (the transaction req5 is now pushing) to have it release the lock. +# req5 should start pushing txn3 now. +on-txn-updated txn=txn2 status=aborted +---- +[-] update txn: aborting txn2 +[5] sequence req5: resolving intent ‹"a"› for txn 00000002 with ABORTED status +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: conflicted with ‹00000002-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000003 to abort +[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction + +# This time, instead of finalizing the transaction that's begin pushed (txn3), +# we'll instead finalize txn4 (the other shared lock holder) instead. Nothing +# should change in terms of who req5 is pushing as a result. +on-txn-updated txn=txn4 status=aborted +---- +[-] update txn: aborting txn4 + +debug-lock-table +---- +num=1 + lock: "a" + holders: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + queued locking requests: + active: true req: 5, strength: Exclusive, txn: 00000005-0000-0000-0000-000000000000 + distinguished req: 5 + +# Unlock the key entirely, ensure req5 can proceed. +on-txn-updated txn=txn3 status=committed +---- +[-] update txn: committing txn3 +[5] sequence req5: resolving intent ‹"a"› for txn 00000003 with COMMITTED status +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000004 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: conflicted with ‹00000003-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000004 to abort +[5] sequence req5: resolving intent ‹"a"› for txn 00000004 with ABORTED status +[5] sequence req5: lock wait-queue event: done waiting +[5] sequence req5: conflicted with ‹00000004-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: acquiring latches +[5] sequence req5: scanning lock table for conflicting locks +[5] sequence req5: sequencing complete, returned guard + +finish req=req5 +---- +[-] finish req5: finishing request