Skip to content

Commit

Permalink
fix: watch channel stuck due to misuse of timer.Reset (#37433) (#37542)
Browse files Browse the repository at this point in the history
issue: #37166
pr: #37433
cause the misuse of timer.Reset, which cause dispatcher failed to send
msg to virtual channel buffer, and dispatcher do splitting again and
again, which hold the dispatcher manager's lock, block watching channel
progress.

This PR fix the misuse of timer.Reset

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Nov 8, 2024
1 parent bff0113 commit a9beca4
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/datanode/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {

case <-tickler.progressSig:
log.Info("Reset timer for tickler updated", zap.Int32("current progress", tickler.progress()))
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(watchTimeout)

case <-successSig:
Expand Down
7 changes: 7 additions & 0 deletions pkg/mq/msgdispatcher/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func (t *target) send(pack *MsgPack) error {
if t.closed {
return nil
}

if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
}
}
t.timer.Reset(t.maxLag)
select {
case <-t.cancelCh.CloseCh():
Expand Down
30 changes: 30 additions & 0 deletions pkg/mq/msgdispatcher/target_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package msgdispatcher

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestSendTimeout(t *testing.T) {
target := newTarget("test1", &msgpb.MsgPosition{})

time.Sleep(paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second))

counter := 0
for i := 0; i < 10; i++ {
err := target.send(&msgstream.MsgPack{})
if err != nil {
log.Error("send failed", zap.Int("idx", i), zap.Error(err))
counter++
}
}
assert.Equal(t, counter, 0)
}

0 comments on commit a9beca4

Please sign in to comment.