From ba179aa63e7887f8ced6c4c5027ceaa19e83f3d9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 29 Aug 2024 21:06:24 -0400 Subject: [PATCH] GH-3466: Optimize KafkaAdmin creation in KafkaTemplate (#3471) * GH-3466: Optimize KafkaAdmin creation in KafkaTemplate Fixes: #3466 https://github.com/spring-projects/spring-kafka/issues/3466 Improve bootstrap-server config comparison to avoid unnecessary KafkaAdmin recreation. This addresses inconsistencies between List and String configurations for bootstrap servers. The change ensures that List versions of bootstrap-server configs are converted to regular Strings by removing brackets. This allows for consistent comparison between producer and admin configurations. This optimization is particularly relevant for Spring Boot scenarios where configs may be provided in different formats but represent the same underlying values. * Addressing PR review (cherry picked from commit 1d4f7f95c792e2fd985d75259032a1c8e656b67f) --- .../kafka/core/KafkaTemplate.java | 14 ++++++--- .../support/micrometer/ObservationTests.java | 31 +++++++++++++++++-- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index c4f89d8a27..0f07679659 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -80,6 +80,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; @@ -484,8 +485,9 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin == null) { this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); if (this.kafkaAdmin != null) { - Object producerServers = this.producerFactory.getConfigurationProperties() - .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + String producerServers = this.producerFactory.getConfigurationProperties() + .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString(); + producerServers = removeLeadingAndTrailingBrackets(producerServers); String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); @@ -509,7 +511,6 @@ else if (this.micrometerEnabled) { private String getAdminBootstrapAddress() { // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available String adminServers = this.kafkaAdmin.getBootstrapServers(); - // Fallback to configuration properties if bootstrap servers are not set if (adminServers == null) { adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( @@ -517,8 +518,7 @@ private String getAdminBootstrapAddress() { "" ).toString(); } - - return adminServers; + return removeLeadingAndTrailingBrackets(adminServers); } @Nullable @@ -1003,6 +1003,10 @@ public void destroy() { } } + private static String removeLeadingAndTrailingBrackets(String str) { + return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['), ']'); + } + @SuppressWarnings("serial") private static final class SkipAbortException extends RuntimeException { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 430b3b453c..37f52c6b6e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -68,6 +68,7 @@ import org.springframework.lang.Nullable; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.StringUtils; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.MeterRegistry; @@ -230,7 +231,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "obs2", "obs2-0"); assertThat(admin.getConfigurationProperties()) - .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString())); // producer factory broker different to admin assertThatAdmin(template, admin, broker.getBrokersAsString() + "," + broker.getBrokersAsString(), "kafkaAdmin"); @@ -386,6 +387,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired .hasMessage("obs5 error"); } + @Test + void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( + @Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate template, + @Autowired KafkaAdmin kafkaAdmin) { + // See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466 + assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); + } + @Configuration @EnableKafka public static class Config { @@ -394,13 +403,16 @@ public static class Config { @Bean KafkaAdmin admin(EmbeddedKafkaBroker broker) { + String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString()); + List brokersAsList = Arrays.asList(brokers); KafkaAdmin admin = new KafkaAdmin( - Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString())); + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList)); admin.setOperationTimeout(42); return admin; } @Bean + @Primary ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," @@ -408,6 +420,13 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { return new DefaultKafkaProducerFactory<>(producerProps); } + @Bean + ProducerFactory customProducerFactory(EmbeddedKafkaBroker broker) { + Map producerProps = KafkaTestUtils.producerProps(broker); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + return new DefaultKafkaProducerFactory<>(producerProps); + } + @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); @@ -439,6 +458,14 @@ KafkaTemplate throwableTemplate(ProducerFactory reuseAdminBeanKafkaTemplate( + @Qualifier("customProducerFactory") ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) {