diff --git a/consumer/consumer.go b/consumer/consumer.go index 7bd2b027..acce804e 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -430,6 +430,25 @@ func (dc *defaultConsumer) doBalance() { } return true }) + dc.truncateMessageQueueNotMyTopic() +} + +func (dc *defaultConsumer) truncateMessageQueueNotMyTopic() { + dc.processQueueTable.Range(func(key, value interface{}) bool { + mq := key.(primitive.MessageQueue) + pq := value.(*processQueue) + if _, ok := dc.subscriptionDataTable.Load(mq.Topic); !ok { + pq.WithDropped(true) + if dc.removeUnnecessaryMessageQueue(&mq, pq) { + dc.processQueueTable.Delete(key) + rlog.Info("remove unnecessary mq because unsubscribed", map[string]interface{}{ + rlog.LogKeyConsumerGroup: dc.consumerGroup, + rlog.LogKeyMessageQueue: mq.String(), + }) + } + } + return true + }) } func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {