Skip to content

Commit

Permalink
fix waiter leak (#20441)
Browse files Browse the repository at this point in the history
fix waiter leak

Approved by: @zhangxu19830126, @sukki37
  • Loading branch information
iamlinjunhong authored Nov 29, 2024
1 parent a8b4f9c commit 2bb1f98
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (l *localLockTable) doLock(
c *lockContext,
blocked bool) {
var old *waiter
var oldOffset int
var err error
table := l.bind.Table
for {
Expand Down Expand Up @@ -129,6 +130,14 @@ func (l *localLockTable) doLock(
return
}

if oldOffset != c.offset {
if old != nil {
old.disableNotify()
old.close("doLock, lock next row", l.logger)
}
c.txn.clearBlocked(old, l.logger)
}

// we handle remote lock on current rpc io read goroutine, so we can not wait here, otherwise
// the rpc will be blocked.
if c.opts.async {
Expand All @@ -139,6 +148,7 @@ func (l *localLockTable) doLock(
// txn is locked by service.lock or service_remote.lock. We must unlock here. And lock again after
// wait result. Because during wait, deadlock detection may be triggered, and need call txn.fetchWhoWaitingMe,
// or other concurrent txn method.
oldOffset = c.offset
oldTxnID := c.txn.txnID
old = c.w
c.txn.Unlock()
Expand Down
76 changes: 76 additions & 0 deletions pkg/lockservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3468,6 +3468,82 @@ func TestLockResultWithConflictAndTxnAborted(t *testing.T) {
)
}

// TestIssue19913 is a regression test for the waiter leak fixed in #20441.
//
// Scenario:
//  1. txn1 takes an exclusive row lock on row 1 and txn2 takes one on row 2.
//  2. txn3 asks for rows 1 and 2 in a single call, so it first blocks behind
//     txn1 and, once txn1 unlocks, blocks behind txn2.
//  3. The waiter txn3 registered while it was blocked on row 1 is captured, and
//     after every txn has unlocked its reference count must be below 2.
func TestIssue19913(t *testing.T) {
	runLockServiceTests(
		t,
		[]string{"s1"},
		func(alloc *lockTableAllocator, s []*service) {
			// NOTE(review): presumably this switches on a waiter-reuse checker;
			// the variable is not restored when the test ends — confirm that
			// leaking it into later tests is intended.
			err := os.Setenv("mo_reuse_enable_checker", "true")
			require.NoError(t, err)
			l1 := s[0]

			ctx, cancel := context.WithTimeout(
				context.Background(),
				time.Second*10)
			defer cancel()
			option := pb.LockOptions{
				Granularity: pb.Granularity_Row,
				Mode:        pb.LockMode_Exclusive,
				Policy:      pb.WaitPolicy_Wait,
			}

			// txn1 holds row 1.
			_, err = l1.Lock(
				ctx,
				0,
				[][]byte{{1}},
				[]byte("txn1"),
				option)
			require.NoError(t, err)

			// txn2 holds row 2.
			_, err = l1.Lock(
				ctx,
				0,
				[][]byte{{2}},
				[]byte("txn2"),
				option)
			require.NoError(t, err)

			// The result of txn3's lock call is recorded here and asserted on
			// the test goroutine after wg.Wait(): require.* ends in t.FailNow,
			// which must only be called from the goroutine running the test.
			var (
				wg      sync.WaitGroup
				lockErr error
			)
			wg.Add(1)
			go func() {
				defer wg.Done()
				// blocked by txn1
				_, lockErr = l1.Lock(
					ctx,
					0,
					newTestRows(1, 2),
					[]byte("txn3"),
					option)
			}()

			// Wait until txn3 is queued as a waiter on row 1.
			waitWaiters(t, l1, 0, []byte{1}, 1)

			// Keep the waiter txn3 is currently blocked with, so its reference
			// count can be inspected once the whole scenario has finished.
			w := l1.activeTxnHolder.getActiveTxn([]byte("txn3"), false, "").blockedWaiters[0]

			require.NoError(t, l1.Unlock(
				ctx,
				[]byte("txn1"),
				timestamp.Timestamp{}))

			// With row 1 released, txn3 moves on and queues behind txn2 on row 2.
			waitWaiters(t, l1, 0, []byte{2}, 1)

			require.NoError(t, l1.Unlock(
				ctx,
				[]byte("txn2"),
				timestamp.Timestamp{}))
			wg.Wait()
			// The WaitGroup orders the goroutine's write to lockErr before this read.
			require.NoError(t, lockErr)

			require.NoError(t, l1.Unlock(
				ctx,
				[]byte("txn3"),
				timestamp.Timestamp{}))

			// The waiter captured while txn3 was blocked on row 1 must not still
			// hold two or more references after everything has been unlocked.
			require.Less(t, w.refCount.Load(), int32(2))
		},
	)
}

func TestRowLockWithConflictAndUnlock(t *testing.T) {
table := uint64(0)
getRunner(false)(
Expand Down

0 comments on commit 2bb1f98

Please sign in to comment.