From e71f2f57fdd0a06517b6e7b707c78acc753d85fe Mon Sep 17 00:00:00 2001 From: Venkat Ram Date: Tue, 5 Jul 2022 14:03:03 +0530 Subject: [PATCH] Metro 71 fix zombie subscribers to avoid refreshing bindings (#384) * publish consumer lag as part of ticker * Add GetConsumerLag to IConsumer * Fix lint issues * Add consumer lag metric value * Fix lint issues * Fix lint issues * Fix typo * Added integration test for FetchConsumerLag * Avoid clash with topic separator --- internal/subscriber/subscriber.go | 2 +- pkg/messagebroker/kafka.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 8979230c..674fa606 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -142,7 +142,7 @@ func (s *Subscriber) Run(ctx context.Context) { case <-s.healthMonitorTicker.C: lag := s.subscriberImpl.GetConsumerLag() for topic, offset := range lag { - tp := strings.Split(topic, "-") + tp := strings.Split(topic, "=") if len(tp) == 1 { logger.Ctx(ctx).Errorw("subscriber: failed to parse topic and partition for consumer lag", "topic", topic) continue diff --git a/pkg/messagebroker/kafka.go b/pkg/messagebroker/kafka.go index 828b80a3..fc282354 100644 --- a/pkg/messagebroker/kafka.go +++ b/pkg/messagebroker/kafka.go @@ -642,7 +642,7 @@ func (k *KafkaBroker) FetchConsumerLag(ctx context.Context) (map[string]uint64, } for _, tp := range committed { - topicPart := *tp.Topic + "-" + strconv.Itoa(int(tp.Partition)) + topicPart := *tp.Topic + "=" + strconv.Itoa(int(tp.Partition)) low, high, err := k.Consumer.QueryWatermarkOffsets(*tp.Topic, tp.Partition, kafkaMetadataTimeout) if err != nil { logger.Ctx(ctx).Errorw("kafka: failed to fetch watermark offsets",