From 311827525ed521f2195b317824438b1662aa36ed Mon Sep 17 00:00:00 2001
From: WJL3333
Date: Sat, 23 Jul 2022 20:14:40 +0800
Subject: [PATCH 1/2] use uber atomic lib to avoid atomic value data race.

---
 consumer/process_queue.go | 48 +++++++++++++++++++++++----------------
 consumer/push_consumer.go |  6 ++---
 2 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 497b6955..b98dd287 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -20,13 +20,13 @@ package consumer
 import (
 	"strconv"
 	"sync"
-	"sync/atomic"
+
 	"time"
 
 	"github.com/emirpasic/gods/maps/treemap"
 	"github.com/emirpasic/gods/utils"
 	gods_util "github.com/emirpasic/gods/utils"
-	uatomic "go.uber.org/atomic"
+	"go.uber.org/atomic"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -40,8 +40,8 @@ const (
 )
 
 type processQueue struct {
-	cachedMsgCount             int64
-	cachedMsgSize              int64
+	cachedMsgCount             *atomic.Int64
+	cachedMsgSize              *atomic.Int64
 	tryUnlockTimes             int64
 	queueOffsetMax             int64
 	msgAccCnt                  int64
@@ -49,10 +49,10 @@ type processQueue struct {
 	mutex                      sync.RWMutex
 	consumeLock                sync.Mutex
 	consumingMsgOrderlyTreeMap *treemap.Map
-	dropped                    *uatomic.Bool
+	dropped                    *atomic.Bool
 	lastPullTime               atomic.Value
 	lastConsumeTime            atomic.Value
-	locked                     *uatomic.Bool
+	locked                     *atomic.Bool
 	lastLockTime               atomic.Value
 	consuming                  bool
 	lockConsume                sync.Mutex
@@ -73,6 +73,8 @@ func newProcessQueue(order bool) *processQueue {
 	lastPullTime.Store(time.Now())
 
 	pq := &processQueue{
+		cachedMsgCount:             atomic.NewInt64(0),
+		cachedMsgSize:              atomic.NewInt64(0),
 		msgCache:                   treemap.NewWith(utils.Int64Comparator),
 		lastPullTime:               lastPullTime,
 		lastConsumeTime:            lastConsumeTime,
@@ -80,8 +82,8 @@ func newProcessQueue(order bool) *processQueue {
 		msgCh:                      make(chan []*primitive.MessageExt, 32),
 		consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
 		order:                      order,
-		locked:                     uatomic.NewBool(false),
-		dropped:                    uatomic.NewBool(false),
+		locked:                     atomic.NewBool(false),
+		dropped:                    atomic.NewBool(false),
 	}
 	return pq
 }
@@ -112,13 +114,14 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 		pq.msgCache.Put(msg.QueueOffset, msg)
 		validMessageCount++
 		pq.queueOffsetMax = msg.QueueOffset
-		atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))
+
+		pq.cachedMsgSize.Add(int64(len(msg.Body)))
 	}
-	pq.mutex.Unlock()
 
-	atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
+	pq.cachedMsgCount.Add(int64(validMessageCount))
+	pq.mutex.Unlock()
 
-	if pq.msgCache.Size() > 0 && !pq.consuming {
+	if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
 		pq.consuming = true
 	}
 
@@ -195,11 +198,14 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
 			if !found {
 				continue
 			}
+
 			pq.msgCache.Remove(msg.QueueOffset)
 			removedCount++
-			atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))
+
+			pq.cachedMsgSize.Sub(int64(-len(msg.Body)))
 		}
-		atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))
+
+		pq.cachedMsgCount.Sub(int64(-removedCount))
 	}
 	if !pq.msgCache.Empty() {
 		first, _ := pq.msgCache.Min()
@@ -217,7 +223,7 @@ func (pq *processQueue) isPullExpired() bool {
 	return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
 }
 
-func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
+func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
 	if consumer.option.ConsumeOrderly {
 		return
 	}
@@ -350,8 +356,8 @@ func (pq *processQueue) clear() {
 	pq.mutex.Lock()
 	defer pq.mutex.Unlock()
 	pq.msgCache.Clear()
-	pq.cachedMsgCount = 0
-	pq.cachedMsgSize = 0
+	pq.cachedMsgCount.Store(0)
+	pq.cachedMsgSize.Store(0)
 	pq.queueOffsetMax = 0
 }
 
@@ -364,11 +370,13 @@ func (pq *processQueue) commit() int64 {
 	if iter != nil {
 		offset = iter.(int64)
 	}
-	pq.cachedMsgCount -= int64(pq.consumingMsgOrderlyTreeMap.Size())
+	pq.cachedMsgCount.Sub(-1 * int64(pq.consumingMsgOrderlyTreeMap.Size()))
+
 	pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) {
 		msg := value.(*primitive.MessageExt)
-		pq.cachedMsgSize -= int64(len(msg.Body))
+		pq.cachedMsgSize.Sub(-1 * int64(len(msg.Body)))
 	})
+
 	pq.consumingMsgOrderlyTreeMap.Clear()
 	return offset + 1
 }
@@ -389,7 +397,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
 		info.CachedMsgMinOffset = pq.Min()
 		info.CachedMsgMaxOffset = pq.Max()
 		info.CachedMsgCount = pq.msgCache.Size()
-		info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
+		info.CachedMsgSizeInMiB = pq.cachedMsgSize.Load() / int64(1024*1024)
 	}
 
 	if !pq.consumingMsgOrderlyTreeMap.Empty() {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3221c6c9..a9587844 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -604,8 +604,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			goto NEXT
 		}
 
-		cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
-		if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
+		cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
+		if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
 			if pc.queueFlowControlTimes%1000 == 0 {
 				rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
 					"PullThresholdForQueue": pc.option.PullThresholdForQueue,
@@ -818,7 +818,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 }
 
 func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
-	if pr.pq.cachedMsgCount <= 0 {
+	if pr.pq.cachedMsgCount.Load() <= 0 {
 		pc.storage.update(pr.mq, pr.nextOffset, true)
 	}
 }

From addb2e0f97ccc315f38d63a4782d0a28ee042e69 Mon Sep 17 00:00:00 2001
From: WJL3333
Date: Sun, 24 Jul 2022 08:29:03 +0800
Subject: [PATCH 2/2] change wrong sub value

---
 consumer/process_queue.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index b98dd287..4482a640 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -202,10 +202,10 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
 
 			pq.msgCache.Remove(msg.QueueOffset)
 			removedCount++
-			pq.cachedMsgSize.Sub(int64(-len(msg.Body)))
+			pq.cachedMsgSize.Sub(int64(len(msg.Body)))
 		}
-		pq.cachedMsgCount.Sub(int64(-removedCount))
+		pq.cachedMsgCount.Sub(int64(removedCount))
 	}
 	if !pq.msgCache.Empty() {
 		first, _ := pq.msgCache.Min()
@@ -370,11 +370,11 @@ func (pq *processQueue) commit() int64 {
 	if iter != nil {
 		offset = iter.(int64)
 	}
-	pq.cachedMsgCount.Sub(-1 * int64(pq.consumingMsgOrderlyTreeMap.Size()))
+	pq.cachedMsgCount.Sub(int64(pq.consumingMsgOrderlyTreeMap.Size()))
 
 	pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) {
 		msg := value.(*primitive.MessageExt)
-		pq.cachedMsgSize.Sub(-1 * int64(len(msg.Body)))
+		pq.cachedMsgSize.Sub(int64(len(msg.Body)))
 	})
 
 	pq.consumingMsgOrderlyTreeMap.Clear()
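
For reference, a minimal standalone sketch of the pattern these patches adopt: counters held as go.uber.org/atomic wrapper types (*atomic.Int64) and updated through Add/Sub/Load/Store, instead of bare int64 struct fields passed to sync/atomic. The cacheStats type and field names below are illustrative stand-ins, not code from the repository.

    package main

    import (
    	"fmt"

    	"go.uber.org/atomic"
    )

    // cacheStats mirrors the shape of the patched processQueue counters.
    type cacheStats struct {
    	msgCount *atomic.Int64 // number of cached messages
    	msgSize  *atomic.Int64 // total cached body bytes
    }

    func newCacheStats() *cacheStats {
    	return &cacheStats{
    		msgCount: atomic.NewInt64(0),
    		msgSize:  atomic.NewInt64(0),
    	}
    }

    func main() {
    	s := newCacheStats()

    	// put path: add the positive amounts.
    	s.msgCount.Add(3)
    	s.msgSize.Add(1024)

    	// remove path: Sub performs the subtraction itself, so the argument
    	// stays positive (the sign error fixed by PATCH 2/2).
    	s.msgCount.Sub(1)
    	s.msgSize.Sub(256)

    	fmt.Println(s.msgCount.Load(), s.msgSize.Load()) // prints: 2 768
    }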