concurrency: disallow lock promotion from shared to exclusive/intent
We do so by checking both the list of lock holders and any queued
requests that belong to our transaction. If we discover that the lock
is being promoted (or, in the queued-requests check, will be promoted)
from shared to exclusive/intent, we return an error to the client.

References cockroachdb#110435

Release note: None
arulajmani committed Sep 12, 2023
1 parent 7463887 commit 8205b43
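
The heart of the change is a small guard against re-acquiring a shared lock at a stronger strength. Here is a minimal, self-contained sketch of that rule, with an illustrative Strength stand-in for lock.Strength (the real enum also has an Update level, and the real check returns an errors.UnimplementedErrorf tied to issue #110435):

```go
package main

import "fmt"

// Strength is an illustrative stand-in for lock.Strength; the ordering
// mirrors the real enum (None < Shared < Exclusive < Intent), with
// lock.Update omitted for brevity.
type Strength int

const (
	None Strength = iota
	Shared
	Exclusive
	Intent
)

func (s Strength) String() string {
	return [...]string{"None", "Shared", "Exclusive", "Intent"}[s]
}

// maybeDisallowLockPromotion mirrors the rule this commit adds: re-acquiring
// a Shared lock at a stronger strength is rejected with an error rather than
// being allowed to proceed.
func maybeDisallowLockPromotion(held, reAcquisition Strength) error {
	if held == Shared && reAcquisition > held {
		return fmt.Errorf("lock promotion from %s to %s is not allowed", held, reAcquisition)
	}
	return nil
}

func main() {
	fmt.Println(maybeDisallowLockPromotion(Shared, Exclusive)) // rejected
	fmt.Println(maybeDisallowLockPromotion(Shared, Intent))    // rejected
	fmt.Println(maybeDisallowLockPromotion(Exclusive, Intent)) // <nil>: allowed special case (see the TODO in the diff)
}
```

As the commit message says, this rule is wired into two paths: the held-locks check in alreadyHoldsLockAndIsAllowedToProceed and the queued-requests check in enqueueLockingRequest.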
Showing 8 changed files with 290 additions and 67 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -592,7 +592,7 @@ type lockTable interface {
// lockTableGuard and the subsequent calls reuse the previously returned
// one. The latches needed by the request must be held when calling this
// function.
- ScanAndEnqueue(Request, lockTableGuard) lockTableGuard
+ ScanAndEnqueue(Request, lockTableGuard) (lockTableGuard, *Error)

// ScanOptimistic takes a snapshot of the lock table for later checking for
// conflicts, and returns a guard. It is for optimistic evaluation of
@@ -760,7 +760,7 @@ type lockTableGuard interface {
NewStateChan() chan struct{}

// CurState returns the latest waiting state.
- CurState() waitingState
+ CurState() (waitingState, error)

// ResolveBeforeScanning lists the locks to resolve before scanning again.
// This must be called after:
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -332,7 +332,10 @@ func (m *managerImpl) sequenceReqWithGuard(
} else {
// Scan for conflicting locks.
log.Event(ctx, "scanning lock table for conflicting locks")
- g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)
+ g.ltg, err = m.lt.ScanAndEnqueue(g.Req, g.ltg)
+ if err != nil {
+ return nil, err
+ }
}

// Wait on conflicting locks, if necessary. Note that this will never be
130 changes: 93 additions & 37 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -568,19 +568,22 @@ func (g *lockTableGuardImpl) NewStateChan() chan struct{} {
return g.mu.signal
}

- func (g *lockTableGuardImpl) CurState() waitingState {
+ func (g *lockTableGuardImpl) CurState() (waitingState, error) {
g.mu.Lock()
defer g.mu.Unlock()
if !g.mu.mustComputeWaitingState {
- return g.mu.state
+ return g.mu.state, nil
}
// Not actively waiting anywhere so no one else can set
// mustComputeWaitingState to true while this method executes.
g.mu.mustComputeWaitingState = false
g.mu.Unlock()
- g.resumeScan(false /* notify */)
+ err := g.resumeScan(false /* notify */)
g.mu.Lock() // Unlock deferred
- return g.mu.state
+ if err != nil {
+ return waitingState{}, err
+ }
+ return g.mu.state, nil
}

// updateStateToDoneWaitingLocked updates the request's waiting state to
@@ -700,7 +703,11 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
return false, nil, nil
}

- if l.alreadyHoldsLockAndIsAllowedToProceed(g, str) {
+ isAllowedToProceed, err := l.alreadyHoldsLockAndIsAllowedToProceed(g, str)
+ if err != nil {
+ return false, nil, err
+ }
+ if isAllowedToProceed {
// If another request from this transaction has already locked this key with
// sufficient locking strength then there's no conflict; we can proceed.
return false, nil, nil
@@ -859,7 +866,7 @@ func (g *lockTableGuardImpl) takeToResolveUnreplicated() []roachpb.LockUpdate {
// etc.
//
// ACQUIRES: g.mu.
- func (g *lockTableGuardImpl) resumeScan(notify bool) {
+ func (g *lockTableGuardImpl) resumeScan(notify bool) error {
spans := g.spans.GetSpans(g.curStrength())
var span *roachpb.Span
resumingInSameSpan := false
@@ -908,9 +915,12 @@ func (g *lockTableGuardImpl) resumeScan(notify bool) {
// Else, past the lock where it stopped waiting. We may not
// encounter that lock since it may have been garbage collected.
}
- conflicts := l.scanAndMaybeEnqueue(g, notify)
+ conflicts, err := l.scanAndMaybeEnqueue(g, notify)
+ if err != nil {
+ return err
+ }
if conflicts {
- return
+ return nil
}
}
resumingInSameSpan = false
@@ -951,6 +961,7 @@ func (g *lockTableGuardImpl) resumeScan(notify bool) {
if notify {
g.notify()
}
+ return nil
}

// queuedGuard is used to wrap waiting locking requests in the keyLocks struct.
@@ -2327,32 +2338,39 @@ func (kl *keyLocks) lockAcquiredOrDiscovered(tl *txnLock) {
// need resolution.
//
// REQUIRES: kl.mu to be locked.
- func (kl *keyLocks) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wait bool) {
+ func (kl *keyLocks) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wait bool, _ error) {
kl.mu.Lock()
defer kl.mu.Unlock()
if kl.isEmptyLock() {
- return false /* wait */
+ return false /* wait */, nil
}

// It is possible that the lock is already held by this request's
// transaction, and it is held with a lock strength good enough for it.
- if kl.alreadyHoldsLockAndIsAllowedToProceed(g, g.curStrength()) {
- return false /* wait */
+ isAllowedToProceed, err := kl.alreadyHoldsLockAndIsAllowedToProceed(g, g.curStrength())
+ if err != nil {
+ return false, err
+ }
+ if isAllowedToProceed {
+ return false /* wait */, nil
}

if g.curStrength() == lock.None {
conflicts := kl.maybeEnqueueNonLockingReadRequest(g)
if conflicts {
ws := kl.constructWaitingState(g)
g.startWaitingWithWaitingState(ws, notify)
- return true /* wait */
+ return true /* wait */, nil
}
- return false /* wait */
+ return false /* wait */, nil
}

// We're purely dealing with locking requests from here on out.

- maxQueueLengthExceeded := kl.enqueueLockingRequest(g)
+ maxQueueLengthExceeded, err := kl.enqueueLockingRequest(g)
+ if err != nil {
+ return false, err
+ }
if maxQueueLengthExceeded {
// NB: Requests that encounter a lock wait-queue that is longer than
// what they're willing to wait for are rejected by the lock table
@@ -2362,7 +2380,7 @@ func (kl *keyLocks) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wai
g.startWaitingWithWaitingState(ws, notify)
// Return true, not because we want to wait, but because we want
// this request to be rejected in the lock table waiter.
- return true /* wait */
+ return true /* wait */, nil
}

if kl.shouldRequestActivelyWait(g) {
@@ -2383,14 +2401,14 @@ func (kl *keyLocks) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wai
// need to call informActiveWaiters. Note that informActiveWaiters elides
// updates if they're not meaningful, so we can get away with being less
// precise in handling the more general case at this level.
- return true /* wait */
+ return true /* wait */, nil
}

kl.claimBeforeProceeding(g)
// Inform any active waiters that (may) need to be made aware that this
// request acquired a claim.
kl.informActiveWaiters()
- return false /* wait */
+ return false /* wait */, nil
}

// constructWaitingState constructs the waiting state the supplied request
@@ -2424,36 +2442,60 @@ func (kl *keyLocks) constructWaitingState(g *lockTableGuardImpl) waitingState {
// REQUIRES: kl.mu to be locked.
func (kl *keyLocks) alreadyHoldsLockAndIsAllowedToProceed(
g *lockTableGuardImpl, str lock.Strength,
- ) bool {
+ ) (bool, error) {
if !kl.isLocked() {
- return false // no one holds the lock
+ return false, nil // no one holds the lock
}
if g.txn == nil {
- return false // non-transactional requests do not hold locks
+ return false, nil // non-transactional requests do not hold locks
}
e, found := kl.heldBy[g.txn.ID]
if !found {
- return false
+ return false, nil
}
tl := e.Value
heldMode := tl.getLockMode()
+ err := kl.maybeDisallowLockPromotion(heldMode.Strength, str)
+ if err != nil {
+ return false, err
+ }
// Check if the lock is already held by the guard's transaction with an equal
// or higher lock strength. If it is, we're good to go. Otherwise, the request
// is trying to promote a lock it previously acquired. In such cases, the
// existence of a lock with weaker strength doesn't do much for this request.
- // It's no different than the case where its trying to acquire a fresh lock.
+ // It's no different from the case where it's trying to acquire a fresh lock.
return str <= heldMode.Strength ||
// TODO(arul): We want to allow requests that are writing to keys that they
// hold exclusive locks on to "jump ahead" of any potential waiters. This
- // prevents deadlocks. The logic here is a bandaid until we implement a
+ // prevents deadlocks. The logic here is a band-aid until we implement a
// solution for the general case of arbitrary lock upgrades (e.g. shared ->
// exclusive, etc.). We'll do so by prioritizing requests from transaction's
// that hold locks over transactions that don't when storing them in the
// list of queuedLockingRequests. Instead of sorting the list of
// queuedLockingRequests just based on sequence numbers alone, we'll instead
// use (belongsToALockHolderTxn, sequence number) to construct the sort
// order.
- (str == lock.Intent && heldMode.Strength == lock.Exclusive)
+ (str == lock.Intent && heldMode.Strength == lock.Exclusive), nil
}

+ // maybeDisallowLockPromotion checks if a lock is being promoted from
+ // lock.Shared to lock.Intent/lock.Exclusive, and returns an error if that's the
+ // case. See: https://github.com/cockroachdb/cockroach/issues/110435.
+ //
+ // REQUIRES: kl.mu to be locked.
+ //
+ // TODO(arul): Once we handle lock promotion correctly, and this function goes
+ // away, a lot of the error paths where this error is bubbled up to can go away.
+ func (kl *keyLocks) maybeDisallowLockPromotion(
+ held lock.Strength, reAcquisitionStr lock.Strength,
+ ) error {
+ if held == lock.Shared && reAcquisitionStr > held {
+ return errors.UnimplementedErrorf(
+ errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/110435"},
+ "lock promotion from %s to %s is not allowed", held, reAcquisitionStr,
+ )
+ }
+ return nil
+ }

// conflictsWithLockHolders returns true if the request, referenced by the
@@ -2606,7 +2648,9 @@ func (kl *keyLocks) maybeEnqueueNonLockingReadRequest(g *lockTableGuardImpl) (co
// this case is returned to the caller.
//
// REQUIRES: kl.mu to be locked.
- func (kl *keyLocks) enqueueLockingRequest(g *lockTableGuardImpl) (maxQueueLengthExceeded bool) {
+ func (kl *keyLocks) enqueueLockingRequest(
+ g *lockTableGuardImpl,
+ ) (maxQueueLengthExceeded bool, _ error) {
assert(g.curStrength() != lock.None, "should only be called with a locking request")
g.mu.Lock()
defer g.mu.Unlock()
@@ -2624,7 +2668,7 @@ func (kl *keyLocks) enqueueLockingRequest(g *lockTableGuardImpl) (maxQueueLength
// it may be a candidate for becoming the distinguished waiter (if one
// doesn't exist already).
kl.maybeMakeDistinguishedWaiter(g)
- return false /* maxQueueLengthExceeded */
+ return false /* maxQueueLengthExceeded */, nil
}
}
panic("lock table bug")
@@ -2638,7 +2682,7 @@ func (kl *keyLocks) enqueueLockingRequest(g *lockTableGuardImpl) (maxQueueLength
// queue and rejecting the tail of the queue above the max length. That
// would be more fair, but more complicated, and we expect that the
// common case is that this waiter will be at the end of the queue.
- return true /* maxQueueLengthExceeded */
+ return true /* maxQueueLengthExceeded */, nil
}
qg := &queuedGuard{
guard: g,
@@ -2648,22 +2692,31 @@ func (kl *keyLocks) enqueueLockingRequest(g *lockTableGuardImpl) (maxQueueLength
// The request isn't in the queue. Add it in the correct position, based on
// its sequence number.
var e *list.Element[*queuedGuard]
- for e = kl.queuedLockingRequests.Back(); e != nil; e = e.Prev() {
+ for e = kl.queuedLockingRequests.Front(); e != nil; e = e.Next() {
qqg := e.Value
- if qqg.guard.seqNum < qg.guard.seqNum {
+ if qqg.guard.seqNum > qg.guard.seqNum {
break
}
+ if qg.guard.txn != nil && qqg.guard.isSameTxn(qg.guard.txnMeta()) {
+ // There's another request, from our transaction, that's waiting to acquire
+ // a lock on this key before us. As per its sequence number, it'll get a
+ // chance to do so before we do -- assuming it'll succeed, check if we've
+ // got ourselves into a lock promotion case that's not allowed.
+ if err := kl.maybeDisallowLockPromotion(qqg.mode.Strength, qg.mode.Strength); err != nil {
+ return false, err
+ }
+ }
}
if e == nil {
- kl.queuedLockingRequests.PushFront(qg)
+ kl.queuedLockingRequests.PushBack(qg)
} else {
- kl.queuedLockingRequests.InsertAfter(qg, e)
+ kl.queuedLockingRequests.InsertBefore(qg, e)
}
// This request may be a candidate to become a distinguished waiter if one
// doesn't exist yet; try making it such.
kl.maybeMakeDistinguishedWaiter(g)
g.maybeAddToLocksMap(kl, g.curStrength())
- return false /* maxQueueLengthExceeded */
+ return false /* maxQueueLengthExceeded */, nil
}

// maybeMakeDistinguishedWaiter designates the supplied request as the
@@ -3613,7 +3666,7 @@ func (t *lockTableImpl) ScanOptimistic(req Request) lockTableGuard {
}

// ScanAndEnqueue implements the lockTable interface.
- func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTableGuard {
+ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) (lockTableGuard, *Error) {
// NOTE: there is no need to synchronize with enabledMu here. ScanAndEnqueue
// scans the lockTable and enters any conflicting lock wait-queues, but a
// disabled lockTable will be empty. If the scan's btree snapshot races with
@@ -3642,17 +3695,20 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
// snapshot but does not scan the lock table when sequencing. Instead, it
// calls into IsKeyLockedByConflictingTxn before adding keys to its result
// set to determine which keys it should skip.
- return g
+ return g, nil
}

- g.resumeScan(true /* notify */)
+ err := g.resumeScan(true /* notify */)
+ if err != nil {
+ return nil, kvpb.NewError(err)
+ }
if g.notRemovableLock != nil {
// Either waiting at the notRemovableLock, or elsewhere. Either way we are
// making forward progress, which ensures liveness.
g.notRemovableLock.decrementNotRemovable()
g.notRemovableLock = nil
}
- return g
+ return g, nil
}

func (t *lockTableImpl) newGuardForReq(req Request) *lockTableGuardImpl {
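
Together, the two call sites of maybeDisallowLockPromotion cover both halves described in the commit message: alreadyHoldsLockAndIsAllowedToProceed rejects a promotion against a lock our transaction already holds, and enqueueLockingRequest rejects one against a weaker request of ours still queued ahead of us. A toy model of that second, queued-requests half, assuming a flat slice in place of the real list.List of queuedGuards (all names and the scenario are illustrative):

```go
package main

import "fmt"

// Strength as in the sketch above; None is omitted since only locking
// strengths queue here.
type Strength int

const (
	Shared Strength = iota + 1
	Exclusive
	Intent
)

func (s Strength) String() string {
	return [...]string{"None", "Shared", "Exclusive", "Intent"}[s]
}

// queuedReq is a stand-in for a queuedGuard: requests wait in seqNum order.
type queuedReq struct {
	txnID  string
	seqNum int
	str    Strength
}

// checkQueuedPromotion mirrors the loop this commit adds to
// enqueueLockingRequest: while walking forward toward our insertion point,
// error out if an earlier request from our own transaction is waiting with
// Shared strength and we are asking for something stronger.
func checkQueuedPromotion(queue []queuedReq, req queuedReq) error {
	for _, q := range queue {
		if q.seqNum > req.seqNum {
			break // reached our insertion point; nothing of ours queued ahead
		}
		if q.txnID == req.txnID && q.str == Shared && req.str > q.str {
			return fmt.Errorf("lock promotion from %s to %s is not allowed", q.str, req.str)
		}
	}
	return nil
}

func main() {
	queue := []queuedReq{{txnID: "txn1", seqNum: 10, str: Shared}}
	// txn1 already has a Shared request queued; its later Exclusive request fails fast.
	fmt.Println(checkQueuedPromotion(queue, queuedReq{txnID: "txn1", seqNum: 20, str: Exclusive}))
	// A different transaction is unaffected by this particular check.
	fmt.Println(checkQueuedPromotion(queue, queuedReq{txnID: "txn2", seqNum: 30, str: Exclusive}))
}
```

Note how the real loop also flips traversal from Back()/Prev() to Front()/Next(), so that finding the insertion point and scanning same-transaction requests queued ahead of us happen in a single forward pass.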