Skip to content

Commit

Permalink
Metro 71 fix zombie subscribers to avoid refreshing bindings (#384)
Browse files Browse the repository at this point in the history
* publish consumer lag as part of ticker

* Add GetConsumerLag to IConsumer

* Fix lint issues

* Add consumer lag metric value

* Fix lint issues

* Fix lint issues

* Fix typo

* Added integration test for FetchConsumerLag

* Avoid clash with topic separator
  • Loading branch information
Venkat Ram authored Jul 5, 2022
1 parent 58840ee commit e71f2f5
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion internal/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *Subscriber) Run(ctx context.Context) {
case <-s.healthMonitorTicker.C:
lag := s.subscriberImpl.GetConsumerLag()
for topic, offset := range lag {
tp := strings.Split(topic, "-")
tp := strings.Split(topic, "=")
if len(tp) == 1 {
logger.Ctx(ctx).Errorw("subscriber: failed to parse topic and partition for consumer lag", "topic", topic)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/messagebroker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func (k *KafkaBroker) FetchConsumerLag(ctx context.Context) (map[string]uint64,
}

for _, tp := range committed {
topicPart := *tp.Topic + "-" + strconv.Itoa(int(tp.Partition))
topicPart := *tp.Topic + "=" + strconv.Itoa(int(tp.Partition))
low, high, err := k.Consumer.QueryWatermarkOffsets(*tp.Topic, tp.Partition, kafkaMetadataTimeout)
if err != nil {
logger.Ctx(ctx).Errorw("kafka: failed to fetch watermark offsets",
Expand Down

0 comments on commit e71f2f5

Please sign in to comment.