Skip to content

Commit

Permalink
GH-3402: Fix KafkaAdmin clusterId config with observability enabled
Browse files Browse the repository at this point in the history
Fixes: #3402

Re-set clusterId after creating new KafkaAdmin to ensure proper
configuration when observability is enabled and bootstrap supplier
is not set.

This addresses the issue where kafkaAdmin clusterId configuration
was being ignored under specific conditions.

 **Auto-cherry-pick to `3.2.x` & `3.1.x`**
  • Loading branch information
varmenise authored and sobychacko committed Aug 19, 2024
1 parent e263ff0 commit f11c7ca
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
* @author Artem Bilan
* @author Adrian Gygax
* @author Sanghyeok An
* @author Valentina Armenise
*
* @since 1.3
*/
Expand Down Expand Up @@ -214,6 +215,15 @@ public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

/**
* Get the clusterId property.
* @return the cluster id.
* @since 3.1.8
*/
public String getClusterId() {
return this.clusterId;
}

@Override
public Map<String, Object> getConfigurationProperties() {
Map<String, Object> configs2 = new HashMap<>(this.configs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
* @author Thomas Strauß
* @author Soby Chacko
* @author Gurps Bassi
* @author Valentina Armenise
*/
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
Expand Down Expand Up @@ -485,13 +486,17 @@ public void afterSingletonsInstantiated() {
if (this.kafkaAdmin != null) {
Object producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String adminServers = this.kafkaAdmin.getBootstrapServers();
String adminServers = getAdminBootstrapAddress();
if (!producerServers.equals(adminServers)) {
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
int opTo = this.kafkaAdmin.getOperationTimeout();
String clusterId = this.kafkaAdmin.getClusterId();
this.kafkaAdmin = new KafkaAdmin(props);
this.kafkaAdmin.setOperationTimeout(opTo);
if (clusterId != null && !clusterId.isEmpty()) {
this.kafkaAdmin.setClusterId(clusterId);
}
}
}
}
Expand All @@ -501,6 +506,21 @@ else if (this.micrometerEnabled) {
}
}

private String getAdminBootstrapAddress() {
// Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available
String adminServers = this.kafkaAdmin.getBootstrapServers();

// Fallback to configuration properties if bootstrap servers are not set
if (adminServers == null) {
adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
""
).toString();
}

return adminServers;
}

@Nullable
private String clusterId() {
if (this.kafkaAdmin != null && this.clusterId == null) {
Expand Down

0 comments on commit f11c7ca

Please sign in to comment.