From a9beca44ef922572a116fab99cce3a61bc62a622 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 8 Nov 2024 18:46:27 +0800 Subject: [PATCH] fix: watch channel stuck due to misuse of timer.Reset (#37433) (#37542) issue: #37166 pr: #37433 cause the misuse of timer.Reset, which cause dispatcher failed to send msg to virtual channel buffer, and dispatcher do splitting again and again, which hold the dispatcher manager's lock, block watching channel progress. This PR fix the misuse of timer.Reset Signed-off-by: Wei Liu --- internal/datanode/channel_manager.go | 6 ++++++ pkg/mq/msgdispatcher/target.go | 7 +++++++ pkg/mq/msgdispatcher/target_test.go | 30 ++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+) create mode 100644 pkg/mq/msgdispatcher/target_test.go diff --git a/internal/datanode/channel_manager.go b/internal/datanode/channel_manager.go index 9cdc78cc15c91..7291b65c16f85 100644 --- a/internal/datanode/channel_manager.go +++ b/internal/datanode/channel_manager.go @@ -392,6 +392,12 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState { case <-tickler.progressSig: log.Info("Reset timer for tickler updated", zap.Int32("current progress", tickler.progress())) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } timer.Reset(watchTimeout) case <-successSig: diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index cce4aa5fa2db6..d1ccab6f9ad72 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -73,6 +73,13 @@ func (t *target) send(pack *MsgPack) error { if t.closed { return nil } + + if !t.timer.Stop() { + select { + case <-t.timer.C: + default: + } + } t.timer.Reset(t.maxLag) select { case <-t.cancelCh.CloseCh(): diff --git a/pkg/mq/msgdispatcher/target_test.go b/pkg/mq/msgdispatcher/target_test.go new file mode 100644 index 0000000000000..29e970068ea36 --- /dev/null +++ b/pkg/mq/msgdispatcher/target_test.go @@ -0,0 +1,30 @@ +package msgdispatcher + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestSendTimeout(t *testing.T) { + target := newTarget("test1", &msgpb.MsgPosition{}) + + time.Sleep(paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)) + + counter := 0 + for i := 0; i < 10; i++ { + err := target.send(&msgstream.MsgPack{}) + if err != nil { + log.Error("send failed", zap.Int("idx", i), zap.Error(err)) + counter++ + } + } + assert.Equal(t, counter, 0) +}