From d6c53f0851a9cf1cf9d5da4c83b275b150c7cad3 Mon Sep 17 00:00:00 2001 From: Valentina Armenise Date: Fri, 2 Aug 2024 10:11:29 +0200 Subject: [PATCH] GH-3402: Fix KafkaAdmin clusterId config with observability enabled Fixes: #3402 Re-set clusterId after creating new KafkaAdmin to ensure proper configuration when observability is enabled and bootstrap supplier is not set. This addresses the issue where kafkaAdmin clusterId configuration was being ignored under specific conditions. **Auto-cherry-pick to `3.2.x` & `3.1.x`** --- .../kafka/core/KafkaAdmin.java | 11 ++++++++++ .../kafka/core/KafkaTemplate.java | 22 ++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 92150cccd7..6e1ffc8ec2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -74,6 +74,8 @@ * @author Gary Russell * @author Artem Bilan * @author Adrian Gygax + * @author Sanghyeok An + * @author Valentina Armenise * * @since 1.3 */ @@ -213,6 +215,15 @@ public void setClusterId(String clusterId) { this.clusterId = clusterId; } + /** + * Get the clusterId property. + * @return the cluster id. + * @since 3.1.8 + */ + public String getClusterId() { + return this.clusterId; + } + @Override public Map getConfigurationProperties() { Map configs2 = new HashMap<>(this.configs); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index a1e6043310..c4f89d8a27 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -102,6 +102,7 @@ * @author Thomas Strauß * @author Soby Chacko * @author Gurps Bassi + * @author Valentina Armenise */ public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean, SmartInitializingSingleton { @@ -485,13 +486,17 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin != null) { Object producerServers = this.producerFactory.getConfigurationProperties() .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); - String adminServers = this.kafkaAdmin.getBootstrapServers(); + String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers); int opTo = this.kafkaAdmin.getOperationTimeout(); + String clusterId = this.kafkaAdmin.getClusterId(); this.kafkaAdmin = new KafkaAdmin(props); this.kafkaAdmin.setOperationTimeout(opTo); + if (clusterId != null && !clusterId.isEmpty()) { + this.kafkaAdmin.setClusterId(clusterId); + } } } } @@ -501,6 +506,21 @@ else if (this.micrometerEnabled) { } } + private String getAdminBootstrapAddress() { + // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available + String adminServers = this.kafkaAdmin.getBootstrapServers(); + + // Fallback to configuration properties if bootstrap servers are not set + if (adminServers == null) { + adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + "" + ).toString(); + } + + return adminServers; + } + @Nullable private String clusterId() { if (this.kafkaAdmin != null && this.clusterId == null) {