From c671246c14f945044093d68dc4acb0ae11606d3d Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 17 Jan 2024 16:11:42 -0500 Subject: [PATCH] GH-2991: Fix unwanted warning logs in KMLC Fixes: #2991 There is a warning log from `KafkaMessageListenerContainter#checkRebalanceCommits` that unconditionally prints that offsets could not be committed. Because of this unconditional check, the messages are very likely to be false positives. Fixing this by adding a check to see if there are indeed uncommitted records. (cherry picked from commit f9476a380ac442f41ccf6441d7af8f51a9be09a3) --- .../kafka/listener/KafkaMessageListenerContainer.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8a9eb3b352..390f27b6d7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -159,6 +159,7 @@ * @author Tomaz Fernandes * @author Francois Rosiere * @author Daniel Gentes + * @author Soby Chacko */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -1711,9 +1712,10 @@ private void checkRebalanceCommits() { .stream() .filter(entry -> !this.assignedPartitions.contains(entry.getKey())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - this.logger.warn(() -> "These offsets could not be committed; partition(s) lost during rebalance: " - + uncommitted); - + if (!uncommitted.isEmpty()) { + this.logger.warn(() -> "These offsets could not be committed; partition(s) lost during rebalance: " + + uncommitted); + } this.commitsDuringRebalance.clear(); this.logger.debug(() -> "Commit list: " + commits); commitSync(commits);