From f11c7caf081e25c8a513c888f22b9144b07a9404 Mon Sep 17 00:00:00 2001 From: Valentina Armenise Date: Fri, 2 Aug 2024 10:11:29 +0200 Subject: [PATCH] GH-3402: Fix KafkaAdmin clusterId config with observability enabled Fixes: #3402 Re-set clusterId after creating new KafkaAdmin to ensure proper configuration when observability is enabled and bootstrap supplier is not set. This addresses the issue where kafkaAdmin clusterId configuration was being ignored under specific conditions. **Auto-cherry-pick to `3.2.x` & `3.1.x`** --- .../kafka/core/KafkaAdmin.java | 10 +++++++++ .../kafka/core/KafkaTemplate.java | 22 ++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index f4fb291d0e..e988eae6a0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -75,6 +75,7 @@ * @author Artem Bilan * @author Adrian Gygax * @author Sanghyeok An + * @author Valentina Armenise * * @since 1.3 */ @@ -214,6 +215,15 @@ public void setClusterId(String clusterId) { this.clusterId = clusterId; } + /** + * Get the clusterId property. + * @return the cluster id. + * @since 3.1.8 + */ + public String getClusterId() { + return this.clusterId; + } + @Override public Map getConfigurationProperties() { Map configs2 = new HashMap<>(this.configs); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index a1e6043310..c4f89d8a27 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -102,6 +102,7 @@ * @author Thomas Strauß * @author Soby Chacko * @author Gurps Bassi + * @author Valentina Armenise */ public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean, SmartInitializingSingleton { @@ -485,13 +486,17 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin != null) { Object producerServers = this.producerFactory.getConfigurationProperties() .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); - String adminServers = this.kafkaAdmin.getBootstrapServers(); + String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers); int opTo = this.kafkaAdmin.getOperationTimeout(); + String clusterId = this.kafkaAdmin.getClusterId(); this.kafkaAdmin = new KafkaAdmin(props); this.kafkaAdmin.setOperationTimeout(opTo); + if (clusterId != null && !clusterId.isEmpty()) { + this.kafkaAdmin.setClusterId(clusterId); + } } } } @@ -501,6 +506,21 @@ else if (this.micrometerEnabled) { } } + private String getAdminBootstrapAddress() { + // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available + String adminServers = this.kafkaAdmin.getBootstrapServers(); + + // Fallback to configuration properties if bootstrap servers are not set + if (adminServers == null) { + adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + "" + ).toString(); + } + + return adminServers; + } + @Nullable private String clusterId() { if (this.kafkaAdmin != null && this.clusterId == null) {