Skip to content

Commit

Permalink
spring-projectsGH-3466: Optimize KafkaAdmin creation in KafkaTemplate
Browse files Browse the repository at this point in the history
Fixes: spring-projects#3466

spring-projects#3466

Improve bootstrap-server config comparison to avoid unnecessary
KafkaAdmin recreation. This addresses inconsistencies between
List<String> and String configurations for bootstrap servers.

The change ensures that List versions of bootstrap-server configs are
converted to regular Strings by removing brackets. This allows for
consistent comparison between producer and admin configurations.

This optimization is particularly relevant for Spring Boot scenarios
where configs may be provided in different formats but represent the
same underlying values.
  • Loading branch information
sobychacko committed Aug 29, 2024
1 parent 4dc0976 commit cc99c9a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
Expand Down Expand Up @@ -484,8 +485,9 @@ public void afterSingletonsInstantiated() {
if (this.kafkaAdmin == null) {
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
if (this.kafkaAdmin != null) {
Object producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString();
producerServers = removeLeadingAndTrailingBrackets(producerServers);
String adminServers = getAdminBootstrapAddress();
if (!producerServers.equals(adminServers)) {
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
Expand All @@ -506,19 +508,22 @@ else if (this.micrometerEnabled) {
}
}

private String removeLeadingAndTrailingBrackets(String str) {
return StringUtils.trimTrailingCharacter(StringUtils.trimLeadingCharacter(str, '['),
']');
}

private String getAdminBootstrapAddress() {
// Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available
String adminServers = this.kafkaAdmin.getBootstrapServers();

// Fallback to configuration properties if bootstrap servers are not set
if (adminServers == null) {
adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
""
).toString();
}

return adminServers;
return removeLeadingAndTrailingBrackets(adminServers);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.springframework.lang.Nullable;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.StringUtils;

import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -230,7 +231,7 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
assertThatListenerHasTimerWithNameAndTags(meterRegistryAssert, OBSERVATION_TEST_2, "obs2", "obs2-0");

assertThat(admin.getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(broker.getBrokersAsString()));
// producer factory broker different to admin
assertThatAdmin(template, admin, broker.getBrokersAsString() + "," + broker.getBrokersAsString(),
"kafkaAdmin");
Expand Down Expand Up @@ -386,6 +387,14 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
.hasMessage("obs5 error");
}

@Test
void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
@Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate<Integer, String> template,
@Autowired KafkaAdmin kafkaAdmin) {
// See this issue for more details: https://github.com/spring-projects/spring-kafka/issues/3466
assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
}

@Configuration
@EnableKafka
public static class Config {
Expand All @@ -394,20 +403,30 @@ public static class Config {

@Bean
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString());
List<String> brokersAsList = Arrays.asList(brokers);
KafkaAdmin admin = new KafkaAdmin(
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokersAsList));
admin.setOperationTimeout(42);
return admin;
}

@Bean
@Primary
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ProducerFactory<Integer, String> customProducerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
Expand Down Expand Up @@ -439,6 +458,14 @@ KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String
return template;
}

@Bean
KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
@Qualifier("customProducerFactory") ProducerFactory<Integer, String> pf) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setObservationEnabled(true);
return template;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> cf) {
Expand Down

0 comments on commit cc99c9a

Please sign in to comment.