Skip to content

Commit

Permalink
KAFKA-18071: Avoid event to refresh regex if no pattern subscription (a…
Browse files Browse the repository at this point in the history
…pache#17917)

Reviewers: David Jacot <[email protected]>, Andrew Schofield <[email protected]>
  • Loading branch information
lianetm authored Nov 25, 2024
1 parent 70babd5 commit 654ebe1
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1766,12 +1766,14 @@ private void sendPrefetches(Timer timer) {
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
offsetCommitCallbackInvoker.executeCallbacks();
try {
applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException e) {
return false;
} finally {
timer.update();
if (subscriptions.hasPatternSubscription()) {
try {
applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException e) {
return false;
} finally {
timer.update();
}
}
processBackgroundEvents();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,12 @@ private void process(final TopicPatternSubscriptionChangeEvent event) {
* This will make the consumer send the updated subscription on the next poll.
*/
private void process(final UpdatePatternSubscriptionEvent event) {
if (!subscriptions.hasPatternSubscription()) {
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
if (subscriptions.hasPatternSubscription()) {
updatePatternSubscription(metadata.fetch());
}
updatePatternSubscription(metadata.fetch());
}
event.future().complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -1830,6 +1831,27 @@ public void testSeekToEnd() {
assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy());
}

@Test
public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() {
consumer = newConsumer();
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
completeTopicPatternSubscriptionChangeEventSuccessfully();
completeUnsubscribeApplicationEventSuccessfully();

consumer.assign(singleton(new TopicPartition("topic1", 0)));
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));

consumer.unsubscribe();

consumer.subscribe(Pattern.compile("t*"));
consumer.poll(Duration.ZERO);
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
}

@Test
public void testSubscribeToRe2JPatternValidation() {
consumer = newConsumer();
Expand Down Expand Up @@ -1927,6 +1949,7 @@ private void completeFetchedCommittedOffsetApplicationEventExceptionally(Excepti
private void completeUnsubscribeApplicationEventSuccessfully() {
doAnswer(invocation -> {
UnsubscribeEvent event = invocation.getArgument(0);
consumer.subscriptions().unsubscribe();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,10 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV
UpdatePatternSubscriptionEvent event1 = new UpdatePatternSubscriptionEvent(12345);

setupProcessor(true);

when(subscriptionState.hasPatternSubscription()).thenReturn(true);
when(metadata.updateVersion()).thenReturn(0);

processor.process(event1);
verify(subscriptionState, never()).hasPatternSubscription();
assertDoesNotThrow(() -> event1.future().get());

Cluster cluster = mock(Cluster.class);
Expand All @@ -377,7 +376,6 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV
UpdatePatternSubscriptionEvent event2 = new UpdatePatternSubscriptionEvent(12345);
processor.process(event2);
verify(metadata).requestUpdateForNewTopics();
verify(subscriptionState).hasPatternSubscription();
verify(subscriptionState).subscribeFromPattern(topics);
assertEquals(1, processor.metadataVersionSnapshot());
verify(membershipManager).onSubscriptionUpdated();
Expand Down

0 comments on commit 654ebe1

Please sign in to comment.