From 7993abb6b804a0f48cae76acab2eb1fa03652c77 Mon Sep 17 00:00:00 2001 From: czah <81543537+czah@users.noreply.github.com> Date: Tue, 8 Jun 2021 16:09:39 +0800 Subject: [PATCH] fix(queue-selector): fix an issue where a map read operation was not protected by the lock (#652) Co-authored-by: czah --- producer/selector.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/producer/selector.go b/producer/selector.go index 69d216f3..74f5badb 100644 --- a/producer/selector.go +++ b/producer/selector.go @@ -21,7 +21,6 @@ import ( "hash/fnv" "math/rand" "sync" - "sync/atomic" "time" "github.com/apache/rocketmq-client-go/v2/primitive" @@ -64,35 +63,32 @@ func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primi // roundRobinQueueSelector choose the queue by roundRobin. type roundRobinQueueSelector struct { sync.Locker - indexer map[string]*int32 + indexer map[string]*uint32 } func NewRoundRobinQueueSelector() QueueSelector { s := &roundRobinQueueSelector{ Locker: new(sync.Mutex), - indexer: map[string]*int32{}, + indexer: map[string]*uint32{}, } return s } func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { t := message.Topic - if _, exist := r.indexer[t]; !exist { - r.Lock() - if _, exist := r.indexer[t]; !exist { - var v = int32(0) - r.indexer[t] = &v - } - r.Unlock() + var idx *uint32 + + r.Lock() + idx, exist := r.indexer[t] + if !exist { + var v uint32 = 0 + idx = &v + r.indexer[t] = idx } - index := r.indexer[t] + *idx++ + r.Unlock() - i := atomic.AddInt32(index, 1) - if i < 0 { - i = -i - atomic.StoreInt32(index, 0) - } - qIndex := int(i) % len(queues) + qIndex := *idx % uint32(len(queues)) return queues[qIndex] }