diff --git a/consumer/consumer.go b/consumer/consumer.go index 6e53eafa..b6056591 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -221,8 +221,8 @@ type PullRequest struct { } func (pr *PullRequest) String() string { - return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: %d]", - pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId) + return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: brokerName=%s, queueId=%d, nextOffset=%d]", + pr.consumerGroup, pr.mq.Topic, pr.mq.BrokerName, pr.mq.QueueId, pr.nextOffset) } type defaultConsumer struct { @@ -357,6 +357,16 @@ func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool { return !exist } +func (dc *defaultConsumer) doBalanceIfNotPaused() { + if dc.pause { + rlog.Info("[BALANCE-SKIP] since consumer paused", map[string]interface{}{ + rlog.LogKeyConsumerGroup: dc.consumerGroup, + }) + return + } + dc.doBalance() +} + func (dc *defaultConsumer) doBalance() { dc.subscriptionDataTable.Range(func(key, value interface{}) bool { topic := key.(string) @@ -674,7 +684,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv if dc.removeUnnecessaryMessageQueue(&mq, pq) { dc.processQueueTable.Delete(key) changed = true - rlog.Debug("remove unnecessary mq because pull was paused, prepare to fix it", map[string]interface{}{ + rlog.Debug("remove unnecessary mq because pull was expired, prepare to fix it", map[string]interface{}{ rlog.LogKeyConsumerGroup: dc.consumerGroup, rlog.LogKeyMessageQueue: mq.String(), }) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 4ad5ee36..57726cf8 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -259,6 +259,10 @@ func (pc *pushConsumer) Rebalance() { pc.defaultConsumer.doBalance() } +func (pc *pushConsumer) RebalanceIfNotPaused() { + pc.defaultConsumer.doBalanceIfNotPaused() +} + func (pc *pushConsumer) PersistConsumerOffset() error { return pc.defaultConsumer.persistConsumerOffset() } @@ -877,6 +881,8 @@ func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQue pc.suspend() defer pc.resume() + mqs := make([]*primitive.MessageQueue, 0) + copyPc := sync.Map{} pc.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) pq := value.(*processQueue) @@ -884,24 +890,21 @@ func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQue pq.WithDropped(true) pq.clear() } + mqs = append(mqs, &mq) + copyPc.Store(&mq, pq) return true }) time.Sleep(10 * time.Second) - v, exist := pc.topicSubscribeInfoTable.Load(topic) - if !exist { - return - } - queuesOfTopic := v.([]*primitive.MessageQueue) - for _, k := range queuesOfTopic { - if _, ok := table[*k]; ok { - pc.storage.update(k, table[*k], false) - v, exist := pc.processQueueTable.Load(k) + for _, mq := range mqs { + if _, ok := table[*mq]; ok { + pc.storage.update(mq, table[*mq], false) + v, exist := copyPc.Load(mq) if !exist { continue } pq := v.(*processQueue) - pc.removeUnnecessaryMessageQueue(k, pq) - pc.processQueueTable.Delete(k) + pc.removeUnnecessaryMessageQueue(mq, pq) + pc.processQueueTable.Delete(mq) } } } diff --git a/internal/client.go b/internal/client.go index e8c48034..138dcf39 100644 --- a/internal/client.go +++ b/internal/client.go @@ -84,6 +84,7 @@ type InnerConsumer interface { IsSubscribeTopicNeedUpdate(topic string) bool SubscriptionDataList() []*SubscriptionData Rebalance() + RebalanceIfNotPaused() IsUnitMode() bool GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult @@ -223,7 +224,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R rlog.Info("receive broker's notification to consumer group", map[string]interface{}{ rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"], }) - client.RebalanceImmediately() + client.RebalanceIfNotPaused() return nil }) client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand { @@ -492,7 +493,7 @@ func (c *rmqClient) Start() { for { select { case <-ticker.C: - c.RebalanceImmediately() + c.RebalanceIfNotPaused() case <-c.done: rlog.Info("The RMQClient stopping do rebalance", map[string]interface{}{ "clientID": c.ClientID(), @@ -820,6 +821,16 @@ func (c *rmqClient) RebalanceImmediately() { }) } +func (c *rmqClient) RebalanceIfNotPaused() { + c.rbMutex.Lock() + defer c.rbMutex.Unlock() + c.consumerMap.Range(func(key, value interface{}) bool { + consumer := value.(InnerConsumer) + consumer.RebalanceIfNotPaused() + return true + }) +} + func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) { if data == nil { return