diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 19ada3f72c..a076e6ff98 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -655,9 +655,14 @@ public T executeInTransaction(OperationsCallback callback) { catch (SkipAbortException e) { // NOSONAR - exception flow control throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace } - catch (Exception e) { - producer.abortTransaction(); - throw e; + catch (Exception ex) { + try { + producer.abortTransaction(); + } + catch (Exception abortException) { + ex.addSuppressed(abortException); + } + throw ex; } finally { this.producers.remove(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 776b82f355..af039772e9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -505,6 +505,31 @@ public void testAbort() { verify(producer, never()).commitTransaction(); } + @Test + public void abortFiledOriginalExceptionRethrown() { + MockProducer producer = spy(new MockProducer<>()); + producer.initTransactions(); + producer.abortTransactionException = new RuntimeException("abort failed"); + + ProducerFactory pf = new MockProducerFactory<>((tx, id) -> producer, null); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> + template.executeInTransaction(t -> { + throw new RuntimeException("intentional"); + })) + .withMessage("intentional") + .withStackTraceContaining("abort failed"); + + assertThat(producer.transactionCommitted()).isFalse(); + assertThat(producer.transactionAborted()).isFalse(); + assertThat(producer.closed()).isTrue(); + verify(producer, never()).commitTransaction(); + } + @Test public void testExecuteInTransactionNewInnerTx() { @SuppressWarnings("unchecked")