From 6e4a098055d5ef852022a76a6ea48ccb880f6342 Mon Sep 17 00:00:00 2001 From: Calvin Liu <83986057+CalvinConfluent@users.noreply.github.com> Date: Fri, 29 Mar 2024 16:54:55 -0700 Subject: [PATCH] KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541) This is a mitigation fix for https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers. This PR reverts part of the change made in #13591. Reviewers: Kirk True, Justine Olshan --- .../clients/producer/internals/Sender.java | 5 +++-- .../producer/internals/SenderTest.java | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 99bc1d68b0b22..c4e2b73e8b91b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -270,13 +270,14 @@ public void run() { while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) { if (!transactionManager.isCompleting()) { log.info("Aborting incomplete transaction due to shutdown"); - try { // It is possible for the transaction manager to throw errors when aborting. Catch these // so as not to interfere with the rest of the shutdown logic. transactionManager.beginAbort(); } catch (Exception e) { - log.error("Error in kafka producer I/O thread while aborting transaction: ", e); + log.error("Error in kafka producer I/O thread while aborting transaction when during closing: ", e); + // Force close in case the transactionManager is in error states. 
+ forceClose = true; } } try { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 4ef9dab4d096c..eb01d1d5841d7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -132,9 +132,11 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SenderTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; @@ -3274,6 +3276,26 @@ public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exceptio } } + // This test is expected to run fast. If timeout, the sender is not able to close properly. + @Timeout(5) + @Test + public void testSenderShouldCloseWhenTransactionManagerInErrorState() throws Exception { + metrics.close(); + Map clientTags = Collections.singletonMap("client-id", "clientA"); + metrics = new Metrics(new MetricConfig().tags(clientTags)); + TransactionManager transactionManager = mock(TransactionManager.class); + SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics); + Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + when(transactionManager.hasOngoingTransaction()).thenReturn(true); + when(transactionManager.beginAbort()).thenThrow(new IllegalStateException()); + sender.initiateClose(); + + // The sender should directly get closed. 
+ sender.run(); + verify(transactionManager, times(1)).close(); + } + /** * Test the scenario that FetchResponse returns NOT_LEADER_OR_FOLLOWER, indicating change in leadership, but it * does not contain new leader info(defined in KIP-951).