From 3f4c25fe1d800abafa9df81ccaa42b1b3921c824 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 12 Sep 2024 02:08:33 +0800 Subject: [PATCH] KAFKA-17448: New consumer seek should update positions in background thread (#17075) Reviewers: Lianet Magrans , Kirk True --- .../internals/AsyncKafkaConsumer.java | 22 ++++--- .../internals/events/ApplicationEvent.java | 3 +- .../events/ApplicationEventProcessor.java | 18 ++++++ .../events/SeekUnvalidatedEvent.java | 59 +++++++++++++++++++ .../internals/AsyncKafkaConsumerTest.java | 25 ++++++++ .../events/ApplicationEventProcessorTest.java | 33 +++++++++++ 6 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e988afa10c7e..06ee3bc3616d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.KafkaClient; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -56,6 +55,7 @@ import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent; +import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent; @@ -790,11 +790,10 @@ public void seek(TopicPartition partition, long offset) { acquireAndEnsureOpen(); try { log.info("Seeking to offset {} for partition {}", offset, partition); - SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( - offset, - Optional.empty(), // This will ensure we skip validation - metadata.currentLeader(partition)); - subscriptions.seekUnvalidated(partition, newPosition); + Timer timer = time.timer(defaultApiTimeoutMs); + SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent( + calculateDeadlineMs(timer), partition, offset, Optional.empty()); + applicationEventHandler.addAndGet(seekUnvalidatedEventEvent); } finally { release(); } @@ -815,13 +814,12 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) } else { log.info("Seeking to offset {} for partition {}", offset, partition); } - Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(partition); - SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( - offsetAndMetadata.offset(), - offsetAndMetadata.leaderEpoch(), - currentLeaderAndEpoch); updateLastSeenEpochIfNewer(partition, offsetAndMetadata); - subscriptions.seekUnvalidated(partition, newPosition); + + Timer timer = time.timer(defaultApiTimeoutMs); + SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent( + calculateDeadlineMs(timer), partition, offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch()); + applicationEventHandler.addAndGet(seekUnvalidatedEventEvent); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index a31f458e5e0d..4b0584b5bb8c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -35,7 +35,8 @@ public enum Type { COMMIT_ON_CLOSE, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, - SHARE_ACKNOWLEDGE_ON_CLOSE + SHARE_ACKNOWLEDGE_ON_CLOSE, + SEEK_UNVALIDATED, } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index b28750ac7fb9..6ce8737c78a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -147,6 +147,10 @@ public void process(ApplicationEvent event) { process((ShareAcknowledgeOnCloseEvent) event); return; + case SEEK_UNVALIDATED: + process((SeekUnvalidatedEvent) event); + return; + default: log.warn("Application event type {} was not expected", event.type()); } @@ -409,4 +413,18 @@ protected ApplicationEventProcessor create() { } }; } + + private void process(final SeekUnvalidatedEvent event) { + try { + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + event.offset(), + event.offsetEpoch(), + metadata.currentLeader(event.partition()) + ); + subscriptions.seekUnvalidated(event.partition(), newPosition); + event.future().complete(null); + } catch (Exception e) { + event.future().completeExceptionally(e); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java new file mode 100644 index 000000000000..d3f4cedff7f2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.TopicPartition; + +import java.util.Optional; + +/** + * Event to perform {@link SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)} + * in the background thread. This can avoid race conditions when subscription state is updated. + */ +public class SeekUnvalidatedEvent extends CompletableApplicationEvent { + private final TopicPartition partition; + private final long offset; + private final Optional offsetEpoch; + + public SeekUnvalidatedEvent(long deadlineMs, TopicPartition partition, long offset, Optional offsetEpoch) { + super(Type.SEEK_UNVALIDATED, deadlineMs); + this.partition = partition; + this.offset = offset; + this.offsetEpoch = offsetEpoch; + } + + public TopicPartition partition() { + return partition; + } + + public long offset() { + return offset; + } + + public Optional offsetEpoch() { + return offsetEpoch; + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", partition=" + partition + + ", offset=" + offset + + offsetEpoch.map(integer -> ", offsetEpoch=" + integer).orElse(""); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index ae694c12a09e..2526b6447a4c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent; +import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; @@ -769,6 +770,7 @@ public void testCommitAsyncLeaderEpochUpdate() { new Node(1, "host", 9000)), Optional.of(1))); completeAssignmentChangeEventSuccessfully(); consumer.assign(Arrays.asList(t0, t1)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(t0, 10); consumer.seek(t1, 20); @@ -804,6 +806,7 @@ public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() { final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 20); assertDoesNotThrow(() -> consumer.commitAsync()); @@ -827,6 +830,7 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 20); assertDoesNotThrow(() -> consumer.commitAsync()); @@ -894,6 +898,7 @@ private CompletableFuture setUpConsumerWithIncompleteAsyncCommit(TopicPart doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 20); consumer.commitAsync(); @@ -925,6 +930,7 @@ public void testPollTriggersFencedExceptionFromCommitAsync() { final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 20); assertDoesNotThrow(() -> consumer.commitAsync()); @@ -1018,6 +1024,7 @@ public void testAutoCommitSyncEnabled() { "client-id"); consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + completeSeekUnvalidatedEventSuccessfully(); subscriptions.seek(new TopicPartition("topic", 0), 100); consumer.commitSyncAllConsumed(time.timer(100)); verify(applicationEventHandler).add(any(SyncCommitEvent.class)); @@ -1035,6 +1042,7 @@ public void testAutoCommitSyncDisabled() { "client-id"); consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + completeSeekUnvalidatedEventSuccessfully(); subscriptions.seek(new TopicPartition("topic", 0), 100); verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class)); } @@ -1293,6 +1301,7 @@ public void testNoWakeupInCloseCommit() { completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 10); consumer.wakeup(); @@ -1322,6 +1331,7 @@ public void testCloseAwaitPendingAsyncCommitIncomplete() { final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 20); consumer.commitAsync(); @@ -1340,6 +1350,7 @@ public void testCloseAwaitPendingAsyncCommitComplete() { final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(tp)); + completeSeekUnvalidatedEventSuccessfully(); consumer.seek(tp, 20); completeCommitAsyncApplicationEventSuccessfully(); consumer.commitAsync(cb); @@ -2187,6 +2198,20 @@ private void completeAssignmentChangeEventSuccessfully() { }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class)); } + private void completeSeekUnvalidatedEventSuccessfully() { + doAnswer(invocation -> { + SeekUnvalidatedEvent event = invocation.getArgument(0); + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + event.offset(), + event.offsetEpoch(), + metadata.currentLeader(event.partition()) + ); + consumer.subscriptions().seekUnvalidated(event.partition(), newPosition); + event.future().complete(null); + return null; + }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class)); + } + private void forceCommitCallbackInvocation() { // Invokes callback consumer.commitAsync(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index df90c43bf4af..84a7ac84d1c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager; @@ -53,6 +54,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -183,6 +186,36 @@ public void testAssignmentChangeEventWithException() { assertInstanceOf(IllegalStateException.class, e.getCause()); } + @Test + public void testSeekUnvalidatedEvent() { + TopicPartition tp = new TopicPartition("topic", 0); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( + 0, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch()); + SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty()); + + setupProcessor(false); + doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp); + doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any()); + processor.process(event); + verify(metadata).currentLeader(tp); + verify(subscriptionState).seekUnvalidated(tp, position); + assertDoesNotThrow(() -> event.future().get()); + } + + @Test + public void testSeekUnvalidatedEventWithException() { + TopicPartition tp = new TopicPartition("topic", 0); + SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty()); + + setupProcessor(false); + doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp); + doThrow(new IllegalStateException()).when(subscriptionState).seekUnvalidated(eq(tp), any()); + processor.process(event); + + ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get()); + assertInstanceOf(IllegalStateException.class, e.getCause()); + } + private List mockCommitResults() { return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); }