From cc99c9a9a70dead9030af7e3c49af07225ba368f Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 29 Aug 2024 14:15:03 -0400 Subject: [PATCH] GH-3466: Optimize KafkaAdmin creation in KafkaTemplate Fixes: #3466 https://github.com/spring-projects/spring-kafka/issues/3466 Improve bootstrap-server config comparison to avoid unnecessary KafkaAdmin recreation. This addresses inconsistencies between List and String configurations for bootstrap servers. The change ensures that List versions of bootstrap-server configs are converted to regular Strings by removing brackets. This allows for consistent comparison between producer and admin configurations. This optimization is particularly relevant for Spring Boot scenarios where configs may be provided in different formats but represent the same underlying values. --- .../kafka/core/KafkaTemplate.java | 15 ++++++--- .../support/micrometer/ObservationTests.java | 31 +++++++++++++++++-- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index c4f89d8a27..a50c9981ea 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -80,6 +80,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; @@ -484,8 +485,9 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin == null) { this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); if (this.kafkaAdmin != null) { - Object producerServers = this.producerFactory.getConfigurationProperties() - .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + String producerServers = this.producerFactory.getConfigurationProperties() + .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString(); + producerServers = removeLeadingAndTrailingBrackets(producerServers); String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); @@ -506,10 +508,14 @@ else if (this.micrometerEnabled) { } } + private String removeLeadingAndTrailingBrackets(String str) { + return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['), + ']'); + } + private String getAdminBootstrapAddress() { // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available String adminServers = this.kafkaAdmin.getBootstrapServers(); - // Fallback to configuration properties if bootstrap servers are not set if (adminServers == null) { adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( @@ -517,8 +523,7 @@ private String getAdminBootstrapAddress() { "" ).toString(); } - - return adminServers; + return removeLeadingAndTrailingBrackets(adminServers); } @Nullable diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 430b3b453c..37f52c6b6e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -68,6 +68,7 @@ import org.springframework.lang.Nullable; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.StringUtils; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.MeterRegistry; @@ -230,7 +231,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "obs2", "obs2-0"); assertThat(admin.getConfigurationProperties()) - .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString())); // producer factory broker different to admin assertThatAdmin(template, admin, broker.getBrokersAsString() + "," + broker.getBrokersAsString(), "kafkaAdmin"); @@ -386,6 +387,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired .hasMessage("obs5 error"); } + @Test + void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( + @Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate template, + @Autowired KafkaAdmin kafkaAdmin) { + // See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466 + assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); + } + @Configuration @EnableKafka public static class Config { @@ -394,13 +403,16 @@ public static class Config { @Bean KafkaAdmin admin(EmbeddedKafkaBroker broker) { + String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString()); + List brokersAsList = Arrays.asList(brokers); KafkaAdmin admin = new KafkaAdmin( - Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString())); + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList)); admin.setOperationTimeout(42); return admin; } @Bean + @Primary ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," @@ -408,6 +420,13 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { return new DefaultKafkaProducerFactory<>(producerProps); } + @Bean + ProducerFactory customProducerFactory(EmbeddedKafkaBroker broker) { + Map producerProps = KafkaTestUtils.producerProps(broker); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + return new DefaultKafkaProducerFactory<>(producerProps); + } + @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); @@ -439,6 +458,14 @@ KafkaTemplate throwableTemplate(ProducerFactory reuseAdminBeanKafkaTemplate( + @Qualifier("customProducerFactory") ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) {