diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 2af05dcbb0..8adf37de9c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,6 +58,7 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextStoppedEvent; import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.TransactionSupport; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -716,7 +717,20 @@ private CloseSafeProducer doCreateTxProducer(String prefix, String suffix, } checkBootstrap(newProducerConfigs); newProducer = createRawProducer(newProducerConfigs); - newProducer.initTransactions(); + try { + newProducer.initTransactions(); + } + catch (RuntimeException ex) { + try { + newProducer.close(this.physicalCloseTimeout); + } + catch (RuntimeException ex2) { + KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex); + newEx.addSuppressed(ex2); + throw newEx; // NOSONAR - lost stack trace + } + throw new KafkaException("initTransactions() failed", ex); + } CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName, this.epoch.get());