Skip to content

Commit

Permalink
use uber atomic lib to avoid atomic value data race.
Browse files Browse the repository at this point in the history
  • Loading branch information
WJL3333 committed Jul 23, 2022
1 parent d3be7e5 commit 7904994
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package consumer
import (
"strconv"
"sync"
"sync/atomic"

"time"

"github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
gods_util "github.com/emirpasic/gods/utils"
uatomic "go.uber.org/atomic"
"go.uber.org/atomic"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand All @@ -40,19 +40,19 @@ const (
)

type processQueue struct {
cachedMsgCount int64
cachedMsgSize int64
cachedMsgCount *atomic.Int64
cachedMsgSize *atomic.Int64
tryUnlockTimes int64
queueOffsetMax int64
msgAccCnt int64
msgCache *treemap.Map
mutex sync.RWMutex
consumeLock sync.Mutex
consumingMsgOrderlyTreeMap *treemap.Map
dropped *uatomic.Bool
dropped *atomic.Bool
lastPullTime atomic.Value
lastConsumeTime atomic.Value
locked *uatomic.Bool
locked *atomic.Bool
lastLockTime atomic.Value
consuming bool
lockConsume sync.Mutex
Expand All @@ -73,15 +73,17 @@ func newProcessQueue(order bool) *processQueue {
lastPullTime.Store(time.Now())

pq := &processQueue{
cachedMsgCount: atomic.NewInt64(0),
cachedMsgSize: atomic.NewInt64(0),
msgCache: treemap.NewWith(utils.Int64Comparator),
lastPullTime: lastPullTime,
lastConsumeTime: lastConsumeTime,
lastLockTime: lastLockTime,
msgCh: make(chan []*primitive.MessageExt, 32),
consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
order: order,
locked: uatomic.NewBool(false),
dropped: uatomic.NewBool(false),
locked: atomic.NewBool(false),
dropped: atomic.NewBool(false),
}
return pq
}
Expand Down Expand Up @@ -112,13 +114,14 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
pq.msgCache.Put(msg.QueueOffset, msg)
validMessageCount++
pq.queueOffsetMax = msg.QueueOffset
atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))

pq.cachedMsgSize.Add(int64(len(msg.Body)))
}
pq.mutex.Unlock()

atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
pq.cachedMsgCount.Add(int64(validMessageCount))
pq.mutex.Unlock()

if pq.msgCache.Size() > 0 && !pq.consuming {
if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
pq.consuming = true
}

Expand Down Expand Up @@ -195,11 +198,14 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
if !found {
continue
}

pq.msgCache.Remove(msg.QueueOffset)
removedCount++
atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))

pq.cachedMsgSize.Sub(int64(-len(msg.Body)))
}
atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))

pq.cachedMsgCount.Sub(int64(-removedCount))
}
if !pq.msgCache.Empty() {
first, _ := pq.msgCache.Min()
Expand All @@ -217,7 +223,7 @@ func (pq *processQueue) isPullExpired() bool {
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
}

func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
if consumer.option.ConsumeOrderly {
return
}
Expand Down Expand Up @@ -350,8 +356,8 @@ func (pq *processQueue) clear() {
pq.mutex.Lock()
defer pq.mutex.Unlock()
pq.msgCache.Clear()
pq.cachedMsgCount = 0
pq.cachedMsgSize = 0
pq.cachedMsgCount.Store(0)
pq.cachedMsgSize.Store(0)
pq.queueOffsetMax = 0
}

Expand All @@ -364,11 +370,13 @@ func (pq *processQueue) commit() int64 {
if iter != nil {
offset = iter.(int64)
}
pq.cachedMsgCount -= int64(pq.consumingMsgOrderlyTreeMap.Size())
pq.cachedMsgCount.Sub(-1 * int64(pq.consumingMsgOrderlyTreeMap.Size()))

pq.consumingMsgOrderlyTreeMap.Each(func(key interface{}, value interface{}) {
msg := value.(*primitive.MessageExt)
pq.cachedMsgSize -= int64(len(msg.Body))
pq.cachedMsgSize.Sub(-1 * int64(len(msg.Body)))
})

pq.consumingMsgOrderlyTreeMap.Clear()
return offset + 1
}
Expand All @@ -389,7 +397,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
info.CachedMsgMinOffset = pq.Min()
info.CachedMsgMaxOffset = pq.Max()
info.CachedMsgCount = pq.msgCache.Size()
info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
info.CachedMsgSizeInMiB = pq.cachedMsgSize.Load() / int64(1024*1024)
}

if !pq.consumingMsgOrderlyTreeMap.Empty() {
Expand Down

0 comments on commit 7904994

Please sign in to comment.