Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrency: refactors in preparation for lock promotion #118448

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 90 additions & 45 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,10 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(

for e := l.queuedLockingRequests.Front(); e != nil; e = e.Next() {
qqg := e.Value
if qqg.guard.seqNum > g.seqNum {
// We only need to check for conflicts with requests that came before us
// (read: have lower sequence numbers than us). Note that the list of
// queuedLockingRequests is sorted in increasing order of sequence number.
qo := makeQueueOrder(g)
if qqg.order.after(qo) {
// We only need to check for conflicts with requests that sort before us.
// Note that the list of queuedLockingRequests is already sorted.
break
}
if qqg.guard.txnMeta() != nil && g.isSameTxn(qqg.guard.txnMeta()) {
Expand Down Expand Up @@ -1000,6 +1000,33 @@ type queuedGuard struct {
guard *lockTableGuardImpl
mode lock.Mode // protected by keyLocks.mu
active bool // protected by keyLocks.mu
order queueOrder
}

// queueOrder encapsulates fields that are used to determine the order in which
// locking requests are stored in a lock's wait queue.
type queueOrder struct {
reqSeqNum uint64
}

// makeQueueOrder constructs a queueOrder.
func makeQueueOrder(g *lockTableGuardImpl) queueOrder {
return queueOrder{
reqSeqNum: g.seqNum,
}
}

// after returns true if the receiver should be ordered after the supplied
// queueOrder.
//
// Comparison is based on sequence numbers, which correspond to a request's
// arrival time -- requests that arrive later are ordered after requests that
// arrive earlier, and vice-versa.
func (o1 queueOrder) after(o2 queueOrder) bool {
if o1.reqSeqNum == o2.reqSeqNum {
return false // same request; doesn't sort after
}
return o1.reqSeqNum > o2.reqSeqNum
}

// Information about a lock holder for unreplicated locks.
Expand Down Expand Up @@ -2711,17 +2738,51 @@ func (kl *keyLocks) enqueueLockingRequest(
// common case is that this waiter will be at the end of the queue.
return true /* maxQueueLengthExceeded */, nil
}

if _, err := kl.insertLockingRequest(g, g.curStrength()); err != nil {
return false, err
}
// This request may be a candidate to become a distinguished waiter if one
// doesn't exist yet; try making it such.
kl.maybeMakeDistinguishedWaiter(g)
return false /* maxQueueLengthExceeded */, nil
}

// insertLockingRequest inserts the locking request, trying to access the lock
// with the supplied strength, at the correct position in the lock's wait queue.
// The request is wrapped in a queuedGuard to insert it into the queue,
// the reference for which is returned to the caller.
//
// Note that the request should not already be present in the lock's wait queue.
//
// REQUIRES: kl.mu to be locked.
// REQUIRES: g.mu to be locked.
//
// TODO(arul): we can change this function to no longer return an error once
// lock promotion is allowed.
func (kl *keyLocks) insertLockingRequest(
g *lockTableGuardImpl, accessStrength lock.Strength,
) (*queuedGuard, error) {
assert(accessStrength != lock.None, "should only be called with a locking request")
_, inQueue := g.mu.locks[kl]
assert(!inQueue, "should not be called with a locking request already in the queue")

qg := &queuedGuard{
guard: g,
mode: g.curLockMode(),
mode: makeLockMode(accessStrength, g.txnMeta(), g.ts),
active: true,
order: makeQueueOrder(g),
}
// The request isn't in the queue. Add it in the correct position, based on
// its sequence number.
// its queueOrder.
var e *list.Element[*queuedGuard]
// TODO(arul): Once we've addressed lifted the limitation on lock promotion
// (https://github.com/cockroachdb/cockroach/issues/110435), we should switch
// this loop to iterate from the back of the queue -- it's more likely that
// new requests are added to the back of the queue.
for e = kl.queuedLockingRequests.Front(); e != nil; e = e.Next() {
qqg := e.Value
if qqg.guard.seqNum > qg.guard.seqNum {
if qqg.order.after(qg.order) {
break
}
if qg.guard.txn != nil && qqg.guard.isSameTxn(qg.guard.txnMeta()) {
Expand All @@ -2730,7 +2791,7 @@ func (kl *keyLocks) enqueueLockingRequest(
// chance to do so before we do -- assuming it'll succeed, check if we've
// got ourselves into a lock promotion case that's not allowed.
if err := kl.maybeDisallowLockPromotion(qqg.mode.Strength, qg.mode.Strength); err != nil {
return false, err
return nil, err
}
}
}
Expand All @@ -2739,11 +2800,9 @@ func (kl *keyLocks) enqueueLockingRequest(
} else {
kl.queuedLockingRequests.InsertBefore(qg, e)
}
// This request may be a candidate to become a distinguished waiter if one
// doesn't exist yet; try making it such.
kl.maybeMakeDistinguishedWaiter(g)
g.maybeAddToLocksMap(kl, g.curStrength())
return false /* maxQueueLengthExceeded */, nil
added := g.maybeAddToLocksMap(kl, accessStrength)
assert(added, "should not have been present before")
return qg, nil
}

// maybeMakeDistinguishedWaiter designates the supplied request as the
Expand Down Expand Up @@ -3090,32 +3149,19 @@ func (kl *keyLocks) discoveredLock(
// Immediately enter the lock's queuedLockingRequests list.
// NB: this inactive waiter can be non-transactional.
g.mu.Lock()
added := g.maybeAddToLocksMap(kl, accessStrength)
g.mu.Unlock()

if added {
_, inQueue := g.mu.locks[kl]
if !inQueue {
// We weren't previously waiting in this lock's wait queue, so we need to
// add the request to the list of queuedLockingRequests as an inactive
// waiter.
qg := &queuedGuard{
guard: g,
mode: makeLockMode(accessStrength, g.txnMeta(), g.ts),
active: false,
}
// g is not necessarily first in the queue in the (rare) case (a) above.
var e *list.Element[*queuedGuard]
for e = kl.queuedLockingRequests.Front(); e != nil; e = e.Next() {
qqg := e.Value
if qqg.guard.seqNum > g.seqNum {
break
}
}
if e == nil {
kl.queuedLockingRequests.PushBack(qg)
} else {
kl.queuedLockingRequests.InsertBefore(qg, e)
qg, err := kl.insertLockingRequest(g, accessStrength)
if err != nil {
g.mu.Unlock()
return err
}
qg.active = false
}
g.mu.Unlock()
}

// If there are waiting requests from the same txn, they no longer need to wait.
Expand Down Expand Up @@ -3845,12 +3891,12 @@ func (kl *keyLocks) verify(st *cluster.Settings) error {
}
}

// 2. Ensure queued locking requests are stored in sorted sequence number
// order.
for e1 := kl.queuedLockingRequests.Front(); e1 != nil; e1 = e1.Next() {
if e1.Prev() != nil && e1.Prev().Value.guard.seqNum >= e1.Value.guard.seqNum {
// 2. Ensure queued locking requests are stored in the correct order, as
// dictated by the queuedGuard.order field.
for e := kl.queuedLockingRequests.Front(); e != nil; e = e.Next() {
if e.Prev() != nil && e.Prev().Value.order.after(e.Value.order) {
return errors.AssertionFailedf(
"queued locking requests should be stored in sequence number order %s", kl,
"queued locking requests should be stored in sorted order %s", kl,
)
}
}
Expand Down Expand Up @@ -3904,8 +3950,7 @@ func (kl *keyLocks) verify(st *cluster.Settings) error {
}
// If a locking request is actively waiting at a key, it must either:
// 1. Conflict with one of the lock holders.
// 2. OR Conflict with one of the queued locking requests in front (read:
// lower sequence number) of it.
// 2. OR Conflict with one of the queued locking requests in front of it.
conflicts := false
for e2 := kl.holders.Front(); e2 != nil; e2 = e2.Next() {
holder := e2.Value
Expand All @@ -3922,11 +3967,11 @@ func (kl *keyLocks) verify(st *cluster.Settings) error {
// No conflict found with lock holder(s); check other queued locking
// requests waiting in front of the request.
for e2 := kl.queuedLockingRequests.Front(); ; e2 = e2.Next() {
req := e2.Value
if req.guard.seqNum >= qlr.guard.seqNum {
break
qg := e2.Value
if qg.guard == qlr.guard {
break // we've found our request
}
if lock.Conflicts(req.mode, qlr.mode, &st.SV) {
if lock.Conflicts(qg.mode, qlr.mode, &st.SV) {
conflicts = true
break
}
Expand Down
Loading