diff --git a/consumer/process_queue.go b/consumer/process_queue.go index ded8d085..c07cf7e4 100644 --- a/consumer/process_queue.go +++ b/consumer/process_queue.go @@ -20,13 +20,13 @@ package consumer import ( "strconv" "sync" - "sync/atomic" + "time" "github.com/emirpasic/gods/maps/treemap" "github.com/emirpasic/gods/utils" gods_util "github.com/emirpasic/gods/utils" - uatomic "go.uber.org/atomic" + "go.uber.org/atomic" "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/primitive" @@ -40,8 +40,8 @@ const ( ) type processQueue struct { - cachedMsgCount int64 - cachedMsgSize int64 + cachedMsgCount *atomic.Int64 + cachedMsgSize *atomic.Int64 tryUnlockTimes int64 queueOffsetMax int64 msgAccCnt int64 @@ -49,10 +49,10 @@ type processQueue struct { mutex sync.RWMutex consumeLock sync.Mutex consumingMsgOrderlyTreeMap *treemap.Map - dropped *uatomic.Bool + dropped *atomic.Bool lastPullTime atomic.Value lastConsumeTime atomic.Value - locked *uatomic.Bool + locked *atomic.Bool lastLockTime atomic.Value consuming bool lockConsume sync.Mutex @@ -75,6 +75,8 @@ func newProcessQueue(order bool) *processQueue { lastPullTime.Store(time.Now()) pq := &processQueue{ + cachedMsgCount: atomic.NewInt64(0), + cachedMsgSize: atomic.NewInt64(0), msgCache: treemap.NewWith(utils.Int64Comparator), lastPullTime: lastPullTime, lastConsumeTime: lastConsumeTime, @@ -84,8 +86,8 @@ func newProcessQueue(order bool) *processQueue { order: order, closeChanOnce: &sync.Once{}, closeChan: make(chan struct{}), - locked: uatomic.NewBool(false), - dropped: uatomic.NewBool(false), + locked: atomic.NewBool(false), + dropped: atomic.NewBool(false), } return pq } @@ -120,13 +122,14 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) { pq.msgCache.Put(msg.QueueOffset, msg) validMessageCount++ pq.queueOffsetMax = msg.QueueOffset - atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body))) + + pq.cachedMsgSize.Add(int64(len(msg.Body))) } - pq.mutex.Unlock() - atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount)) + pq.cachedMsgCount.Add(int64(validMessageCount)) + pq.mutex.Unlock() - if pq.msgCache.Size() > 0 && !pq.consuming { + if pq.cachedMsgCount.Load() > 0 && !pq.consuming { pq.consuming = true } @@ -206,11 +209,14 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 { if !found { continue } + pq.msgCache.Remove(msg.QueueOffset) removedCount++ - atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body))) + + pq.cachedMsgSize.Sub(int64(len(msg.Body))) } - atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount)) + + pq.cachedMsgCount.Sub(int64(removedCount)) } if !pq.msgCache.Empty() { first, _ := pq.msgCache.Min() @@ -228,7 +234,7 @@ func (pq *processQueue) isPullExpired() bool { return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime } -func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) { +func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) { if consumer.option.ConsumeOrderly { return } @@ -366,8 +372,8 @@ func (pq *processQueue) clear() { pq.mutex.Lock() defer pq.mutex.Unlock() pq.msgCache.Clear() - pq.cachedMsgCount = 0 - pq.cachedMsgSize = 0 + pq.cachedMsgCount.Store(0) + pq.cachedMsgSize.Store(0) pq.queueOffsetMax = 0 } @@ -380,11 +386,13 @@ func (pq *processQueue) commit() int64 { if iter != nil { offset = iter.(int64) } - pq.cachedMsgCount -= int64(pq.consumingMsgOrderlyTreeMap.Size()) + pq.cachedMsgCount.Sub(int64(pq.consumingMsgOrderlyTreeMap.Size())) + pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) { msg := value.(*primitive.MessageExt) - pq.cachedMsgSize -= int64(len(msg.Body)) + pq.cachedMsgSize.Sub(int64(len(msg.Body))) }) + pq.consumingMsgOrderlyTreeMap.Clear() return offset + 1 } @@ -405,7 +413,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo { info.CachedMsgMinOffset = pq.Min() info.CachedMsgMaxOffset = pq.Max() info.CachedMsgCount = pq.msgCache.Size() - info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024) + info.CachedMsgSizeInMiB = pq.cachedMsgSize.Load() / int64(1024*1024) } if !pq.consumingMsgOrderlyTreeMap.Empty() { diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 111a0a82..f61d33be 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -604,8 +604,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { goto NEXT } - cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb) - if pq.cachedMsgCount > pc.option.PullThresholdForQueue { + cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb) + if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue { if pc.queueFlowControlTimes%1000 == 0 { rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{ "PullThresholdForQueue": pc.option.PullThresholdForQueue, @@ -818,7 +818,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { } func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) { - if pr.pq.cachedMsgCount <= 0 { + if pr.pq.cachedMsgCount.Load() <= 0 { pc.storage.update(pr.mq, pr.nextOffset, true) } }