From 654ebe10f4a5c31e449b2a2ef6c284254ed7dceb Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Sun, 24 Nov 2024 21:39:11 -0500 Subject: [PATCH] KAFKA-18071: Avoid event to refresh regex if no pattern subscription (#17917) Reviewers: David Jacot , Andrew Schofield --- .../internals/AsyncKafkaConsumer.java | 14 ++++++----- .../events/ApplicationEventProcessor.java | 7 +++--- .../internals/AsyncKafkaConsumerTest.java | 23 +++++++++++++++++++ .../events/ApplicationEventProcessorTest.java | 4 +--- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 033b16ba7b6a2..646087ee58d9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1766,12 +1766,14 @@ private void sendPrefetches(Timer timer) { @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { offsetCommitCallbackInvoker.executeCallbacks(); - try { - applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); - } catch (TimeoutException e) { - return false; - } finally { - timer.update(); + if (subscriptions.hasPatternSubscription()) { + try { + applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); + } catch (TimeoutException e) { + return false; + } finally { + timer.update(); + } } processBackgroundEvents(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index d2e45370c66dc..c9735617abbbd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -320,11 +320,12 @@ private void process(final TopicPatternSubscriptionChangeEvent event) { * This will make the consumer send the updated subscription on the next poll. */ private void process(final UpdatePatternSubscriptionEvent event) { + if (!subscriptions.hasPatternSubscription()) { + return; + } if (this.metadataVersionSnapshot < metadata.updateVersion()) { this.metadataVersionSnapshot = metadata.updateVersion(); - if (subscriptions.hasPatternSubscription()) { - updatePatternSubscription(metadata.fetch()); - } + updatePatternSubscription(metadata.fetch()); } event.future().complete(null); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 91e4cae0f98a5..940d9cbaef3fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; @@ -1830,6 +1831,27 @@ public void testSeekToEnd() { assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } + @Test + public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { + consumer = newConsumer(); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + completeAssignmentChangeEventSuccessfully(); + completeTopicPatternSubscriptionChangeEventSuccessfully(); + completeUnsubscribeApplicationEventSuccessfully(); + + consumer.assign(singleton(new TopicPartition("topic1", 0))); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); + + consumer.unsubscribe(); + + consumer.subscribe(Pattern.compile("t*")); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class)); + } + @Test public void testSubscribeToRe2JPatternValidation() { consumer = newConsumer(); @@ -1927,6 +1949,7 @@ private void completeFetchedCommittedOffsetApplicationEventExceptionally(Excepti private void completeUnsubscribeApplicationEventSuccessfully() { doAnswer(invocation -> { UnsubscribeEvent event = invocation.getArgument(0); + consumer.subscriptions().unsubscribe(); event.future().complete(null); return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index a16f9612c741d..ea09fc2ae8be4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -355,11 +355,10 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV UpdatePatternSubscriptionEvent event1 = new UpdatePatternSubscriptionEvent(12345); setupProcessor(true); - + when(subscriptionState.hasPatternSubscription()).thenReturn(true); when(metadata.updateVersion()).thenReturn(0); processor.process(event1); - verify(subscriptionState, never()).hasPatternSubscription(); assertDoesNotThrow(() -> event1.future().get()); Cluster cluster = mock(Cluster.class); @@ -377,7 +376,6 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV UpdatePatternSubscriptionEvent event2 = new UpdatePatternSubscriptionEvent(12345); processor.process(event2); verify(metadata).requestUpdateForNewTopics(); - verify(subscriptionState).hasPatternSubscription(); verify(subscriptionState).subscribeFromPattern(topics); assertEquals(1, processor.metadataVersionSnapshot()); verify(membershipManager).onSubscriptionUpdated();