Skip to content

Commit

Permalink
fix(queue-selector): fix an issue where a map read operation was not …
Browse files Browse the repository at this point in the history
…protected by the lock (#652)

Co-authored-by: czah <[email protected]>
  • Loading branch information
czah and czah authored Jun 8, 2021
1 parent 673a8d0 commit 7993abb
Showing 1 changed file with 13 additions and 17 deletions.
30 changes: 13 additions & 17 deletions producer/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"hash/fnv"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/apache/rocketmq-client-go/v2/primitive"
Expand Down Expand Up @@ -64,35 +63,32 @@ func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primi
// roundRobinQueueSelector choose the queue by roundRobin.
type roundRobinQueueSelector struct {
sync.Locker
indexer map[string]*int32
indexer map[string]*uint32
}

func NewRoundRobinQueueSelector() QueueSelector {
s := &roundRobinQueueSelector{
Locker: new(sync.Mutex),
indexer: map[string]*int32{},
indexer: map[string]*uint32{},
}
return s
}

func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
t := message.Topic
if _, exist := r.indexer[t]; !exist {
r.Lock()
if _, exist := r.indexer[t]; !exist {
var v = int32(0)
r.indexer[t] = &v
}
r.Unlock()
var idx *uint32

r.Lock()
idx, exist := r.indexer[t]
if !exist {
var v uint32 = 0
idx = &v
r.indexer[t] = idx
}
index := r.indexer[t]
*idx++
r.Unlock()

i := atomic.AddInt32(index, 1)
if i < 0 {
i = -i
atomic.StoreInt32(index, 0)
}
qIndex := int(i) % len(queues)
qIndex := *idx % uint32(len(queues))
return queues[qIndex]
}

Expand Down

0 comments on commit 7993abb

Please sign in to comment.