From 59bb505bc68ae3de1afa2c8cb2238e90b48e04b4 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Sun, 1 Oct 2023 11:41:05 -0500 Subject: [PATCH] concurrency: ensure releasing shared locks leads to correct pushes When there are multiple shared locks on a key, any active waiters will push the first of the lock holders (aka the claimant). Previously, when the claimaint was finalized, we weren't recomputing the waiting state for any active waiters to push the new claimaint. As a result, in such a scenario, waiters would end up blocking indefinitely without pushing. This is non-ideal, as it means we're not going to be running deadlock/liveness detection. Waiters would hang indefinitely if there was a deadlock/liveness issue. This patch fixes this behaviour by recomputing new waiting state in cases where a shared lock is released but the key isn't unlocked. Epic: none Release note: None --- .../concurrency/concurrency_manager_test.go | 9 +- .../concurrency/datadriven_util_test.go | 13 ++ pkg/kv/kvserver/concurrency/lock_table.go | 13 ++ .../kvserver/concurrency/lock_table_test.go | 10 +- .../testdata/concurrency_manager/shared_locks | 182 ++++++++++++++++++ 5 files changed, 218 insertions(+), 9 deletions(-) create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 79b6f72a4aef..1a88542f26e2 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -76,8 +76,8 @@ import ( // check-opt-no-conflicts req= // is-key-locked-by-conflicting-txn req= key= strength= // -// on-lock-acquired req= key= [seq=] [dur=r|u] [strength=] -// on-lock-updated req= txn= key= status=[committed|aborted|pending] [ts=[,]] +// on-lock-acquired req= key= [seq=] [dur=r|u] [str=] +// on-lock-updated req= txn= key= status=[committed|aborted|pending] [ts=[,]] [ignored-seqs=[-][,[-]] // on-txn-updated txn= status=[committed|aborted|pending] [ts=[,]] // // on-lease-updated leaseholder= lease-seq= @@ -377,6 +377,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { } var key string d.ScanArgs(t, "key", &key) + // TODO(nvanbenschoten): replace with scanLockStrength. strength := concurrency.ScanLockStrength(t, d) ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength) if err != nil { @@ -416,7 +417,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { } var str lock.Strength if d.HasArg("str") { - str = concurrency.ScanLockStrength(t, d) + str = scanLockStrength(t, d) } else { // If no lock strength is provided in the test, infer it from the // durability. @@ -478,6 +479,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { if d.HasArg("ts") { ts = scanTimestamp(t, d) } + ignoredSeqNums := scanIgnoredSeqNumbers(t, d) // Confirm that the request has a corresponding ResolveIntent. found := false @@ -502,6 +504,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { log.Eventf(ctx, "%s txn %s @ %s", verb, txn.Short(), key) span := roachpb.Span{Key: roachpb.Key(key)} up := roachpb.MakeLockUpdate(txnUpdate, span) + up.IgnoredSeqNums = ignoredSeqNums m.OnLockUpdated(ctx, &up) }) return c.waitAndCollect(t, mon) diff --git a/pkg/kv/kvserver/concurrency/datadriven_util_test.go b/pkg/kv/kvserver/concurrency/datadriven_util_test.go index b31e38b45e45..0ac44389272b 100644 --- a/pkg/kv/kvserver/concurrency/datadriven_util_test.go +++ b/pkg/kv/kvserver/concurrency/datadriven_util_test.go @@ -101,6 +101,12 @@ func getLockDurability(t *testing.T, d *datadriven.TestData, durS string) lock.D } } +func scanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength { + var strS string + d.ScanArgs(t, "str", &strS) + return concurrency.GetStrength(t, d, strS) +} + func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.WaitPolicy { const key = "wait-policy" if !required && !d.HasArg(key) { @@ -121,6 +127,13 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa } } +func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { + if !d.HasArg("ignored-seqs") { + return nil + } + return concurrency.ScanIgnoredSeqNumbers(t, d) +} + func scanPoisonPolicy(t *testing.T, d *datadriven.TestData) poison.Policy { const key = "poison-policy" if !d.HasArg(key) { diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index de5fe6ecd390..cb5121d2849f 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -3222,6 +3222,17 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo // The lock transitioned from held to unheld as a result of this lock // update. gc = kl.releaseWaitersOnKeyUnlocked() + } else { + // If we're in this branch, it must be the case that there are multiple + // shared locks held on this key, and as a result, releasing one of the + // locks hasn't transitioned the key to unlocked. However, the lock that + // we just released may have belonged to the claimant transaction -- the + // one that any waiters on this key were pushing. If this is the case, + // we'll need to inform these waiters about a new claimant they should be + // pushing instead. A call to informActiveWaiters will do exactly that. + // Note that if the lock that was cleared didn't belong to a transaction + // all waiters were pushing, the call to informActiveWaiters will no-op. + kl.informActiveWaiters() } return true, gc } @@ -3304,6 +3315,8 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo kl.clearLockHeldBy(txn.ID) if !kl.isLocked() { gc = kl.releaseWaitersOnKeyUnlocked() + } else { + kl.informActiveWaiters() } return true, gc } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 6afa206c7530..a19e8c565672 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -393,7 +393,7 @@ func TestLockTableBasic(t *testing.T) { acq := roachpb.MakeLockAcquisition(req.Txn, roachpb.Key(key), durability, strength) var ignored []enginepb.IgnoredSeqNumRange if d.HasArg("ignored-seqs") { - ignored = scanIgnoredSeqNumbers(t, d) + ignored = ScanIgnoredSeqNumbers(t, d) } acq.IgnoredSeqNums = ignored if err := lt.AcquireLock(&acq); err != nil { @@ -437,7 +437,7 @@ func TestLockTableBasic(t *testing.T) { span := getSpan(t, d, s) var ignored []enginepb.IgnoredSeqNumRange if d.HasArg("ignored-seqs") { - ignored = scanIgnoredSeqNumbers(t, d) + ignored = ScanIgnoredSeqNumbers(t, d) } // TODO(sbhola): also test STAGING. intent := &roachpb.LockUpdate{ @@ -803,7 +803,7 @@ func GetStrength(t *testing.T, d *datadriven.TestData, strS string) lock.Strengt } } -func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { +func ScanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { var ignored []enginepb.IgnoredSeqNumRange var seqsStr string d.ScanArgs(t, "ignored-seqs", &seqsStr) @@ -1561,9 +1561,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { for i := 0; i < 10; i++ { keys = append(keys, roachpb.Key(string(rune('a'+i)))) } - // TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed. - //strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent} - strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent} + strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent} rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) const numActiveTxns = 8 var activeTxns [numActiveTxns]*enginepb.TxnMeta diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks new file mode 100644 index 000000000000..b01c2bb5c272 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks @@ -0,0 +1,182 @@ +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-txn name=txn4 ts=10,1 epoch=0 +---- + +new-txn name=txn5 ts=10,1 epoch=0 +---- + +# ----------------------------------------------------------------------------- +# Ensure releasing the first of multiple shared lock holders results in correct +# pushes. +# ----------------------------------------------------------------------------- + +new-request name=req1 txn=txn1 ts=10,1 + get key=a str=shared +---- + +sequence req=req1 +---- +[1] sequence req1: sequencing request +[1] sequence req1: acquiring latches +[1] sequence req1: scanning lock table for conflicting locks +[1] sequence req1: sequencing complete, returned guard + +on-lock-acquired req=req1 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000001 @ ‹a› + +finish req=req1 +---- +[-] finish req1: finishing request + +new-request name=req2 txn=txn2 ts=10,1 + get key=a str=shared +---- + +sequence req=req2 +---- +[2] sequence req2: sequencing request +[2] sequence req2: acquiring latches +[2] sequence req2: scanning lock table for conflicting locks +[2] sequence req2: sequencing complete, returned guard + +on-lock-acquired req=req2 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000002 @ ‹a› + +finish req=req2 +---- +[-] finish req2: finishing request + +new-request name=req3 txn=txn3 ts=10,1 + get key=a str=shared +---- + +sequence req=req3 +---- +[3] sequence req3: sequencing request +[3] sequence req3: acquiring latches +[3] sequence req3: scanning lock table for conflicting locks +[3] sequence req3: sequencing complete, returned guard + +on-lock-acquired req=req3 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000003 @ ‹a› + +finish req=req3 +---- +[-] finish req3: finishing request + +new-request name=req4 txn=txn4 ts=10,1 + get key=a str=shared +---- + +sequence req=req4 +---- +[4] sequence req4: sequencing request +[4] sequence req4: acquiring latches +[4] sequence req4: scanning lock table for conflicting locks +[4] sequence req4: sequencing complete, returned guard + +on-lock-acquired req=req4 key=a dur=u str=shared +---- +[-] acquire lock: txn 00000004 @ ‹a› + +finish req=req4 +---- +[-] finish req4: finishing request + +debug-lock-table +---- +num=1 + lock: "a" + holders: txn: 00000001-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + +# Setup complete. + +new-request name=req5 txn=txn5 ts=10,1 + get key=a str=exclusive +---- + +sequence req=req5 +---- +[5] sequence req5: sequencing request +[5] sequence req5: acquiring latches +[5] sequence req5: scanning lock table for conflicting locks +[5] sequence req5: waiting in lock wait-queues +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000001 to abort +[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction + +# Commit txn1 (the transaction req5 is pushing) to have it release the lock. req5 +# should start pushing txn2. +on-txn-updated txn=txn1 status=committed +---- +[-] update txn: committing txn1 +[5] sequence req5: resolving intent ‹"a"› for txn 00000001 with COMMITTED status +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000002 to abort +[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction + +# Abort txn2 (the transaction req5 is now pushing) to have it release the lock. +# req5 should start pushing txn3 now. +on-txn-updated txn=txn2 status=aborted +---- +[-] update txn: aborting txn2 +[5] sequence req5: resolving intent ‹"a"› for txn 00000002 with ABORTED status +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: conflicted with ‹00000002-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000003 to abort +[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction + +# This time, instead of finalizing the transaction that's begin pushed (txn3), +# we'll instead finalize txn4 (the other shared lock holder) instead. Nothing +# should change in terms of who req5 is pushing as a result. +on-txn-updated txn=txn4 status=aborted +---- +[-] update txn: aborting txn4 + +debug-lock-table +---- +num=1 + lock: "a" + holders: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)] + queued locking requests: + active: true req: 5, strength: Exclusive, txn: 00000005-0000-0000-0000-000000000000 + distinguished req: 5 + +# Unlock the key entirely, ensure req5 can proceed. +on-txn-updated txn=txn3 status=committed +---- +[-] update txn: committing txn3 +[5] sequence req5: resolving intent ‹"a"› for txn 00000003 with COMMITTED status +[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000004 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0) +[5] sequence req5: conflicted with ‹00000003-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false +[5] sequence req5: pushing txn 00000004 to abort +[5] sequence req5: resolving intent ‹"a"› for txn 00000004 with ABORTED status +[5] sequence req5: lock wait-queue event: done waiting +[5] sequence req5: conflicted with ‹00000004-0000-0000-0000-000000000000› on ‹"a"› for 0.000s +[5] sequence req5: acquiring latches +[5] sequence req5: scanning lock table for conflicting locks +[5] sequence req5: sequencing complete, returned guard + +finish req=req5 +---- +[-] finish req5: finishing request