fix hung due to range lock to 2.0 (#20452)
fix hung due to range lock

Approved by: @iamlinjunhong, @sukki37
zhangxu19830126 authored Nov 29, 2024
1 parent e15d510 commit a41cc75
Showing 4 changed files with 252 additions and 19 deletions.
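
Reading the diff: when a range-lock request conflicts, it parks a waiter on the conflicting key and, once woken, retries the whole range. If the retry then conflicts on a different key, the waiter left on the previous key was never removed; with no holders left there, that stale waiter kept the old lock entry alive and later transactions locking the same key could queue behind it forever (the hang reproduced by the new test). The fix records the key a range request last waited on (rangeLastWaitKey) and closes that stale waiter before blocking on the new key; beforeWait/afterWait are test-only hooks added to reproduce the interleaving deterministically.

The core idea, as a minimal self-contained Go sketch — the types and names below are hypothetical simplifications for illustration, not the real lockservice API:

package main

import "fmt"

// entry is a stand-in for a per-key lock: holder count plus queued waiters.
// All names here are simplifications, not lockservice types.
type entry struct {
    holders int
    waiters map[string]struct{}
}

type lockStore map[string]*entry

// waitOn queues txn as a waiter on key. Before parking on a new key, a
// retrying range request first drops the waiter it left on the key it
// waited on previously; if that entry has no holders and no waiters left,
// it is deleted so later lockers of that key are not stuck behind it.
func (s lockStore) waitOn(txn, key string, lastWaitKey *string) {
    if *lastWaitKey != "" && *lastWaitKey != key {
        if e, ok := s[*lastWaitKey]; ok {
            delete(e.waiters, txn)
            if e.holders == 0 && len(e.waiters) == 0 {
                delete(s, *lastWaitKey)
            }
        }
    }
    e, ok := s[key]
    if !ok {
        e = &entry{waiters: map[string]struct{}{}}
        s[key] = e
    }
    e.waiters[txn] = struct{}{}
    *lastWaitKey = key
}

func main() {
    s := lockStore{}
    last := ""
    s.waitOn("txn3", "k4", &last) // first conflict: txn3 waits on k4
    s.waitOn("txn3", "k2", &last) // retry conflicts on k2 instead of k4
    _, stale := s["k4"]
    fmt.Println("stale entry left on k4:", stale) // false: k4 is free for a later txn
}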
4 changes: 0 additions & 4 deletions pkg/lockservice/lock.go
@@ -156,10 +156,6 @@ func (l Lock) release() {

func (l Lock) closeWaiter(w *waiter, logger *log.MOLogger) bool {
canRemove := func() bool {
if !l.isLockRow() {
panic("BUG: range lock cannot call closeWaiter")
}

if l.holders.size() > 0 {
return false
}
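
The four removed lines are the guard that made closeWaiter panic when invoked on a non-row (range) lock. With the new cleanup path in handleLockConflictLocked below, closeWaiter can now legitimately be reached for a range lock as well — the previous wait key of a retrying range request may belong to another transaction's range — so the guard is dropped.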
30 changes: 29 additions & 1 deletion pkg/lockservice/lock_table_local.go
Expand Up @@ -53,6 +53,8 @@ type localLockTable struct {

options struct {
beforeCloseFirstWaiter func(c *lockContext)
beforeWait func(c *lockContext) func()
afterWait func(c *lockContext) func()
}
}

@@ -152,7 +154,17 @@ func (l *localLockTable) doLock(
oldTxnID := c.txn.txnID
old = c.w
c.txn.Unlock()

if l.options.beforeWait != nil {
l.options.beforeWait(c)()
}

v := c.w.wait(c.ctx, l.logger)

if l.options.afterWait != nil {
l.options.afterWait(c)()
}

c.txn.Lock()

logLocalLockWaitOnResult(l.logger, c.txn, table, c.rows[c.idx], c.opts, c.w, v)
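
Note the call shape, l.options.beforeWait(c)(): the hook receives the lock context and returns a closure, and that closure is invoked immediately before c.w.wait (afterWait works the same way right after it). The new test below relies on this to signal or block specific transactions at the exact moment they are about to wait. A minimal sketch of the same shape, using hypothetical stand-in types rather than the real ones:

package main

import "fmt"

// ctx stands in for *lockContext; only the transaction id matters here.
type ctx struct{ txnID string }

func main() {
    // The hook returns a per-call closure; the caller invokes it at once,
    // mirroring l.options.beforeWait(c)() in doLock.
    var beforeWait func(c *ctx) func()
    beforeWait = func(c *ctx) func() {
        if c.txnID == "txn3" {
            return func() { fmt.Println("txn3 is about to block") }
        }
        return func() {} // other transactions: no-op
    }

    for _, id := range []string{"txn1", "txn3"} {
        c := &ctx{txnID: id}
        if beforeWait != nil { // unset hooks are skipped, as in doLock
            beforeWait(c)()
        }
    }
}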
@@ -484,7 +496,8 @@ func (l *localLockTable) addRowLockLocked(
func (l *localLockTable) handleLockConflictLocked(
c *lockContext,
key []byte,
conflictWith Lock) error {
conflictWith Lock,
) error {
if c.opts.Policy == pb.WaitPolicy_FastFail {
return ErrLockConflict
}
@@ -512,6 +525,21 @@ func (l *localLockTable) handleLockConflictLocked(
// waiter added, we need to active deadlock check.
c.txn.setBlocked(c.w, l.logger)
logLocalLockWaitOn(l.logger, c.txn, l.bind.Table, c.w, key, conflictWith)

if c.opts.Granularity != pb.Granularity_Range {
return nil
}

if len(c.rangeLastWaitKey) > 0 {
v, ok := l.mu.store.Get(c.rangeLastWaitKey)
if !ok {
panic("BUG: missing range last wait key")
}
if ok && v.closeWaiter(c.w, l.logger) {
l.mu.store.Delete(c.rangeLastWaitKey)
}
}
c.rangeLastWaitKey = key
return nil
}
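
Only range requests take this new branch: on every conflict the key being waited on is remembered in c.rangeLastWaitKey, and on the next conflict the waiter parked on the previous key is closed first, with the lock deleted from the store when closeWaiter reports it can be removed. Row-granularity requests return early and keep their previous behavior.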

208 changes: 208 additions & 0 deletions pkg/lockservice/lock_table_local_test.go
@@ -937,6 +937,214 @@ func TestLocalNeedUpgrade(t *testing.T) {
)
}

func TestCannotHungIfRangeConflictWithRowMultiTimes(t *testing.T) {
runLockServiceTests(
t,
[]string{"s1"},
func(_ *lockTableAllocator, s []*service) {
l := s[0]
ctx, cancel := context.WithTimeout(
context.Background(),
time.Second*10,
)
defer cancel()

tableID := uint64(10)
add := func(
txn []byte,
rows [][]byte,
g pb.Granularity,
) {
mustAddTestLock(
t,
ctx,
l,
tableID,
txn,
rows,
g,
)
}

// workflow
//
// txn1 lock k4
// k4: holder(txn1) waiters()
//
// txn3 lock [k1, k4], wait at k4
// k4: holder(txn1) waiters(txn3)
//
// txn1 unlock, notify txn3 ------------------|
// k4: holder() waiters(txn3) |
// |
// txn2 lock range k2, k3 |
// k2: holder(txn2) waiters() |
// k3: holder(txn2) waiters() |
// k4: holder() waiters(txn3) |
// |
// txn3 lock [k1, k4] retry, wait at k2 <-----|
// k2: holder(txn2) waiters(txn3)
// k3: holder(txn2) waiters()
// k4: holder() waiters(txn3)
//
// txn4 lock k2, wait at k2 --------------------------------|
// k2: holder(txn2) waiters(txn3, txn4) |
// k3: holder(txn2) waiters() |
// k4: holder() waiters(txn3) |
// |
// |
// txn2 unlock, notify txn3, txn4 |
// k2: holder(txn2) waiters(txn3, txn4) -> deleted |
// k3: holder(txn2) waiters() -> deleted |
// k4: holder() waiters(txn3) |
// |
// txn4 lock k2 retry <-------------------------------------|
// k2: holder(txn4) waiters(txn3)
// k4: holder() waiters(txn3)
//
// txn3 lock [k1, k4] retry, wait at k2
// k2: holder(txn4) waiters(txn3)
// k4: holder() waiters(txn3)
//
// txn4 lock k4, wait txn3
// k2: holder(txn4) waiters()
// k4: holder() waiters(txn3, txn4)

// txn1 holds k4 first (see workflow above)
txn1 := []byte{1}
txn2 := []byte{2}
txn3 := []byte{3}
txn4 := []byte{4}

key2 := newTestRows(2)
key4 := newTestRows(4)
range23 := newTestRows(2, 3)
range14 := newTestRows(1, 4)

txn2Locked := make(chan struct{})
txn4WaitAt2 := make(chan struct{})
txn4GetLockAt1 := make(chan struct{})
startTxn3 := make(chan struct{})
txn3WaitAt2 := make(chan struct{})
txn3WaitAt2Again := make(chan struct{})
txn3WaitAt4 := make(chan struct{})
txn3NotifiedAt4 := make(chan struct{})
var once sync.Once

// txn1 lock k4
add(txn1, key4, pb.Granularity_Row)
close(startTxn3)

v, err := l.getLockTable(0, tableID)
require.NoError(t, err)
lt := v.(*localLockTable)
txn3WaitTimes := 0
lt.options.beforeWait = func(c *lockContext) func() {
if bytes.Equal(c.txn.txnID, txn3) {
return func() {
if txn3WaitTimes == 0 {
// txn3 wait at key4
close(txn3WaitAt4)
txn3WaitTimes++
return
}

if txn3WaitTimes == 1 {
close(txn3WaitAt2)
txn3WaitTimes++
return
}

if txn3WaitTimes == 2 {
// txn3 retries and waits at key2 again (txn4 now holds k2)
close(txn3WaitAt2Again)
txn3WaitTimes++
return
}
}
}

if bytes.Equal(c.txn.txnID, txn4) {
return func() {
once.Do(func() {
close(txn4WaitAt2)
})
}
}

return func() {}
}

txn3NotifiedTimes := 0
lt.options.afterWait = func(c *lockContext) func() {
if bytes.Equal(c.txn.txnID, txn3) {
return func() {
if txn3NotifiedTimes == 0 {
// txn1 closed and txn3 get notified
close(txn3NotifiedAt4)
txn3NotifiedTimes++
<-txn2Locked
return
}

if txn3NotifiedTimes == 1 {
<-txn4GetLockAt1
}
}
}
return func() {}
}

var wg sync.WaitGroup
wg.Add(5)

go func() {
defer wg.Done()
<-startTxn3
// txn3 lock range [k1, k4]
add(txn3, range14, pb.Granularity_Range)
}()

go func() {
defer wg.Done()
<-txn3WaitAt4
// txn1 unlock
require.NoError(t, l.Unlock(ctx, txn1, timestamp.Timestamp{}))
}()

go func() {
defer wg.Done()
<-txn3NotifiedAt4
// txn2 lock range [k2, k3]
add(txn2, range23, pb.Granularity_Range)
close(txn2Locked)
}()

go func() {
defer wg.Done()
<-txn3WaitAt2
// txn4 lock k2
add(txn4, key2, pb.Granularity_Row)
close(txn4GetLockAt1)
<-txn3WaitAt2Again

// txn4 lock k4
add(txn4, key4, pb.Granularity_Row)

require.NoError(t, l.Unlock(ctx, txn4, timestamp.Timestamp{}))
}()

go func() {
defer wg.Done()
<-txn4WaitAt2
require.NoError(t, l.Unlock(ctx, txn2, timestamp.Timestamp{}))
}()

wg.Wait()
},
)
}
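
The test drives the workflow sketched in the comment block above through the beforeWait/afterWait hooks, so every retry and wake-up happens in a fixed order; without the rangeLastWaitKey cleanup it is expected to hang and hit the 10-second context timeout. It should be runnable on its own with standard tooling, e.g. go test -run TestCannotHungIfRangeConflictWithRowMultiTimes ./pkg/lockservice/ (adding -race and -timeout is a reasonable precaution for a concurrency test like this).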

type target struct {
Start string `json:"start"`
End string `json:"end"`
29 changes: 15 additions & 14 deletions pkg/lockservice/waiter_events.go
@@ -39,20 +39,21 @@ func init() {
}

type lockContext struct {
ctx context.Context
txn *activeTxn
waitTxn pb.WaitTxn
rows [][]byte
opts LockOptions
offset int
idx int
lockedTS timestamp.Timestamp
result pb.Result
cb func(pb.Result, error)
lockFunc func(*lockContext, bool)
w *waiter
createAt time.Time
closed bool
ctx context.Context
txn *activeTxn
waitTxn pb.WaitTxn
rows [][]byte
opts LockOptions
offset int
idx int
lockedTS timestamp.Timestamp
result pb.Result
cb func(pb.Result, error)
lockFunc func(*lockContext, bool)
w *waiter
createAt time.Time
closed bool
rangeLastWaitKey []byte
}
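
The only state added to lockContext is rangeLastWaitKey: for a range request it records the key whose lock currently holds this request's waiter, so handleLockConflictLocked can close that waiter before parking on the next conflicting key. The rest of this hunk is the same fields re-aligned, which is why the struct appears twice in the rendered diff.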

func (l *localLockTable) newLockContext(
