From c33b0b3386c18e62ffed9be5027b077062e1d114 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 3 Mar 2021 11:19:37 -0500 Subject: [PATCH] GH-1727: Close Producer if initTransactions Fails Resolves https://github.com/spring-projects/spring-kafka/issues/1727 --- .../core/DefaultKafkaProducerFactory.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 2af05dcbb0..8adf37de9c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,6 +58,7 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextStoppedEvent; import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.TransactionSupport; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -716,7 +717,20 @@ private CloseSafeProducer doCreateTxProducer(String prefix, String suffix, } checkBootstrap(newProducerConfigs); newProducer = createRawProducer(newProducerConfigs); - newProducer.initTransactions(); + try { + newProducer.initTransactions(); + } + catch (RuntimeException ex) { + try { + newProducer.close(this.physicalCloseTimeout); + } + catch (RuntimeException ex2) { + KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex); + newEx.addSuppressed(ex2); + throw newEx; // NOSONAR - lost stack trace + } + throw new KafkaException("initTransactions() failed", ex); + } CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName, this.epoch.get());