diff --git a/internal/datanode/channel_manager.go b/internal/datanode/channel_manager.go index 9cdc78cc15c91..7291b65c16f85 100644 --- a/internal/datanode/channel_manager.go +++ b/internal/datanode/channel_manager.go @@ -392,6 +392,12 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState { case <-tickler.progressSig: log.Info("Reset timer for tickler updated", zap.Int32("current progress", tickler.progress())) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } timer.Reset(watchTimeout) case <-successSig: diff --git a/pkg/mq/msgdispatcher/target.go b/pkg/mq/msgdispatcher/target.go index cce4aa5fa2db6..d1ccab6f9ad72 100644 --- a/pkg/mq/msgdispatcher/target.go +++ b/pkg/mq/msgdispatcher/target.go @@ -73,6 +73,13 @@ func (t *target) send(pack *MsgPack) error { if t.closed { return nil } + + if !t.timer.Stop() { + select { + case <-t.timer.C: + default: + } + } t.timer.Reset(t.maxLag) select { case <-t.cancelCh.CloseCh(): diff --git a/pkg/mq/msgdispatcher/target_test.go b/pkg/mq/msgdispatcher/target_test.go new file mode 100644 index 0000000000000..29e970068ea36 --- /dev/null +++ b/pkg/mq/msgdispatcher/target_test.go @@ -0,0 +1,30 @@ +package msgdispatcher + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestSendTimeout(t *testing.T) { + target := newTarget("test1", &msgpb.MsgPosition{}) + + time.Sleep(paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)) + + counter := 0 + for i := 0; i < 10; i++ { + err := target.send(&msgstream.MsgPack{}) + if err != nil { + log.Error("send failed", zap.Int("idx", i), zap.Error(err)) + counter++ + } + } + assert.Equal(t, counter, 0) +}