diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 9b58fc669f..d037c20e0a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -80,6 +80,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; @@ -485,8 +486,9 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin == null) { this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); if (this.kafkaAdmin != null) { - Object producerServers = this.producerFactory.getConfigurationProperties() - .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + String producerServers = this.producerFactory.getConfigurationProperties() + .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString(); + producerServers = removeLeadingAndTrailingBrackets(producerServers); String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); @@ -510,7 +512,6 @@ else if (this.micrometerEnabled) { private String getAdminBootstrapAddress() { // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available String adminServers = this.kafkaAdmin.getBootstrapServers(); - // Fallback to configuration properties if bootstrap servers are not set if (adminServers == null) { adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( @@ -518,8 +519,7 @@ private String getAdminBootstrapAddress() { "" ).toString(); } - - return adminServers; + return removeLeadingAndTrailingBrackets(adminServers); } @Nullable @@ -1007,6 +1007,10 @@ public void destroy() { } } + private static String removeLeadingAndTrailingBrackets(String str) { + return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['), ']'); + } + @SuppressWarnings("serial") private static final class SkipAbortException extends RuntimeException { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 45fa015ac7..d93c606919 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -64,6 +64,7 @@ import org.springframework.lang.Nullable; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.StringUtils; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.MeterRegistry; @@ -204,7 +205,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux")) .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0")); assertThat(admin.getConfigurationProperties()) - .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString())); // producer factory broker different to admin KafkaAdmin pAdmin = KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class); assertThat(pAdmin.getOperationTimeout()).isEqualTo(admin.getOperationTimeout()); @@ -287,6 +288,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired .hasMessage("obs5 error"); } + @Test + void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( + @Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate template, + @Autowired KafkaAdmin kafkaAdmin) { + // See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466 + assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); + } + @Configuration @EnableKafka public static class Config { @@ -295,13 +304,16 @@ public static class Config { @Bean KafkaAdmin admin(EmbeddedKafkaBroker broker) { + String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString()); + List brokersAsList = Arrays.asList(brokers); KafkaAdmin admin = new KafkaAdmin( - Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString())); + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList)); admin.setOperationTimeout(42); return admin; } @Bean + @Primary ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," @@ -309,6 +321,13 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { return new DefaultKafkaProducerFactory<>(producerProps); } + @Bean + ProducerFactory customProducerFactory(EmbeddedKafkaBroker broker) { + Map producerProps = KafkaTestUtils.producerProps(broker); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + return new DefaultKafkaProducerFactory<>(producerProps); + } + @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); @@ -340,6 +359,14 @@ KafkaTemplate throwableTemplate(ProducerFactory reuseAdminBeanKafkaTemplate( + @Qualifier("customProducerFactory") ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) {