45601: storage/concurrency: release reqs from same txn as discovered lock r=nvanbenschoten a=nvanbenschoten

This commit updates the AddDiscoveredLock state transition in lockTableImpl to release all waiting writers that are part of the same transaction as a discovered lock.

Failing to do so caused issues in cockroachdb#45482, where we saw txn deadlocks in the `pkg/sql/tests.Bank` benchmark. That deadlock was triggered because we were forgetting to inform the lockTable of a lock acquisition in a subtle case. That omission is now fixed, and given the current policy on how wait-queues form in the lockTable, it's not clear the bug here can still be hit. Regardless, this case is worth handling correctly in case things ever change.
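
For illustration, here is a minimal, self-contained sketch of the release-on-discovery idea. The `txnID` and `queuedWriter` types and the bare `container/list` queue are simplified stand-ins for the real `enginepb.TxnMeta`, `queuedGuard`, and lock wait-queue; the real code in the diff below additionally handles active waiters and the distinguished waiter.

```go
package main

import (
	"container/list"
	"fmt"
)

// txnID and queuedWriter are simplified stand-ins for the real
// enginepb.TxnMeta and queuedGuard types.
type txnID string

type queuedWriter struct {
	txn    txnID
	seqNum int
}

// releaseWritersFromTxn removes every queued writer that belongs to the
// (newly discovered) lock holder's transaction. It mirrors the loop shape of
// lockState.releaseWritersFromTxn in the diff: advance the iterator before
// removing the current element so removal doesn't invalidate the iteration.
func releaseWritersFromTxn(q *list.List, holder txnID) {
	for e := q.Front(); e != nil; {
		curr := e
		e = e.Next()
		if curr.Value.(*queuedWriter).txn == holder {
			q.Remove(curr)
		}
	}
}

func main() {
	q := list.New()
	q.PushBack(&queuedWriter{txn: "A", seqNum: 1})
	q.PushBack(&queuedWriter{txn: "B", seqNum: 2})
	q.PushBack(&queuedWriter{txn: "A", seqNum: 3})

	// The lock is discovered to be held by txn A: A's own writers must not
	// keep waiting on it, or the txn would deadlock on itself.
	releaseWritersFromTxn(q, "A")

	for e := q.Front(); e != nil; e = e.Next() {
		qw := e.Value.(*queuedWriter)
		fmt.Printf("still waiting: req %d (txn %s)\n", qw.seqNum, qw.txn)
	}
	// Output:
	// still waiting: req 2 (txn B)
}
```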

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Mar 4, 2020
2 parents b9aa44e + 7381a72 commit cdc5681
Showing 3 changed files with 229 additions and 90 deletions.
181 changes: 92 additions & 89 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -676,12 +676,70 @@ type lockWaitQueue struct {
func (l *lockState) ID() uint64 { return l.id }
func (l *lockState) Key() []byte { return l.key }
func (l *lockState) EndKey() []byte { return l.endKey }
func (l *lockState) String() string { return string(l.key) }
func (l *lockState) New() *lockState { return new(lockState) }
func (l *lockState) SetID(v uint64) { l.id = v }
func (l *lockState) SetKey(v []byte) { l.key = v }
func (l *lockState) SetEndKey(v []byte) { l.endKey = v }

// REQUIRES: l.mu is locked.
func (l *lockState) String() string {
var buf strings.Builder
l.Format(&buf)
return buf.String()
}

// REQUIRES: l.mu is locked.
func (l *lockState) Format(buf *strings.Builder) {
fmt.Fprintf(buf, " lock: %s\n", l.key)
if l.isEmptyLock() {
fmt.Fprintln(buf, " empty")
return
}
waitingOnStr := func(txn *enginepb.TxnMeta, ts hlc.Timestamp) string {
// TODO(sbhola): strip the leading 0 bytes from the UUID string, since tests
// assign UUIDs using a counter; stripping would make this output more readable.
var seqStr string
if txn.Sequence != 0 {
seqStr = fmt.Sprintf(", seq: %v", txn.Sequence)
}
return fmt.Sprintf("txn: %v, ts: %v%s", txn.ID, ts, seqStr)
}
txn, ts, _ := l.getLockerInfo()
if txn == nil {
fmt.Fprintf(buf, " res: req: %d, %s\n",
l.reservation.seqNum, waitingOnStr(l.reservation.txn, l.reservation.writeTS))
} else {
fmt.Fprintf(buf, " holder: %s\n", waitingOnStr(txn, ts))
}
if l.waitingReaders.Len() > 0 {
fmt.Fprintln(buf, " waiting readers:")
for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
g := e.Value.(*lockTableGuardImpl)
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(buf, " req: %d, txn: %s\n", g.seqNum, txnStr)
}
}
if l.queuedWriters.Len() > 0 {
fmt.Fprintln(buf, " queued writers:")
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
g := qg.guard
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(buf, " active: %t req: %d, txn: %s\n",
qg.active, qg.guard.seqNum, txnStr)
}
}
if l.distinguishedWaiter != nil {
fmt.Fprintf(buf, " distinguished req: %d\n", l.distinguishedWaiter.seqNum)
}
}

// Called for a write request when there is a reservation. Returns true iff it
// succeeds.
// REQUIRES: l.mu is locked.
@@ -772,6 +830,31 @@ func (l *lockState) informActiveWaiters() {
}
}

// releaseWritersFromTxn removes all waiting writers for the lockState that are
// part of the specified transaction.
// REQUIRES: l.mu is locked.
func (l *lockState) releaseWritersFromTxn(txn *enginepb.TxnMeta) {
for e := l.queuedWriters.Front(); e != nil; {
qg := e.Value.(*queuedGuard)
curr := e
e = e.Next()
g := qg.guard
if g.isTxn(txn) {
if qg.active {
if g == l.distinguishedWaiter {
l.distinguishedWaiter = nil
}
g.doneWaitingAtLock(false, l)
} else {
g.mu.Lock()
delete(g.mu.locks, l)
g.mu.Unlock()
}
l.queuedWriters.Remove(curr)
}
}
}

// When the active waiters have shrunk and the distinguished waiter has gone,
// try to make a new distinguished waiter if there is at least 1 active
// waiter.
@@ -1054,11 +1137,9 @@ func (l *lockState) acquireLock(
// possibility that some other request has broken this reservation because
// of a concurrent release but that is harmless since this request is
// holding latches and has proceeded to evaluation.
brokeReservation := false
if l.reservation != nil {
if l.reservation.txn.ID != txn.ID {
// Reservation is broken.
brokeReservation = true
qg := &queuedGuard{
guard: l.reservation,
active: false,
@@ -1090,29 +1171,7 @@
l.holder.holder[durability].seqs = append([]enginepb.TxnSeq(nil), txn.Sequence)

// If there are waiting requests from the same txn, they no longer need to wait.
for e := l.queuedWriters.Front(); e != nil; {
qg := e.Value.(*queuedGuard)
curr := e
e = e.Next()
g := qg.guard
if g.isTxn(txn) {
if qg.active {
if g == l.distinguishedWaiter {
if brokeReservation {
l.distinguishedWaiter = nil
} else {
panic("lockTable bug")
}
}
g.doneWaitingAtLock(false, l)
} else {
g.mu.Lock()
delete(g.mu.locks, l)
g.mu.Unlock()
}
l.queuedWriters.Remove(curr)
}
}
l.releaseWritersFromTxn(txn)

// Inform active waiters since lock has transitioned to held.
l.informActiveWaiters()
Expand All @@ -1128,7 +1187,6 @@ func (l *lockState) discoveredLock(
l.mu.Lock()
defer l.mu.Unlock()

informWaiters := true
if l.holder.locked {
if !l.isLockedBy(txn.ID) {
panic("bug in caller or lockTable")
@@ -1137,7 +1195,6 @@
l.holder.holder[lock.Replicated].txn = txn
l.holder.holder[lock.Replicated].ts = ts
}
informWaiters = false
} else {
l.holder.locked = true
l.holder.holder[lock.Replicated].txn = txn
@@ -1156,11 +1213,6 @@
}
l.queuedWriters.PushFront(qg)
l.reservation = nil
} else {
// No reservation, so either the lock was already known to be held, in
// which case the active waiters know about the holder, or it was not held
// and so there are no active waiters.
informWaiters = false
}

switch sa {
@@ -1208,10 +1260,11 @@
}
}

// If there are waiting requests from the same txn, they no longer need to wait.
l.releaseWritersFromTxn(txn)

// Active waiters need to be told about who they are waiting for.
if informWaiters {
l.informActiveWaiters()
}
l.informActiveWaiters()
return nil
}

@@ -1825,57 +1878,6 @@ func (t *lockTableImpl) Clear() {
// For tests.
func (t *lockTableImpl) String() string {
var buf strings.Builder
waitingOnStr := func(txn *enginepb.TxnMeta, ts hlc.Timestamp) string {
// TODO(sbhola): strip the leading 0 bytes from the UUID string since tests are assigning
// UUIDs using a counter and makes this output more readable.
var seqStr string
if txn.Sequence != 0 {
seqStr = fmt.Sprintf(", seq: %v", txn.Sequence)
}
return fmt.Sprintf("txn: %v, ts: %v%s", txn.ID, ts, seqStr)
}
lockStateStrings := func(l *lockState) {
l.mu.Lock()
defer l.mu.Unlock()
if l.isEmptyLock() {
fmt.Fprintln(&buf, "empty")
return
}
txn, ts, _ := l.getLockerInfo()
if txn == nil {
fmt.Fprintf(&buf, " res: req: %d, %s\n",
l.reservation.seqNum, waitingOnStr(l.reservation.txn, l.reservation.writeTS))
} else {
fmt.Fprintf(&buf, " holder: %s\n", waitingOnStr(txn, ts))
}
if l.waitingReaders.Len() > 0 {
fmt.Fprintln(&buf, " waiting readers:")
for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
g := e.Value.(*lockTableGuardImpl)
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(&buf, " req: %d, txn: %s\n", g.seqNum, txnStr)
}
}
if l.queuedWriters.Len() > 0 {
fmt.Fprintln(&buf, " queued writers:")
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
g := qg.guard
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(&buf, " active: %t req: %d, txn: %s\n",
qg.active, qg.guard.seqNum, txnStr)
}
}
if l.distinguishedWaiter != nil {
fmt.Fprintf(&buf, " distinguished req: %d\n", l.distinguishedWaiter.seqNum)
}
}
for i := 0; i < len(t.locks); i++ {
tree := &t.locks[i]
scope := spanset.SpanScope(i).String()
@@ -1884,8 +1886,9 @@ func (t *lockTableImpl) String() string {
iter := tree.MakeIter()
for iter.First(); iter.Valid(); iter.Next() {
l := iter.Cur()
fmt.Fprintf(&buf, " lock: %s\n", l.key)
lockStateStrings(l)
l.mu.Lock()
l.Format(&buf)
l.mu.Unlock()
}
tree.mu.RUnlock()
}
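The formatting change in this file is a pure refactor: the per-lock pretty-printing that used to live inline in lockTableImpl.String moves into a reusable lockState.Format that writes into a caller-supplied strings.Builder, with lockState.String as a thin wrapper. A minimal sketch of that pattern, using illustrative stand-in types (`item`, `table`) rather than the real ones:

```go
package main

import (
	"fmt"
	"strings"
)

type item struct{ key string }

// Format writes the item's description into buf. Callers that already hold a
// strings.Builder can compose many items without intermediate string
// allocations.
func (it *item) Format(buf *strings.Builder) {
	fmt.Fprintf(buf, " lock: %s\n", it.key)
}

// String covers the single-item case by delegating to Format.
func (it *item) String() string {
	var buf strings.Builder
	it.Format(&buf)
	return buf.String()
}

type table struct{ items []*item }

// String reuses item.Format for each element instead of duplicating the
// formatting logic, as lockTableImpl.String now does with lockState.Format.
func (t *table) String() string {
	var buf strings.Builder
	for _, it := range t.items {
		it.Format(&buf)
	}
	return buf.String()
}

func main() {
	t := &table{items: []*item{{key: "a"}, {key: "b"}}}
	fmt.Print(t.String())
	// Output:
	//  lock: a
	//  lock: b
}
```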
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -42,7 +42,7 @@ import (
Test needs to handle caller constraints wrt latches being held. The datadriven
test uses the following format:
new-locktable maxlocks=<int>
new-lock-table maxlocks=<int>
----
Creates a lockTable.
