Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrency: ensure releasing shared locks leads to correct pushes #111554

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ import (
// check-opt-no-conflicts req=<req-name>
// is-key-locked-by-conflicting-txn req=<req-name> key=<key> strength=<strength>
//
// on-lock-acquired req=<req-name> key=<key> [seq=<seq>] [dur=r|u] [strength=<strength>]
// on-lock-updated req=<req-name> txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]]
// on-lock-acquired req=<req-name> key=<key> [seq=<seq>] [dur=r|u] [str=<strength>]
// on-lock-updated req=<req-name> txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]] [ignored-seqs=<int>[-<int>][,<int>[-<int>]]
// on-txn-updated txn=<txn-name> status=[committed|aborted|pending] [ts=<int>[,<int>]]
//
// on-lease-updated leaseholder=<bool> lease-seq=<seq>
Expand Down Expand Up @@ -377,6 +377,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
}
var key string
d.ScanArgs(t, "key", &key)
// TODO(nvanbenschoten): replace with scanLockStrength.
strength := concurrency.ScanLockStrength(t, d)
ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength)
if err != nil {
Expand Down Expand Up @@ -416,7 +417,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
}
var str lock.Strength
if d.HasArg("str") {
str = concurrency.ScanLockStrength(t, d)
str = scanLockStrength(t, d)
} else {
// If no lock strength is provided in the test, infer it from the
// durability.
Expand Down Expand Up @@ -478,6 +479,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
if d.HasArg("ts") {
ts = scanTimestamp(t, d)
}
ignoredSeqNums := scanIgnoredSeqNumbers(t, d)

// Confirm that the request has a corresponding ResolveIntent.
found := false
Expand All @@ -502,6 +504,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
log.Eventf(ctx, "%s txn %s @ %s", verb, txn.Short(), key)
span := roachpb.Span{Key: roachpb.Key(key)}
up := roachpb.MakeLockUpdate(txnUpdate, span)
up.IgnoredSeqNums = ignoredSeqNums
m.OnLockUpdated(ctx, &up)
})
return c.waitAndCollect(t, mon)
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func getLockDurability(t *testing.T, d *datadriven.TestData, durS string) lock.D
}
}

func scanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength {
var strS string
d.ScanArgs(t, "str", &strS)
return concurrency.GetStrength(t, d, strS)
}

func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.WaitPolicy {
const key = "wait-policy"
if !required && !d.HasArg(key) {
Expand All @@ -121,6 +127,13 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa
}
}

func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange {
if !d.HasArg("ignored-seqs") {
return nil
}
return concurrency.ScanIgnoredSeqNumbers(t, d)
}

