From 4fcde4542b1a7ab976702d39c5894d11d756ad15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=B3=E6=98=B1=E9=9C=96=28Yu-Lin=20Chen=29?= Date: Tue, 14 Jan 2025 02:03:52 +0800 Subject: [PATCH] KAFKA-18469;KAFKA-18036: AsyncConsumer should request metadata update if ListOffsetRequest encounters a retriable error (#18475) Reviewers: Lianet Magrans --- .../clients/consumer/internals/OffsetsRequestManager.java | 1 + .../clients/consumer/internals/OffsetsRequestManagerTest.java | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index 7870caec1ba07..4c8d10ad323ac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -578,6 +578,7 @@ private List buildListOffsetsRequests( listOffsetsRequestState.globalResult.complete(listOffsetResult); } else { requestsToRetry.add(listOffsetsRequestState); + metadata.requestUpdate(false); } } else { log.debug("ListOffsets request failed with error", error); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index dbfcb6cd46a8c..2f92740c41411 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -286,6 +286,8 @@ public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) throw assertFalse(fetchOffsetsFuture.isDone()); assertEquals(1, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); + // A retriable error should be followed by a metadata update request + verify(metadata).requestUpdate(false); // Cluster metadata update. Failed requests should be retried and succeed mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); @@ -384,6 +386,8 @@ public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws E assertFalse(fetchOffsetsFuture.isDone()); assertEquals(1, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); + // A retriable error should be followed by a metadata update request + verify(metadata).requestUpdate(false); // Cluster metadata update. Failed requests should be retried mockSuccessfulRequest(partitionLeaders);