Skip to content

Commit

Permalink
[ISSUE apache#926] don't share limit channel in pushConsumer (apache#923
Browse files Browse the repository at this point in the history
)

* [feat] don't share goroutin in pushConsumer
  • Loading branch information
aireet authored Sep 28, 2022
1 parent 4c9c5db commit 92515f7
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type pushConsumer struct {
queueLock *QueueLock
done chan struct{}
closeOnce sync.Once
crCh chan struct{}
crCh map[string]chan struct{}
}

func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
Expand Down Expand Up @@ -116,7 +116,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
queueLock: newQueueLock(),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
crCh: make(chan struct{}, defaultOpts.ConsumeGoroutineNums),
crCh: make(map[string]chan struct{}),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
Expand Down Expand Up @@ -260,6 +260,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
return errors2.ErrStartTopic
}

if _, ok := pc.crCh[topic]; !ok {
pc.crCh[topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
}

if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
Expand Down Expand Up @@ -1018,6 +1022,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
if msgs == nil {
return
}

for count := 0; count < len(msgs); count++ {
var subMsgs []*primitive.MessageExt
if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
Expand All @@ -1034,7 +1039,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
if limiterOn {
limiter(utils.WithoutNamespace(mq.Topic))
} else {
pc.crCh <- struct{}{}
pc.crCh[mq.Topic] <- struct{}{}
}

go primitive.WithRecover(func() {
Expand All @@ -1046,7 +1051,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
})
}
if !limiterOn {
<-pc.crCh
<-pc.crCh[mq.Topic]
}
}()
RETRY:
Expand Down

0 comments on commit 92515f7

Please sign in to comment.