func scanPoisonPolicy(t *testing.T, d *datadriven.TestData) poison.Policy {
const key = "poison-policy"
if !d.HasArg(key) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3222,6 +3222,17 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
// The lock transitioned from held to unheld as a result of this lock
// update.
gc = kl.releaseWaitersOnKeyUnlocked()
} else {
// If we're in this branch, it must be the case that there are multiple
// shared locks held on this key, and as a result, releasing one of the
// locks hasn't transitioned the key to unlocked. However, the lock that
// we just released may have belonged to the claimant transaction -- the
// one that any waiters on this key were pushing. If this is the case,
// we'll need to inform these waiters about a new claimant they should be
// pushing instead. A call to informActiveWaiters will do exactly that.
// Note that if the lock that was cleared didn't belong to a transaction
// all waiters were pushing, the call to informActiveWaiters will no-op.
kl.informActiveWaiters()
}
return true, gc
}
Expand Down Expand Up @@ -3304,6 +3315,8 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
kl.clearLockHeldBy(txn.ID)
if !kl.isLocked() {
gc = kl.releaseWaitersOnKeyUnlocked()
} else {
kl.informActiveWaiters()
}
return true, gc
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestLockTableBasic(t *testing.T) {
acq := roachpb.MakeLockAcquisition(req.Txn, roachpb.Key(key), durability, strength)
var ignored []enginepb.IgnoredSeqNumRange
if d.HasArg("ignored-seqs") {
ignored = scanIgnoredSeqNumbers(t, d)
ignored = ScanIgnoredSeqNumbers(t, d)
}
acq.IgnoredSeqNums = ignored
if err := lt.AcquireLock(&acq); err != nil {
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestLockTableBasic(t *testing.T) {
span := getSpan(t, d, s)
var ignored []enginepb.IgnoredSeqNumRange
if d.HasArg("ignored-seqs") {
ignored = scanIgnoredSeqNumbers(t, d)
ignored = ScanIgnoredSeqNumbers(t, d)
}
// TODO(sbhola): also test STAGING.
intent := &roachpb.LockUpdate{
Expand Down Expand Up @@ -803,7 +803,7 @@ func GetStrength(t *testing.T, d *datadriven.TestData, strS string) lock.Strengt
}
}

func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange {
func ScanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange {
var ignored []enginepb.IgnoredSeqNumRange
var seqsStr string
d.ScanArgs(t, "ignored-seqs", &seqsStr)
Expand Down Expand Up @@ -1561,9 +1561,7 @@ func TestLockTableConcurrentRequests(t *testing.T) {
for i := 0; i < 10; i++ {
keys = append(keys, roachpb.Key(string(rune('a'+i))))
}
// TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed.
//strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent}
strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
const numActiveTxns = 8
var activeTxns [numActiveTxns]*enginepb.TxnMeta
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
new-txn name=txn1 ts=10,1 epoch=0
----

new-txn name=txn2 ts=10,1 epoch=0
----

new-txn name=txn3 ts=10,1 epoch=0
----

new-txn name=txn4 ts=10,1 epoch=0
----

new-txn name=txn5 ts=10,1 epoch=0
----

# -----------------------------------------------------------------------------
# Ensure releasing the first of multiple shared lock holders results in correct
# pushes.
# -----------------------------------------------------------------------------

new-request name=req1 txn=txn1 ts=10,1
get key=a str=shared
----

sequence req=req1
----
[1] sequence req1: sequencing request
[1] sequence req1: acquiring latches
[1] sequence req1: scanning lock table for conflicting locks
[1] sequence req1: sequencing complete, returned guard

on-lock-acquired req=req1 key=a dur=u str=shared
----
[-] acquire lock: txn 00000001 @ ‹a›

finish req=req1
----
[-] finish req1: finishing request

new-request name=req2 txn=txn2 ts=10,1
get key=a str=shared
----

sequence req=req2
----
[2] sequence req2: sequencing request
[2] sequence req2: acquiring latches
[2] sequence req2: scanning lock table for conflicting locks
[2] sequence req2: sequencing complete, returned guard

on-lock-acquired req=req2 key=a dur=u str=shared
----
[-] acquire lock: txn 00000002 @ ‹a›

finish req=req2
----
[-] finish req2: finishing request

new-request name=req3 txn=txn3 ts=10,1
get key=a str=shared
----

sequence req=req3
----
[3] sequence req3: sequencing request
[3] sequence req3: acquiring latches
[3] sequence req3: scanning lock table for conflicting locks
[3] sequence req3: sequencing complete, returned guard

on-lock-acquired req=req3 key=a dur=u str=shared
----
[-] acquire lock: txn 00000003 @ ‹a›

finish req=req3
----
[-] finish req3: finishing request

new-request name=req4 txn=txn4 ts=10,1
get key=a str=shared
----

sequence req=req4
----
[4] sequence req4: sequencing request
[4] sequence req4: acquiring latches
[4] sequence req4: scanning lock table for conflicting locks
[4] sequence req4: sequencing complete, returned guard

on-lock-acquired req=req4 key=a dur=u str=shared
----
[-] acquire lock: txn 00000004 @ ‹a›

finish req=req4
----
[-] finish req4: finishing request

debug-lock-table
----
num=1
lock: "a"
holders: txn: 00000001-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]

# Setup complete.

new-request name=req5 txn=txn5 ts=10,1
get key=a str=exclusive
----

sequence req=req5
----
[5] sequence req5: sequencing request
[5] sequence req5: acquiring latches
[5] sequence req5: scanning lock table for conflicting locks
[5] sequence req5: waiting in lock wait-queues
[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0)
[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false
[5] sequence req5: pushing txn 00000001 to abort
[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction

# Commit txn1 (the transaction req5 is pushing) to have it release the lock. req5
# should start pushing txn2.
on-txn-updated txn=txn1 status=committed
----
[-] update txn: committing txn1
[5] sequence req5: resolving intent ‹"a"› for txn 00000001 with COMMITTED status
[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0)
[5] sequence req5: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"a"› for 0.000s
[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false
[5] sequence req5: pushing txn 00000002 to abort
[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction

# Abort txn2 (the transaction req5 is now pushing) to have it release the lock.
# req5 should start pushing txn3 now.
on-txn-updated txn=txn2 status=aborted
----
[-] update txn: aborting txn2
[5] sequence req5: resolving intent ‹"a"› for txn 00000002 with ABORTED status
[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0)
[5] sequence req5: conflicted with ‹00000002-0000-0000-0000-000000000000› on ‹"a"› for 0.000s
[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false
[5] sequence req5: pushing txn 00000003 to abort
[5] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction

# This time, instead of finalizing the transaction that's begin pushed (txn3),
# we'll instead finalize txn4 (the other shared lock holder) instead. Nothing
# should change in terms of who req5 is pushing as a result.
on-txn-updated txn=txn4 status=aborted
----
[-] update txn: aborting txn4

debug-lock-table
----
num=1
lock: "a"
holders: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, info: unrepl [(str: Shared seq: 0)]
queued locking requests:
active: true req: 5, strength: Exclusive, txn: 00000005-0000-0000-0000-000000000000
distinguished req: 5

# Unlock the key entirely, ensure req5 can proceed.
on-txn-updated txn=txn3 status=committed
----
[-] update txn: committing txn3
[5] sequence req5: resolving intent ‹"a"› for txn 00000003 with COMMITTED status
[5] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000004 holding lock @ key ‹"a"› (queuedLockingRequests: 1, queuedReaders: 0)
[5] sequence req5: conflicted with ‹00000003-0000-0000-0000-000000000000› on ‹"a"› for 0.000s
[5] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false, wait policy error = false
[5] sequence req5: pushing txn 00000004 to abort
[5] sequence req5: resolving intent ‹"a"› for txn 00000004 with ABORTED status
[5] sequence req5: lock wait-queue event: done waiting
[5] sequence req5: conflicted with ‹00000004-0000-0000-0000-000000000000› on ‹"a"› for 0.000s
[5] sequence req5: acquiring latches
[5] sequence req5: scanning lock table for conflicting locks
[5] sequence req5: sequencing complete, returned guard

finish req=req5
----
[-] finish req5: finishing request