Skip to content

Commit

Permalink
KAFKA-17448: New consumer seek should update positions in background …
Browse files Browse the repository at this point in the history
…thread (#17075)

Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>
  • Loading branch information
FrankYang0529 authored Sep 11, 2024
1 parent 0c4ffc6 commit 3f4c25f
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
Expand Down Expand Up @@ -56,6 +55,7 @@
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
Expand Down Expand Up @@ -790,11 +790,10 @@ public void seek(TopicPartition partition, long offset) {
acquireAndEnsureOpen();
try {
log.info("Seeking to offset {} for partition {}", offset, partition);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
offset,
Optional.empty(), // This will ensure we skip validation
metadata.currentLeader(partition));
subscriptions.seekUnvalidated(partition, newPosition);
Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
calculateDeadlineMs(timer), partition, offset, Optional.empty());
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
}
Expand All @@ -815,13 +814,12 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
} else {
log.info("Seeking to offset {} for partition {}", offset, partition);
}
Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(partition);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
currentLeaderAndEpoch);
updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
subscriptions.seekUnvalidated(partition, newPosition);

Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(
calculateDeadlineMs(timer), partition, offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public enum Type {
COMMIT_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE
SHARE_ACKNOWLEDGE_ON_CLOSE,
SEEK_UNVALIDATED,
}

private final Type type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ public void process(ApplicationEvent event) {
process((ShareAcknowledgeOnCloseEvent) event);
return;

case SEEK_UNVALIDATED:
process((SeekUnvalidatedEvent) event);
return;

default:
log.warn("Application event type {} was not expected", event.type());
}
Expand Down Expand Up @@ -409,4 +413,18 @@ protected ApplicationEventProcessor create() {
}
};
}

private void process(final SeekUnvalidatedEvent event) {
try {
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
event.offset(),
event.offsetEpoch(),
metadata.currentLeader(event.partition())
);
subscriptions.seekUnvalidated(event.partition(), newPosition);
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

import java.util.Optional;

/**
* Event to perform {@link SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)}
* in the background thread. This can avoid race conditions when subscription state is updated.
*/
public class SeekUnvalidatedEvent extends CompletableApplicationEvent<Void> {
private final TopicPartition partition;
private final long offset;
private final Optional<Integer> offsetEpoch;

public SeekUnvalidatedEvent(long deadlineMs, TopicPartition partition, long offset, Optional<Integer> offsetEpoch) {
super(Type.SEEK_UNVALIDATED, deadlineMs);
this.partition = partition;
this.offset = offset;
this.offsetEpoch = offsetEpoch;
}

public TopicPartition partition() {
return partition;
}

public long offset() {
return offset;
}

public Optional<Integer> offsetEpoch() {
return offsetEpoch;
}

@Override
protected String toStringBase() {
return super.toStringBase()
+ ", partition=" + partition
+ ", offset=" + offset
+ offsetEpoch.map(integer -> ", offsetEpoch=" + integer).orElse("");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
Expand Down Expand Up @@ -769,6 +770,7 @@ public void testCommitAsyncLeaderEpochUpdate() {
new Node(1, "host", 9000)), Optional.of(1)));
completeAssignmentChangeEventSuccessfully();
consumer.assign(Arrays.asList(t0, t1));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(t0, 10);
consumer.seek(t1, 20);

Expand Down Expand Up @@ -804,6 +806,7 @@ public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);

assertDoesNotThrow(() -> consumer.commitAsync());
Expand All @@ -827,6 +830,7 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);

assertDoesNotThrow(() -> consumer.commitAsync());
Expand Down Expand Up @@ -894,6 +898,7 @@ private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPart
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
consumer.commitAsync();

Expand Down Expand Up @@ -925,6 +930,7 @@ public void testPollTriggersFencedExceptionFromCommitAsync() {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);

assertDoesNotThrow(() -> consumer.commitAsync());
Expand Down Expand Up @@ -1018,6 +1024,7 @@ public void testAutoCommitSyncEnabled() {
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.commitSyncAllConsumed(time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
Expand All @@ -1035,6 +1042,7 @@ public void testAutoCommitSyncDisabled() {
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
verify(applicationEventHandler, never()).add(any(SyncCommitEvent.class));
}
Expand Down Expand Up @@ -1293,6 +1301,7 @@ public void testNoWakeupInCloseCommit() {
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 10);
consumer.wakeup();

Expand Down Expand Up @@ -1322,6 +1331,7 @@ public void testCloseAwaitPendingAsyncCommitIncomplete() {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);

consumer.commitAsync();
Expand All @@ -1340,6 +1350,7 @@ public void testCloseAwaitPendingAsyncCommitComplete() {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
completeCommitAsyncApplicationEventSuccessfully();
consumer.commitAsync(cb);
Expand Down Expand Up @@ -2187,6 +2198,20 @@ private void completeAssignmentChangeEventSuccessfully() {
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
}

private void completeSeekUnvalidatedEventSuccessfully() {
doAnswer(invocation -> {
SeekUnvalidatedEvent event = invocation.getArgument(0);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
event.offset(),
event.offsetEpoch(),
metadata.currentLeader(event.partition())
);
consumer.subscriptions().seekUnvalidated(event.partition(), newPosition);
event.future().complete(null);
return null;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
}

private void forceCommitCallbackInvocation() {
// Invokes callback
consumer.commitAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
Expand Down Expand Up @@ -53,6 +54,8 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -183,6 +186,36 @@ public void testAssignmentChangeEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@Test
public void testSeekUnvalidatedEvent() {
TopicPartition tp = new TopicPartition("topic", 0);
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
0, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());

setupProcessor(false);
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
processor.process(event);
verify(metadata).currentLeader(tp);
verify(subscriptionState).seekUnvalidated(tp, position);
assertDoesNotThrow(() -> event.future().get());
}

@Test
public void testSeekUnvalidatedEventWithException() {
TopicPartition tp = new TopicPartition("topic", 0);
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, Optional.empty());

setupProcessor(false);
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
doThrow(new IllegalStateException()).when(subscriptionState).seekUnvalidated(eq(tp), any());
processor.process(event);

ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
assertInstanceOf(IllegalStateException.class, e.getCause());
}

private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}
Expand Down

0 comments on commit 3f4c25f

Please sign in to comment.