[ISSUE #790] use uber atomic lib to avoid atomic value data race. (#866)
* use uber atomic lib to avoid atomic value data race.

* change wrong sub value

Co-authored-by: dinglei <[email protected]>
WJL3333 and ShannonDing authored Jul 25, 2022
1 parent 595f2d9 commit b6d6f07
Showing 2 changed files with 31 additions and 23 deletions.
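
The race this commit removes: the two cache counters were plain int64 fields that writers updated through sync/atomic.AddInt64 while other methods read or assigned them as ordinary variables (see the old lines in clear(), commit(), currentInfo() and pullMessage() below), which the Go race detector reports as a data race. A minimal sketch of the replacement pattern using go.uber.org/atomic; the names here (counter, the worker loop) are illustrative and not taken from the client:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	counter := atomic.NewInt64(0) // typed atomic; no raw int64 to read by accident

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter.Add(1) // writers: replaces atomic.AddInt64(&raw, 1)
		}()
	}

	// Concurrent reader: with a plain int64 field a bare read such as
	// `raw > 0` here races with the writers above; Load() does not.
	fmt.Println(counter.Load() >= 0)

	wg.Wait()
	fmt.Println(counter.Load()) // 10
}

Because *atomic.Int64 exposes no raw field, every access is forced through Load/Store/Add/Sub, which is what the hunks below do for cachedMsgCount and cachedMsgSize.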
48 changes: 28 additions & 20 deletions consumer/process_queue.go
@@ -20,13 +20,13 @@ package consumer
import (
"strconv"
"sync"
"sync/atomic"

"time"

"github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
gods_util "github.com/emirpasic/gods/utils"
uatomic "go.uber.org/atomic"
"go.uber.org/atomic"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -40,19 +40,19 @@ const (
)

type processQueue struct {
cachedMsgCount int64
cachedMsgSize int64
cachedMsgCount *atomic.Int64
cachedMsgSize *atomic.Int64
tryUnlockTimes int64
queueOffsetMax int64
msgAccCnt int64
msgCache *treemap.Map
mutex sync.RWMutex
consumeLock sync.Mutex
consumingMsgOrderlyTreeMap *treemap.Map
dropped *uatomic.Bool
dropped *atomic.Bool
lastPullTime atomic.Value
lastConsumeTime atomic.Value
locked *uatomic.Bool
locked *atomic.Bool
lastLockTime atomic.Value
consuming bool
lockConsume sync.Mutex
@@ -75,6 +75,8 @@ func newProcessQueue(order bool) *processQueue {
lastPullTime.Store(time.Now())

pq := &processQueue{
cachedMsgCount: atomic.NewInt64(0),
cachedMsgSize: atomic.NewInt64(0),
msgCache: treemap.NewWith(utils.Int64Comparator),
lastPullTime: lastPullTime,
lastConsumeTime: lastConsumeTime,
@@ -84,8 +86,8 @@ func newProcessQueue(order bool) *processQueue {
order: order,
closeChanOnce: &sync.Once{},
closeChan: make(chan struct{}),
locked: uatomic.NewBool(false),
dropped: uatomic.NewBool(false),
locked: atomic.NewBool(false),
dropped: atomic.NewBool(false),
}
return pq
}
@@ -120,13 +122,14 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
pq.msgCache.Put(msg.QueueOffset, msg)
validMessageCount++
pq.queueOffsetMax = msg.QueueOffset
atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))

pq.cachedMsgSize.Add(int64(len(msg.Body)))
}
pq.mutex.Unlock()

atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
pq.cachedMsgCount.Add(int64(validMessageCount))
pq.mutex.Unlock()

if pq.msgCache.Size() > 0 && !pq.consuming {
if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
pq.consuming = true
}

@@ -206,11 +209,14 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
if !found {
continue
}

pq.msgCache.Remove(msg.QueueOffset)
removedCount++
atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))

pq.cachedMsgSize.Sub(int64(len(msg.Body)))
}
atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))

pq.cachedMsgCount.Sub(int64(removedCount))
}
if !pq.msgCache.Empty() {
first, _ := pq.msgCache.Min()
@@ -228,7 +234,7 @@ func (pq *processQueue) isPullExpired() bool {
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
}

func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
if consumer.option.ConsumeOrderly {
return
}
@@ -366,8 +372,8 @@ func (pq *processQueue) clear() {
pq.mutex.Lock()
defer pq.mutex.Unlock()
pq.msgCache.Clear()
pq.cachedMsgCount = 0
pq.cachedMsgSize = 0
pq.cachedMsgCount.Store(0)
pq.cachedMsgSize.Store(0)
pq.queueOffsetMax = 0
}

@@ -380,11 +386,13 @@ func (pq *processQueue) commit() int64 {
if iter != nil {
offset = iter.(int64)
}
pq.cachedMsgCount -= int64(pq.consumingMsgOrderlyTreeMap.Size())
pq.cachedMsgCount.Sub(int64(pq.consumingMsgOrderlyTreeMap.Size()))

pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) {
msg := value.(*primitive.MessageExt)
pq.cachedMsgSize -= int64(len(msg.Body))
pq.cachedMsgSize.Sub(int64(len(msg.Body)))
})

pq.consumingMsgOrderlyTreeMap.Clear()
return offset + 1
}
@@ -405,7 +413,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
info.CachedMsgMinOffset = pq.Min()
info.CachedMsgMaxOffset = pq.Max()
info.CachedMsgCount = pq.msgCache.Size()
info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
info.CachedMsgSizeInMiB = pq.cachedMsgSize.Load() / int64(1024*1024)
}

if !pq.consumingMsgOrderlyTreeMap.Empty() {
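
Taken together, the process_queue.go hunks funnel every access to the two counters through the typed methods: Add on put, Sub on remove and commit, Store on clear, Load on reads. A simplified stand-in for that pattern (the struct below is a sketch for illustration, not the repository's processQueue type):

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// cacheStats is a simplified stand-in for the two counters on processQueue.
type cacheStats struct {
	cachedMsgCount *atomic.Int64
	cachedMsgSize  *atomic.Int64
}

func newCacheStats() *cacheStats {
	// The pointer fields must be created with the constructors, as the
	// new newProcessQueue does; a nil *atomic.Int64 panics on first use.
	return &cacheStats{
		cachedMsgCount: atomic.NewInt64(0),
		cachedMsgSize:  atomic.NewInt64(0),
	}
}

func main() {
	s := newCacheStats()

	s.cachedMsgCount.Add(1)   // putMessage: replaces atomic.AddInt64(&field, n)
	s.cachedMsgSize.Add(1024)

	s.cachedMsgSize.Sub(1024) // removeMessage/commit: replaces AddInt64 with a negative delta

	s.cachedMsgCount.Store(0) // clear: replaces the racy plain assignment `field = 0`

	// currentInfo/pullMessage: replaces the racy plain read of the field.
	fmt.Println(s.cachedMsgCount.Load(), s.cachedMsgSize.Load())
}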
6 changes: 3 additions & 3 deletions consumer/push_consumer.go
@@ -604,8 +604,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
goto NEXT
}

cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
@@ -818,7 +818,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
}

func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
if pr.pq.cachedMsgCount <= 0 {
if pr.pq.cachedMsgCount.Load() <= 0 {
pc.storage.update(pr.mq, pr.nextOffset, true)
}
}
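
The push_consumer.go side is read-only: the flow-control check now loads both counters atomically before comparing them against the pull thresholds, so the reads pair safely with the Add/Sub calls in process_queue.go. A hedged sketch of that comparison; mb and the threshold arguments stand in for the client's Mb constant and consumer options:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

const mb = 1024 * 1024 // stand-in for the client's Mb constant

// shouldFlowControl mirrors the shape of the threshold check in pullMessage:
// both counters are read with Load, never as raw fields.
func shouldFlowControl(count, size *atomic.Int64, countThreshold, sizeThresholdMiB int64) bool {
	cachedMessageSizeInMiB := size.Load() / mb
	return count.Load() > countThreshold || cachedMessageSizeInMiB > sizeThresholdMiB
}

func main() {
	count := atomic.NewInt64(2048)
	size := atomic.NewInt64(300 * mb)
	fmt.Println(shouldFlowControl(count, size, 1024, 100)) // true: both thresholds exceeded
}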
