diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerSeekAware.java b/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerSeekAware.java
new file mode 100644
index 000000000..35f8e3523
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/ConsumerSeekAware.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka;
+
+import io.micronaut.configuration.kafka.annotation.KafkaListener;
+import io.micronaut.configuration.kafka.seek.KafkaSeekOperation;
+import io.micronaut.configuration.kafka.seek.KafkaSeeker;
+import io.micronaut.core.annotation.NonNull;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.Collection;
+
+/**
+ * Interface for {@link KafkaListener} instances to implement if they wish to perform
+ * {@link KafkaSeekOperation seek operations} when the set of partitions assigned to the
+ * {@link Consumer} changes.
+ *
+ * <p>This callback interface is based on {@link ConsumerRebalanceListener} and provides a
+ * {@link KafkaSeeker} object that can perform {@link KafkaSeekOperation} instances immediately.
+ *
+ * @author Guillermo Calvo
+ * @see ConsumerRebalanceListener
+ * @see KafkaSeekOperation
+ * @see KafkaSeeker
+ * @since 4.1
+ */
+@FunctionalInterface
+public interface ConsumerSeekAware {
+
+ /**
+ * A callback method the user can implement to provide handling of customized offsets
+ * on completion of a successful partition re-assignment.
+ *
+ * This method will be called after the partition re-assignment completes and before the
+ * consumer starts fetching data, and only as the result of a
+ * {@link Consumer#poll(Duration)} call.
+ * Under normal conditions, {@link #onPartitionsRevoked(Collection)} will be executed before
+ * {@link #onPartitionsAssigned(Collection, KafkaSeeker)}.
+ *
+ * The provided {@link KafkaSeeker} object can perform
+ * {@link KafkaSeekOperation seek operations} on the underlying consumer.
+ *
+ * @param partitions The list of partitions that are now assigned to the consumer
+ * @param seeker The object that can perform {@link KafkaSeekOperation seek operations}
+ * @see ConsumerRebalanceListener#onPartitionsAssigned(Collection)
+ */
+ void onPartitionsAssigned(@NonNull Collection<TopicPartition> partitions, @NonNull KafkaSeeker seeker);
+
+ /**
+ * @see ConsumerRebalanceListener#onPartitionsRevoked(Collection)
+ * @param partitions The list of partitions that were assigned to the consumer and now need to
+ * be revoked (may not include all currently assigned partitions, i.e. there
+ * may still be some partitions left)
+ */
+ default void onPartitionsRevoked(@NonNull Collection<TopicPartition> partitions) { }
+
+ /**
+ * @see ConsumerRebalanceListener#onPartitionsLost(Collection)
+ * @param partitions The list of partitions that were assigned to the consumer and have now
+ * been reassigned to other consumers (with the current protocol this will
+ * include all of the consumer's previously assigned partitions)
+ */
+ default void onPartitionsLost(@NonNull Collection<TopicPartition> partitions) {
+ onPartitionsRevoked(partitions);
+ }
+}
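For orientation, a minimal sketch of a listener using the new callback; the topic name and bean are illustrative and not part of this change:

    import io.micronaut.configuration.kafka.ConsumerSeekAware;
    import io.micronaut.configuration.kafka.annotation.*;
    import io.micronaut.configuration.kafka.seek.*;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Collection;

    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    public class ProductListener implements ConsumerSeekAware {

        @Topic("products")
        void receive(String message) {
            // process the record
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
            // Skip the first record of each newly assigned partition
            partitions.forEach(tp -> seeker.perform(KafkaSeekOperation.seek(tp, 1)));
        }
    }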
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
index a22c86bf5..b630a017e 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
@@ -17,6 +17,7 @@
import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
+import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.KafkaAcknowledgement;
import io.micronaut.configuration.kafka.KafkaMessage;
import io.micronaut.configuration.kafka.ProducerRegistry;
@@ -37,6 +38,8 @@
import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
+import io.micronaut.configuration.kafka.seek.KafkaSeekOperations;
+import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
@@ -462,6 +465,9 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
final Optional<Argument<?>> consumerArg = Arrays.stream(method.getArguments())
.filter(arg -> Consumer.class.isAssignableFrom(arg.getType()))
.findFirst();
+ final Optional<Argument<?>> seekArg = Arrays.stream(method.getArguments())
+ .filter(arg -> KafkaSeekOperations.class.isAssignableFrom(arg.getType()))
+ .findFirst();
final Optional<Argument<?>> ackArg = Arrays.stream(method.getArguments())
.filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()))
.findFirst();
@@ -525,7 +531,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
if (isBatch) {
failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, consumerRecords);
} else {
- failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords);
+ failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, seekArg, ackArg, consumerRecords);
}
if (!failed) {
if (consumerState.offsetStrategy == OffsetStrategy.SYNC) {
@@ -561,6 +567,7 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
final boolean trackPartitions,
+ final Optional<Argument<?>> seekArg,
final Optional<Argument<?>> ackArg,
final ConsumerRecords<?, ?> consumerRecords) {
final ExecutableBinder<ConsumerRecord<?, ?>> executableBinder = new DefaultExecutableBinder<>(boundArguments);
@@ -581,6 +588,8 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
}
Consumer<?, ?> kafkaConsumer = consumerState.kafkaConsumer;
+ final KafkaSeekOperations seek = seekArg.map(x -> KafkaSeekOperations.newInstance()).orElse(null);
+ seekArg.ifPresent(argument -> boundArguments.put(argument, seek));
ackArg.ifPresent(argument -> boundArguments.put(argument, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(currentOffsets)));
try {
@@ -614,6 +623,11 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
} else if (consumerState.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
kafkaConsumer.commitAsync(currentOffsets, resolveCommitCallback(consumerState.consumerBean));
}
+ if (seek != null) {
+ // Performs seek operations that were deferred by the user
+ final KafkaSeeker seeker = KafkaSeeker.newInstance(kafkaConsumer);
+ seek.forEach(seeker::perform);
+ }
}
return true;
}
@@ -760,7 +774,9 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
if (hasTopics) {
final List<String> topics = Arrays.asList(topicNames);
- if (consumerBean instanceof ConsumerRebalanceListener crl) {
+ if (consumerBean instanceof ConsumerSeekAware csa) {
+ kafkaConsumer.subscribe(topics, new ConsumerSeekAwareAdapter(KafkaSeeker.newInstance(kafkaConsumer), csa));
+ } else if (consumerBean instanceof ConsumerRebalanceListener crl) {
kafkaConsumer.subscribe(topics, crl);
} else {
kafkaConsumer.subscribe(topics);
@@ -779,7 +795,9 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
} catch (Exception e) {
throw new MessagingSystemException("Invalid topic pattern [" + pattern + "] for method [" + method + "]: " + e.getMessage(), e);
}
- if (consumerBean instanceof ConsumerRebalanceListener crl) {
+ if (consumerBean instanceof ConsumerSeekAware csa) {
+ kafkaConsumer.subscribe(compiledPattern, new ConsumerSeekAwareAdapter(KafkaSeeker.newInstance(kafkaConsumer), csa));
+ } else if (consumerBean instanceof ConsumerRebalanceListener crl) {
kafkaConsumer.subscribe(compiledPattern, crl);
} else {
kafkaConsumer.subscribe(compiledPattern);
@@ -1258,4 +1276,17 @@ private enum ConsumerCloseState {
NOT_STARTED, POLLING, CLOSED
}
+ private record ConsumerSeekAwareAdapter(@NonNull KafkaSeeker seeker, @NonNull ConsumerSeekAware bean)
+ implements ConsumerRebalanceListener {
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ bean.onPartitionsRevoked(partitions != null ? partitions : Collections.emptyList());
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ bean.onPartitionsAssigned(partitions != null ? partitions : Collections.emptyList(), seeker);
+ }
+ }
}
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeekOperation.java b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeekOperation.java
new file mode 100644
index 000000000..f8148a4c3
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeekOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.seek;
+
+import io.micronaut.core.annotation.Internal;
+import io.micronaut.core.annotation.NonNull;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+
+/**
+ * Default implementation of {@link KafkaSeekOperation}.
+ *
+ * @param topicPartition the topic name and partition number on which the seek should be performed.
+ * @param offsetType the offset type
+ * @param offset the offset that should be used to perform the seek. Must be zero or greater
+ * @author Guillermo Calvo
+ * @see KafkaSeekOperation
+ * @since 4.1
+ */
+@Internal
+record DefaultKafkaSeekOperation(
+ @NonNull TopicPartition topicPartition, @NonNull OffsetType offsetType, long offset
+) implements KafkaSeekOperation {
+
+ /**
+ * Creates a new instance.
+ *
+ * @param topicPartition the topic name and partition number on which the seek should be performed
+ * @param offsetType the offset type
+ * @param offset the offset that should be used to perform the seek. Must be zero or greater
+ */
+ public DefaultKafkaSeekOperation {
+ Objects.requireNonNull(topicPartition, "topicPartition");
+ Objects.requireNonNull(topicPartition.topic(), "topicPartition.topic");
+ Objects.requireNonNull(offsetType, "offsetType");
+ if (offset < 0) {
+ throw new IllegalArgumentException("Negative offset");
+ }
+ }
+}
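Because the compact constructor validates eagerly, an invalid operation fails at creation time rather than when the seek is eventually performed; for example (hypothetical values):

    // Throws IllegalArgumentException("Negative offset") immediately:
    KafkaSeekOperation.seek(new TopicPartition("some-topic", 0), -1);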
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeekOperations.java b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeekOperations.java
new file mode 100644
index 000000000..2a71ed308
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeekOperations.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.seek;
+
+import io.micronaut.core.annotation.Internal;
+import io.micronaut.core.annotation.NonNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Default implementation of {@link KafkaSeekOperations}.
+ *
+ * @author Guillermo Calvo
+ * @see KafkaSeekOperations
+ * @since 4.1
+ */
+@Internal
+final class DefaultKafkaSeekOperations implements KafkaSeekOperations {
+
+ private final List<KafkaSeekOperation> operations = new ArrayList<>();
+
+ @Override
+ @NonNull
+ public Iterator<KafkaSeekOperation> iterator() {
+ return operations.iterator();
+ }
+
+ @Override
+ public void defer(@NonNull KafkaSeekOperation operation) {
+ operations.add(Objects.requireNonNull(operation, "operation"));
+ }
+}
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeeker.java b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeeker.java
new file mode 100644
index 000000000..3cf9c45b6
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/DefaultKafkaSeeker.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.seek;
+
+import io.micronaut.core.annotation.Internal;
+import io.micronaut.core.annotation.NonNull;
+import io.micronaut.core.annotation.Nullable;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+
+/**
+ * Default implementation of {@link KafkaSeeker}.
+ *
+ * @param consumer the consumer on which to perform the {@link KafkaSeekOperation seek operations}.
+ * @author Guillermo Calvo
+ * @see KafkaSeeker
+ * @since 4.1
+ */
+@Internal
+record DefaultKafkaSeeker(@NonNull Consumer<?, ?> consumer) implements KafkaSeeker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaSeeker.class);
+
+ /**
+ * Creates a new instance.
+ *
+ * @param consumer the consumer on which to perform the {@link KafkaSeekOperation seek operations}.
+ */
+ public DefaultKafkaSeeker {
+ Objects.requireNonNull(consumer, "consumer");
+ }
+
+ @Override
+ public boolean perform(@NonNull KafkaSeekOperation operation) {
+ try {
+ final TopicPartition tp = operation.topicPartition();
+ if (operation.offset() == 0) {
+ Optional<Boolean> performed = performForZeroOffset(operation, tp);
+ if (performed.isPresent()) {
+ return performed.get();
+ }
+ }
+ final long offset = offset(operation, tp);
+ consumer.seek(tp, Math.max(0, offset));
+ LOG.debug("Seek operation succeeded: {} - offset: {}", operation, offset);
+ return true;
+ } catch (Exception e) {
+ LOG.error("Seek operation failed: {}", operation, e);
+ return false;
+ }
+ }
+
+ @NonNull
+ private Optional<Boolean> performForZeroOffset(@NonNull KafkaSeekOperation operation,
+ @NonNull TopicPartition tp) {
+ final String topic = operation.topic();
+ final int partition = operation.partition();
+ switch (operation.offsetType()) {
+ case FORWARD, BACKWARD -> {
+ // Special case: relative zero-offset
+ LOG.debug("Relative zero-offset seek operation dropped: {}", operation);
+ return Optional.of(false);
+ }
+ case BEGINNING -> {
+ // Optimized case: seek to the beginning
+ consumer.seekToBeginning(singletonList(tp));
+ LOG.debug("Seek to the beginning operation succeeded: {}-{}", topic, partition);
+ return Optional.of(true);
+ }
+ case END -> {
+ // Optimized case: seek to the end
+ consumer.seekToEnd(singletonList(tp));
+ LOG.debug("Seek to the end operation succeeded: {}-{}", topic, partition);
+ return Optional.of(true);
+ }
+ default -> {
+ /* Perform operation regularly */
+ return Optional.empty();
+ }
+ }
+ }
+
+ private long offset(@NonNull KafkaSeekOperation operation, @NonNull TopicPartition tp) {
+ return switch (operation.offsetType()) {
+ case ABSOLUTE -> operation.offset();
+ case FORWARD -> current(tp) + operation.offset();
+ case BACKWARD -> current(tp) - operation.offset();
+ case BEGINNING -> beginning(tp) + operation.offset();
+ case END -> end(tp) - operation.offset();
+ case TIMESTAMP -> earliest(tp, operation.offset()).orElseGet(() -> end(tp));
+ };
+ }
+
+ private long current(TopicPartition tp) {
+ return consumer.position(tp);
+ }
+
+ private long beginning(TopicPartition tp) {
+ return consumer.beginningOffsets(singletonList(tp)).get(tp);
+ }
+
+ private long end(TopicPartition tp) {
+ return consumer.endOffsets(singletonList(tp)).get(tp);
+ }
+
+ private Optional<Long> earliest(TopicPartition tp, long ts) {
+ return Optional.ofNullable(consumer.offsetsForTimes(singletonMap(tp, ts)).get(tp))
+ .map(OffsetAndTimestamp::offset);
+ }
+}
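To make the offset arithmetic above concrete, assume a partition whose beginning offset is 0, end offset is 8, and current position is 5 (illustrative numbers); the switch then resolves each offset type as follows, with zero offsets taking the special-cased paths shown earlier:

    // ABSOLUTE  3 -> seek(tp, 3)
    // FORWARD   2 -> seek(tp, 5 + 2) = 7
    // BACKWARD  2 -> seek(tp, 5 - 2) = 3
    // BEGINNING 2 -> seek(tp, 0 + 2) = 2
    // END       2 -> seek(tp, 8 - 2) = 6
    // TIMESTAMP t -> first offset whose timestamp is >= t, or the end (8) if none exists
    // Negative results are clamped to 0 via Math.max(0, offset)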
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeekOperation.java b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeekOperation.java
new file mode 100644
index 000000000..3b947ca7b
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeekOperation.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.seek;
+
+import io.micronaut.core.annotation.NonNull;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Represents a {@code seek} operation that may be performed on a {@link Consumer}.
+ *
+ * If successful, the operation will determine the next offset returned by
+ * {@link Consumer#poll(Duration)}.
+ *
+ * @author Guillermo Calvo
+ * @see Consumer#seek(TopicPartition, long)
+ * @see Consumer#poll(Duration)
+ * @since 4.1
+ */
+public interface KafkaSeekOperation {
+
+ /**
+ * Creates an absolute seek operation.
+ *
+ * @param topicPartition the topic partition.
+ * @param offset the absolute offset. Must be zero or greater.
+ * @return an absolute seek operation.
+ */
+ @NonNull
+ static KafkaSeekOperation seek(@NonNull TopicPartition topicPartition, long offset) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.ABSOLUTE, offset);
+ }
+
+ /**
+ * Creates a seek operation relative to the beginning.
+ *
+ * @param topicPartition the topic partition.
+ * @param offset the offset. Must be zero or greater.
+ * @return a seek operation relative to the beginning.
+ */
+ @NonNull
+ static KafkaSeekOperation seekRelativeToBeginning(@NonNull TopicPartition topicPartition, long offset) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.BEGINNING, offset);
+ }
+
+ /**
+ * Creates a seek to the beginning operation.
+ *
+ * @param topicPartition the topic partition.
+ * @return a seek to the beginning operation.
+ */
+ @NonNull
+ static KafkaSeekOperation seekToBeginning(@NonNull TopicPartition topicPartition) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.BEGINNING, 0L);
+ }
+
+ /**
+ * Creates a list of seek to the beginning operations.
+ *
+ * @param partitions the {@link TopicPartition}s.
+ * @return a list of seek to the beginning operations.
+ */
+ @NonNull
+ static List<KafkaSeekOperation> seekToBeginning(@NonNull Collection<TopicPartition> partitions) {
+ return partitions.stream().map(KafkaSeekOperation::seekToBeginning).toList();
+ }
+
+ /**
+ * Creates a seek operation relative to the end.
+ *
+ * @param topicPartition the topic partition.
+ * @param offset the offset. Must be zero or greater.
+ * @return a seek operation relative to the end.
+ */
+ @NonNull
+ static KafkaSeekOperation seekRelativeToEnd(@NonNull TopicPartition topicPartition, long offset) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.END, offset);
+ }
+
+ /**
+ * Creates a seek to the end operation.
+ *
+ * @param topicPartition the topic partition.
+ * @return a seek to the end operation.
+ */
+ @NonNull
+ static KafkaSeekOperation seekToEnd(@NonNull TopicPartition topicPartition) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.END, 0L);
+ }
+
+ /**
+ * Creates a list of seek to the end operations.
+ *
+ * @param partitions the {@link TopicPartition}s.
+ * @return a list of seek to the end operations.
+ */
+ @NonNull
+ static List<KafkaSeekOperation> seekToEnd(@NonNull Collection<TopicPartition> partitions) {
+ return partitions.stream().map(KafkaSeekOperation::seekToEnd).toList();
+ }
+
+ /**
+ * Creates a forward seek operation.
+ *
+ * @param topicPartition the topic partition.
+ * @param offset the offset. Must be zero or greater.
+ * @return a forward seek operation.
+ */
+ @NonNull
+ static KafkaSeekOperation seekForward(@NonNull TopicPartition topicPartition, long offset) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.FORWARD, offset);
+ }
+
+ /**
+ * Creates a backward seek operation.
+ *
+ * @param topicPartition the topic partition.
+ * @param offset the offset. Must be zero or greater.
+ * @return a backward seek operation.
+ */
+ @NonNull
+ static KafkaSeekOperation seekBackward(@NonNull TopicPartition topicPartition, long offset) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.BACKWARD, offset);
+ }
+
+ /**
+ * Creates a seek to the timestamp operation.
+ *
+ * This operation will seek to the first offset whose timestamp is greater than or equal to
+ * the given one if it exists; otherwise it will seek to the end.
+ *
+ * @param topicPartition the topic partition.
+ * @param timestamp the Kafka timestamp.
+ * @return a seek to the timestamp operation.
+ */
+ @NonNull
+ static KafkaSeekOperation seekToTimestamp(@NonNull TopicPartition topicPartition, long timestamp) {
+ return new DefaultKafkaSeekOperation(topicPartition, OffsetType.TIMESTAMP, timestamp);
+ }
+
+ /**
+ * Creates a list of seek to the timestamp operations.
+ *
+ * This operation will seek to the first offset whose timestamp is greater than or equal to
+ * the given one if it exists; otherwise it will seek to the end.
+ *
+ * @param topicPartitions the topic/partitions.
+ * @param timestamp the Kafka timestamp.
+ * @return a list of seek to the timestamp operations.
+ */
+ @NonNull
+ static List<KafkaSeekOperation> seekToTimestamp(@NonNull Collection<TopicPartition> topicPartitions, long timestamp) {
+ return topicPartitions.stream().map(tp -> seekToTimestamp(tp, timestamp)).toList();
+ }
+
+ /**
+ * @return the topic name and partition number on which the seek should be performed.
+ */
+ @NonNull
+ TopicPartition topicPartition();
+
+ /**
+ * @return the offset type.
+ */
+ @NonNull
+ OffsetType offsetType();
+
+ /**
+ * @return the offset that should be used to perform the seek. Must be zero or greater.
+ */
+ long offset();
+
+ /**
+ * @return the topic name on which the seek will be performed.
+ */
+ @NonNull
+ default String topic() {
+ return topicPartition().topic();
+ }
+
+ /**
+ * @return the partition number on which the seek will be performed.
+ */
+ default int partition() {
+ return topicPartition().partition();
+ }
+
+ /**
+ * Determines the interpretation of the {@link KafkaSeekOperation#offset()} value.
+ */
+ enum OffsetType {
+
+ /** The offset is absolute. */
+ ABSOLUTE,
+
+ /** The offset goes forward, relative to the current position. */
+ FORWARD,
+
+ /** The offset goes backward, relative to the current position. */
+ BACKWARD,
+
+ /** The offset goes forward, relative to the beginning of the partition. */
+ BEGINNING,
+
+ /** The offset goes backward, relative to the end of the partition. */
+ END,
+
+ /** The offset represents a Kafka timestamp. */
+ TIMESTAMP,
+ }
+}
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeekOperations.java b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeekOperations.java
new file mode 100644
index 000000000..1aa973de8
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeekOperations.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.seek;
+
+import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
+import io.micronaut.core.annotation.Internal;
+import io.micronaut.core.annotation.NonNull;
+import io.micronaut.configuration.kafka.annotation.KafkaListener;
+
+/**
+ * Defines an interface that can be injected into {@link KafkaListener} beans so that
+ * {@link KafkaSeekOperation seek operations} can be eventually performed on a consumer.
+ *
+ * <p>The operations will be performed automatically by Micronaut when the consumer method
+ * completes successfully, after any offsets have been committed, e.g. via {@link OffsetStrategy#AUTO}.
+ *
+ * @author Guillermo Calvo
+ * @see KafkaSeekOperation
+ * @since 4.1
+ */
+public interface KafkaSeekOperations extends Iterable<KafkaSeekOperation> {
+
+ /**
+ * Creates a new {@link KafkaSeekOperations} instance.
+ *
+ * @return a new instance.
+ */
+ @Internal
+ @NonNull
+ static KafkaSeekOperations newInstance() {
+ return new DefaultKafkaSeekOperations();
+ }
+
+ /**
+ * Adds a {@link KafkaSeekOperation} to the list.
+ *
+ * @param operation the Kafka seek operation to eventually perform.
+ */
+ void defer(@NonNull KafkaSeekOperation operation);
+}
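A sketch of the intended usage from a listener method, assuming a hypothetical "orders" topic and replay condition (imports as in the earlier sketch):

    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    public class OrdersListener {

        @Topic("orders")
        void receive(String message, KafkaSeekOperations ops) {
            if ("replay".equals(message)) {
                // Performed by the framework once this method returns successfully
                ops.defer(KafkaSeekOperation.seekToBeginning(new TopicPartition("orders", 0)));
            }
        }
    }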
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeeker.java b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeeker.java
new file mode 100644
index 000000000..9f7ca460e
--- /dev/null
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/seek/KafkaSeeker.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017-2020 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.configuration.kafka.seek;
+
+import io.micronaut.core.annotation.Internal;
+import io.micronaut.core.annotation.NonNull;
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * Performs {@link KafkaSeekOperation seek operations} on a target consumer.
+ *
+ * @author Guillermo Calvo
+ * @see KafkaSeekOperation
+ * @since 4.1
+ */
+public interface KafkaSeeker {
+
+ /**
+ * Creates a new {@link KafkaSeeker} with a given target {@link Consumer}.
+ *
+ * @param consumer the target consumer.
+ * @return a new Kafka seeker.
+ */
+ @Internal
+ @NonNull
+ static KafkaSeeker newInstance(@NonNull Consumer, ?> consumer) {
+ return new DefaultKafkaSeeker(consumer);
+ }
+
+ /**
+ * Performs a Kafka seek operation immediately.
+ *
+ * @param operation the Kafka seek operation to perform.
+ * @return whether the seek operation succeeded.
+ */
+ boolean perform(@NonNull KafkaSeekOperation operation);
+}
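In contrast to the deferred KafkaSeekOperations above, a seeker acts immediately; a minimal sketch, assuming a Consumer instance obtained elsewhere:

    KafkaSeeker seeker = KafkaSeeker.newInstance(consumer);
    boolean succeeded = seeker.perform(KafkaSeekOperation.seekToEnd(new TopicPartition("my-topic", 0)));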
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/seek/ConsumerSeekAwareSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/seek/ConsumerSeekAwareSpec.groovy
new file mode 100644
index 000000000..a5e8d560c
--- /dev/null
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/seek/ConsumerSeekAwareSpec.groovy
@@ -0,0 +1,261 @@
+package io.micronaut.configuration.kafka.seek
+
+import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
+import io.micronaut.configuration.kafka.ConsumerSeekAware
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+import org.apache.kafka.common.TopicPartition
+import java.time.Instant
+
+import static java.time.temporal.ChronoUnit.DAYS
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.LATEST
+import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS
+
+class ConsumerSeekAwareSpec extends AbstractKafkaContainerSpec {
+
+ static final String TEST_TOPIC = "ConsumerSeekAwareSpec-messages"
+
+ static final String MESSAGE_0 = "zero"
+ static final String MESSAGE_1 = "one"
+ static final String MESSAGE_2 = "two"
+ static final String MESSAGE_3 = "three"
+ static final String MESSAGE_4 = "four"
+ static final String MESSAGE_5 = "five"
+ static final String MESSAGE_6 = "six"
+ static final String MESSAGE_7 = "seven"
+ static final List<String> MESSAGES_PRE_REBALANCE = [MESSAGE_0, MESSAGE_1, MESSAGE_2, MESSAGE_3, MESSAGE_4]
+ static final List<String> MESSAGES_POST_REBALANCE = [MESSAGE_5, MESSAGE_6, MESSAGE_7]
+ static final List<String> ALL_MESSAGES = MESSAGES_PRE_REBALANCE + MESSAGES_POST_REBALANCE
+
+ @Override
+ protected Map<String, Object> getConfiguration() {
+ super.configuration + [(EMBEDDED_TOPICS): TEST_TOPIC]
+ }
+
+ void "perform seek operations on partitions assigned"() {
+ given: "consumers that implement ConsumerSeekAware are rebalanced"
+ final MyConsumer01 consumer01 = context.getBean(MyConsumer01) // absolute seek to offset 3
+ final MyConsumer02 consumer02 = context.getBean(MyConsumer02) // seek to beginning
+ final MyConsumer03 consumer03 = context.getBean(MyConsumer03) // seek to beginning + 2
+ final MyConsumer04 consumer04 = context.getBean(MyConsumer04) // seek to end
+ final MyConsumer05 consumer05 = context.getBean(MyConsumer05) // seek to end - 2
+ final MyConsumer06 consumer06 = context.getBean(MyConsumer06) // seek to future timestamp
+ final MyConsumer07 consumer07 = context.getBean(MyConsumer07) // seek to past timestamp
+ final MyConsumer08 consumer08 = context.getBean(MyConsumer08) // seek to current + 3
+ final MyConsumer09 consumer09 = context.getBean(MyConsumer09) // seek to current - 3
+ final MyConsumer10 consumer10 = context.getBean(MyConsumer10) // seek to current + 0
+
+ expect: "consumers start consuming messages"
+ conditions.eventually {
+ !consumer01.revoked
+ !consumer01.messages.empty
+ !consumer02.messages.empty
+ !consumer03.messages.empty
+ consumer04.messages.empty
+ !consumer05.messages.empty
+ consumer06.messages.empty
+ !consumer07.messages.empty
+ !consumer08.messages.empty
+ !consumer09.messages.empty
+ !consumer10.messages.empty
+ }
+
+ and: "A few more messages are produced after rebalance"
+ final MyProducer producer = context.getBean(MyProducer)
+ MESSAGES_POST_REBALANCE.forEach(producer::produce)
+ consumer01.onPartitionsLost([])
+
+ and: "consumer#1 performed absolute seek operation to offset 3 -- offset 0 to 2 are skipped"
+ conditions.eventually {
+ consumer01.revoked
+ consumer01.messages == [
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#2 performed seek-to-beginning operation -- no offsets are skipped"
+ conditions.eventually {
+ consumer02.messages == ALL_MESSAGES
+ }
+
+ and: "consumer#3 performed seek to beginning plus 2 -- offsets 0 to 1 are skipped"
+ conditions.eventually {
+ consumer03.messages == [
+ MESSAGE_2,
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#4 performed seek-to-end operation -- pre-rebalance messages are skipped"
+ conditions.eventually {
+ consumer04.messages == MESSAGES_POST_REBALANCE
+ }
+
+ and: "consumer#5 performed seek to end minus 2 -- offsets 0 to 2 are skipped"
+ conditions.eventually {
+ consumer05.messages == [
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#6 performed seek to today plus 7 days -- falls back to seek-to-end"
+ conditions.eventually {
+ consumer06.messages == MESSAGES_POST_REBALANCE
+ }
+
+ and: "consumer#7 performed seek to epoch -- no offsets are skipped"
+ conditions.eventually {
+ consumer07.messages == ALL_MESSAGES
+ }
+
+ and: "consumer#8 performed seek to current plus 3 -- offsets 0 to 2 are skipped"
+ conditions.eventually {
+ consumer08.messages == [
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#9 performed seek to current minus 3 -- no offsets are skipped"
+ conditions.eventually {
+ consumer09.messages == ALL_MESSAGES
+ }
+
+ and: "consumer#10 performed seek to current plus zero -- no offsets are skipped"
+ conditions.eventually {
+ consumer10.messages == ALL_MESSAGES
+ }
+ }
+
+ @KafkaClient
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static interface MyProducer {
+ @Topic(TEST_TOPIC) void produce(String message)
+ }
+
+ @Singleton
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class TestMessages {
+ TestMessages(MyProducer producer) { MESSAGES_PRE_REBALANCE.forEach(producer::produce) }
+ }
+
+ static abstract class MyAbstractConsumer implements ConsumerSeekAware {
+ final List<String> messages = []
+ @Topic(TEST_TOPIC) void consume(String message) { messages << message }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer01 extends MyAbstractConsumer {
+ MyConsumer01(TestMessages test) {}
+ boolean revoked = false
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ partitions.each(tp -> seeker.perform(KafkaSeekOperation.seek(tp, 3)))
+ }
+ @Override void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ revoked = true
+ }
+ @Override @Topic(patterns = TEST_TOPIC) void consume(String message) {
+ messages << message
+ }
+ }
+
+ @KafkaListener(offsetReset = LATEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer02 extends MyAbstractConsumer {
+ MyConsumer02(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ KafkaSeekOperation.seekToBeginning(partitions).forEach(seeker::perform)
+ }
+ }
+
+ @KafkaListener(offsetReset = LATEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer03 extends MyAbstractConsumer {
+ MyConsumer03(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ partitions.each(tp -> seeker.perform(KafkaSeekOperation.seekRelativeToBeginning(tp, 2)))
+ }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer04 extends MyAbstractConsumer {
+ MyConsumer04(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ KafkaSeekOperation.seekToEnd(partitions).forEach(seeker::perform)
+ }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer05 extends MyAbstractConsumer {
+ MyConsumer05(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ partitions.each(tp -> seeker.perform(KafkaSeekOperation.seekRelativeToEnd(tp, 2)))
+ }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer06 extends MyAbstractConsumer {
+ MyConsumer06(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ KafkaSeekOperation.seekToTimestamp(partitions, Instant.now().plus(7, DAYS).toEpochMilli()).forEach(seeker::perform)
+ }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer07 extends MyAbstractConsumer {
+ MyConsumer07(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ KafkaSeekOperation.seekToTimestamp(partitions, 0L).forEach(seeker::perform)
+ }
+ }
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer08 extends MyAbstractConsumer {
+ MyConsumer08(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ partitions.each(tp -> seeker.perform(KafkaSeekOperation.seekForward(tp, 3)))
+ }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer09 extends MyAbstractConsumer {
+ MyConsumer09(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ partitions.each(tp -> seeker.perform(KafkaSeekOperation.seekBackward(tp, 3)))
+ }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'ConsumerSeekAwareSpec')
+ static class MyConsumer10 extends MyAbstractConsumer {
+ MyConsumer10(TestMessages test) {}
+ @Override void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) {
+ partitions.each(tp -> seeker.perform(KafkaSeekOperation.seekForward(tp, 0)))
+ }
+ }
+}
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/seek/KafkaSeekOperationsSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/seek/KafkaSeekOperationsSpec.groovy
new file mode 100644
index 000000000..7290e085a
--- /dev/null
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/seek/KafkaSeekOperationsSpec.groovy
@@ -0,0 +1,289 @@
+package io.micronaut.configuration.kafka.seek
+
+import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.KafkaListener
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+import org.apache.kafka.common.TopicPartition
+
+import java.time.Instant
+
+import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
+import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS
+import static java.time.temporal.ChronoUnit.DAYS
+
+class KafkaSeekOperationsSpec extends AbstractKafkaContainerSpec {
+
+ static final String TEST_TOPIC = "KafkaSeekOperationsSpec-messages"
+ static final TopicPartition TP = new TopicPartition(TEST_TOPIC, 0)
+ static final TopicPartition WRONG_TP = new TopicPartition("wrong-topic", 123)
+
+ static final String MESSAGE_0 = "zero"
+ static final String MESSAGE_1 = "one"
+ static final String MESSAGE_2 = "two"
+ static final String MESSAGE_3 = "three"
+ static final String MESSAGE_4 = "four"
+ static final String MESSAGE_5 = "five"
+ static final String MESSAGE_6 = "six"
+ static final String MESSAGE_7 = "seven"
+ static final List<String> ALL_MESSAGES = [MESSAGE_0, MESSAGE_1, MESSAGE_2, MESSAGE_3, MESSAGE_4, MESSAGE_5, MESSAGE_6, MESSAGE_7]
+
+ @Override
+ protected Map<String, Object> getConfiguration() {
+ super.configuration + ['kafka.consumers.default.max.poll.records': 1, (EMBEDDED_TOPICS): TEST_TOPIC]
+ }
+
+ @Override
+ protected int getConditionsTimeout() {
+ 5
+ }
+
+ void "defer seek operations from a consumer method"() {
+ given: "consumers that implement ConsumerSeekAware are rebalanced"
+ final MyConsumer01 consumer01 = context.getBean(MyConsumer01) // absolute seek to offset 4
+ final MyConsumer02 consumer02 = context.getBean(MyConsumer02) // seek to beginning
+ final MyConsumer03 consumer03 = context.getBean(MyConsumer03) // seek to beginning + 2
+ final MyConsumer04 consumer04 = context.getBean(MyConsumer04) // seek to end
+ final MyConsumer05 consumer05 = context.getBean(MyConsumer05) // seek to end - 2
+ final MyConsumer06 consumer06 = context.getBean(MyConsumer06) // seek to future timestamp
+ final MyConsumer07 consumer07 = context.getBean(MyConsumer07) // seek to negative offset
+ final MyConsumer08 consumer08 = context.getBean(MyConsumer08) // force seek error
+ final MyConsumer09 consumer09 = context.getBean(MyConsumer09) // seek to current + 3
+ final MyConsumer10 consumer10 = context.getBean(MyConsumer10) // seek to current - 3
+
+ expect: "consumer#1 performed absolute seek operation to offset 4 -- offsets 2 to 3 are skipped"
+ conditions.eventually {
+ consumer01.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ // now seek to offset 4
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#2 performed seek-to-beginning operation -- offsets 0 to 2 are processed twice"
+ conditions.eventually {
+ consumer02.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ // now seek to the beginning
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#3 performed seek to beginning plus 2 -- offsets 2 to 4 are processed twice"
+ conditions.eventually {
+ consumer03.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ MESSAGE_3,
+ MESSAGE_4,
+ // now seek to the beginning + 2
+ MESSAGE_2,
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#4 performed seek-to-end operation -- offsets 6 to 7 are skipped"
+ conditions.eventually {
+ consumer04.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ MESSAGE_3,
+ // now seek to the end
+ ]
+ }
+
+ and: "consumer#5 performed seek to end minus 2 -- offsets 4 to 5 are skipped"
+ conditions.eventually {
+ consumer05.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ MESSAGE_3,
+ // now seek to the end - 2
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#6 performed seek to today plus 7 days -- falls back to seek-to-end"
+ conditions.eventually {
+ consumer06.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ // now seek to today + 7 days
+ ]
+ }
+
+ and: "consumer#7 performed seek to negative offset -- no offsets are skipped"
+ conditions.eventually {
+ consumer07.messages == ALL_MESSAGES
+ consumer07.error instanceof IllegalArgumentException
+ }
+
+ and: "consumer#8 forced seek error -- no offsets are skipped"
+ conditions.eventually {
+ consumer08.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ // force seek error
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#9 performed seek to current plus 3 -- offsets 4 to 5 are skipped"
+ conditions.eventually {
+ consumer09.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ MESSAGE_3,
+ // now seek to current + 3
+ MESSAGE_7,
+ ]
+ }
+
+ and: "consumer#10 performed seek to current minus 3 -- offsets 3 to 6 are processed twice"
+ conditions.eventually {
+ consumer10.messages == [
+ MESSAGE_0,
+ MESSAGE_1,
+ MESSAGE_2,
+ MESSAGE_3,
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ // now seek to current - 3
+ MESSAGE_4,
+ MESSAGE_5,
+ MESSAGE_6,
+ MESSAGE_7,
+ ]
+ }
+ }
+
+ @KafkaClient
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static interface MyProducer {
+ @Topic(TEST_TOPIC) void produce(String message)
+ }
+
+ @Singleton
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class TestMessages {
+ TestMessages(MyProducer producer) { ALL_MESSAGES.forEach(producer::produce) }
+ }
+
+ static abstract class MyAbstractConsumer {
+ final List<String> messages = []
+ final String targetMessage
+ Exception error
+ boolean alreadyDone
+ MyAbstractConsumer(String message) { targetMessage = message }
+ @Topic(TEST_TOPIC)
+ void consume(String message, KafkaSeekOperations ops) {
+ messages << message
+ if (!alreadyDone && message == targetMessage) {
+ alreadyDone = true
+ try { ops.defer(doTheSeek()) } catch (Exception e) { error = e }
+ }
+ }
+ abstract KafkaSeekOperation doTheSeek()
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer01 extends MyAbstractConsumer {
+ MyConsumer01(TestMessages test) { super('one') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seek(TP, 4) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer02 extends MyAbstractConsumer {
+ MyConsumer02(TestMessages test) { super('two') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekToBeginning(TP) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer03 extends MyAbstractConsumer {
+ MyConsumer03(TestMessages test) { super('four') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekRelativeToBeginning(TP, 2) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer04 extends MyAbstractConsumer {
+ MyConsumer04(TestMessages test) { super('three') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekToEnd(TP) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer05 extends MyAbstractConsumer {
+ MyConsumer05(TestMessages test) { super('three') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekRelativeToEnd(TP, 2) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer06 extends MyAbstractConsumer {
+ MyConsumer06(TestMessages test) { super('two') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekToTimestamp(TP, Instant.now().plus(7, DAYS).toEpochMilli()) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer07 extends MyAbstractConsumer {
+ MyConsumer07(TestMessages test) { super('five') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seek(TP, -5) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer08 extends MyAbstractConsumer {
+ MyConsumer08(TestMessages test) { super('two') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seek(WRONG_TP, 321) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer09 extends MyAbstractConsumer {
+ MyConsumer09(TestMessages test) { super('three') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekForward(TP, 3) }
+ }
+
+ @KafkaListener(offsetReset = EARLIEST)
+ @Requires(property = 'spec.name', value = 'KafkaSeekOperationsSpec')
+ static class MyConsumer10 extends MyAbstractConsumer {
+ MyConsumer10(TestMessages test) { super('six') }
+ KafkaSeekOperation doTheSeek() { KafkaSeekOperation.seekBackward(TP, 3) }
+ }
+}
diff --git a/kafka/src/test/resources/logback.xml b/kafka/src/test/resources/logback.xml
index 31ce8cd88..c0cab3b2f 100644
--- a/kafka/src/test/resources/logback.xml
+++ b/kafka/src/test/resources/logback.xml
@@ -11,6 +11,7 @@
+
diff --git a/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc b/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
index 4c07eace8..3c8d556bf 100644
--- a/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
+++ b/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
@@ -65,32 +65,3 @@ include::{testskafka}/consumer/offsets/manual/ProductListener.java[tags=method,
<1> Committing offsets automatically is disabled
<2> The listener method specifies that it receives the offset data and a `KafkaConsumer`
<3> The `commitSync()` method is called once the record has been processed
-
-=== Manually Assigning Offsets to a Consumer Bean
-
-Sometimes you may wish to control exactly the position you wish to resume consuming messages from.
-
-For example if you store offsets in a database you may wish to read the offsets from the database when the consumer starts and start reading from the position stored in the database.
-
-To support this use case your consumer bean can implement the link:{kafkaapi}/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html[ConsumerRebalanceListener] and api:configuration.kafka.KafkaConsumerAware[] interfaces:
-
-.Manually seeking offsets with the `KafkaConsumer` API
-[source,java]
-----
-include::{testskafka}/consumer/offsets/rebalance/ProductListener.java[tags=imports, indent=0]
-
-include::{testskafka}/consumer/offsets/rebalance/ProductListener.java[tags=clazz, indent=0]
-----
-
-<1> The `setKafkaConsumer` of the api:configuration.kafka.KafkaConsumerAware[] allows access to the underlying producer
-<2> The `onPartitionsRevoked` can be used to save offsets
-<3> The `onPartitionsAssigned` can use used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record).
-
-=== Manual Offsets with Multiple Topics
-
-It is possible for a single `@KafkaListener` bean to represent multiple consumers. If you have more than one method annotated with `@Topic` then `setKafkaConsumer` will be called multiple times for each backing consumer.
-
-
-It is recommended in the case of manually seeking offsets that you use a single listener bean per consumer, the alternative is to store an internal `Set` of all consumers associated with a particular listener and manually search for the correct listener in the `onPartitionsAssigned` using the partition assignment data.
-
-WARNING: Not doing so will lead to a `ConcurrentModificationException` error.
diff --git a/src/main/docs/guide/kafkaListener/kafkaSeek.adoc b/src/main/docs/guide/kafkaListener/kafkaSeek.adoc
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/main/docs/guide/kafkaListener/kafkaSeek/creatingKafkaSeekOperations.adoc b/src/main/docs/guide/kafkaListener/kafkaSeek/creatingKafkaSeekOperations.adoc
new file mode 100644
index 000000000..c58da675f
--- /dev/null
+++ b/src/main/docs/guide/kafkaListener/kafkaSeek/creatingKafkaSeekOperations.adoc
@@ -0,0 +1,10 @@
+The api:configuration.kafka.seek.KafkaSeekOperation[] interface provides several static methods to create `seek` operations (see the sketch after this list):
+
+* `seek`: Creates an absolute seek operation.
+* `seekRelativeToBeginning`: Creates a seek operation relative to the beginning.
+* `seekToBeginning`: Creates a seek to the beginning operation.
+* `seekRelativeToEnd`: Creates a seek operation relative to the end.
+* `seekToEnd`: Creates a seek to the end operation.
+* `seekForward`: Creates a forward seek operation.
+* `seekBackward`: Creates a backward seek operation.
+* `seekToTimestamp`: Creates a seek to the timestamp operation.
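A sketch of these factories in action; topic, partition, and offsets are arbitrary examples:

    TopicPartition tp = new TopicPartition("my-topic", 0);
    KafkaSeekOperation.seek(tp, 42);                   // absolute offset 42
    KafkaSeekOperation.seekRelativeToBeginning(tp, 5); // beginning + 5
    KafkaSeekOperation.seekToEnd(tp);                  // the current end of the partition
    KafkaSeekOperation.seekBackward(tp, 10);           // current position - 10
    KafkaSeekOperation.seekToTimestamp(tp, System.currentTimeMillis()); // first offset at/after now, else the end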
diff --git a/src/main/docs/guide/kafkaListener/kafkaSeek/manualOffsetsWithMultipleTopics.adoc b/src/main/docs/guide/kafkaListener/kafkaSeek/manualOffsetsWithMultipleTopics.adoc
new file mode 100644
index 000000000..c0e89d9ef
--- /dev/null
+++ b/src/main/docs/guide/kafkaListener/kafkaSeek/manualOffsetsWithMultipleTopics.adoc
@@ -0,0 +1,6 @@
+It is possible for a single `@KafkaListener` bean to represent multiple consumers. If you have more than one method annotated with `@Topic`, then `setKafkaConsumer` will be called multiple times, once for each backing consumer.
+
+
+When manually seeking offsets, it is recommended that you use a single listener bean per consumer; the alternative is to store an internal `Set` of all consumers associated with a particular listener and manually search for the correct one in `onPartitionsAssigned` using the partition assignment data.
+
+WARNING: Not doing so will lead to a `ConcurrentModificationException` error.
diff --git a/src/main/docs/guide/kafkaListener/kafkaSeek/manuallyAssigningOffsetsFromAConsumerMethod.adoc b/src/main/docs/guide/kafkaListener/kafkaSeek/manuallyAssigningOffsetsFromAConsumerMethod.adoc
new file mode 100644
index 000000000..13d0b595e
--- /dev/null
+++ b/src/main/docs/guide/kafkaListener/kafkaSeek/manuallyAssigningOffsetsFromAConsumerMethod.adoc
@@ -0,0 +1,12 @@
+There may be scenarios where you need to `seek` to a different offset while consuming another one.
+
+To support this use case, your consumer method can receive a api:configuration.kafka.seek.KafkaSeekOperations[] instance as a parameter:
+
+snippet::io.micronaut.kafka.docs.seek.ops.ProductListener[]
+
+<1> An instance of api:configuration.kafka.seek.KafkaSeekOperations[] will be injected into the method
+<2> Any number of `seek` operations can be deferred. In this trivial example we just seek to the end of the partition.
+
+The `seek` operations will be performed automatically by Micronaut when the consumer method completes successfully, after any offsets have been committed, e.g. via `OffsetStrategy.AUTO`.
+
+TIP: These operations determine the next offset retrieved by `poll`. Note that, even if a `seek` operation succeeds, your consumer method may keep receiving records that were already fetched by a previous call to `poll`. You can configure `max.poll.records` to control the maximum number of records returned by a single call to `poll`.
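The new tests in this change cap the fetch size with exactly this property; a minimal configuration sketch:

    kafka.consumers.default.max.poll.records=1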
diff --git a/src/main/docs/guide/kafkaListener/kafkaSeek/manuallyAssigningOffsetsToAConsumerBean.adoc b/src/main/docs/guide/kafkaListener/kafkaSeek/manuallyAssigningOffsetsToAConsumerBean.adoc
new file mode 100644
index 000000000..dea5b1131
--- /dev/null
+++ b/src/main/docs/guide/kafkaListener/kafkaSeek/manuallyAssigningOffsetsToAConsumerBean.adoc
@@ -0,0 +1,25 @@
+Sometimes you may wish to control exactly the position from which to resume consuming messages.
+
+For example if you store offsets in a database you may wish to read the offsets from the database when the consumer starts and start reading from the position stored in the database.
+
+To support this use case your consumer bean can implement the api:configuration.kafka.ConsumerSeekAware[] interface:
+
+.Manually seeking offsets with the `ConsumerSeekAware` API
+
+snippet::io.micronaut.kafka.docs.seek.aware.ProductListener[]
+
+<1> Implement the interface `ConsumerSeekAware`
+<2> The `onPartitionsRevoked` method can be used to save offsets
+<3> The `onPartitionsAssigned` method can be used to read offsets and seek to a specific position. In this trivial example we just seek to offset 1 (skipping the first record).
+
+TIP: api:configuration.kafka.ConsumerSeekAware[] provides a convenient api:configuration.kafka.seek.KafkaSeeker[] object that can be used to perform api:configuration.kafka.seek.KafkaSeekOperation[]s immediately on the underlying consumer.
+
+Alternatively, when more fine-grained access to the Kafka consumer is required, your consumer bean can instead implement the link:{kafkaapi}/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html[ConsumerRebalanceListener] and api:configuration.kafka.ConsumerAware[] interfaces:
+
+.Manually seeking offsets with the `KafkaConsumer` API
+
+snippet::io.micronaut.kafka.docs.seek.rebalance.ProductListener[]
+
+<1> The `setKafkaConsumer` method of the api:configuration.kafka.ConsumerAware[] interface provides access to the underlying consumer
+<2> The `onPartitionsRevoked` method can be used to save offsets
+<3> The `onPartitionsAssigned` method can be used to read offsets and seek to a specific position. In this trivial example we just seek to offset 1 (skipping the first record).
diff --git a/src/main/docs/guide/toc.yml b/src/main/docs/guide/toc.yml
index 676f705d9..ede5758b3 100644
--- a/src/main/docs/guide/toc.yml
+++ b/src/main/docs/guide/toc.yml
@@ -15,6 +15,12 @@ kafkaListener:
kafkaListenerMethods: Defining @KafkaListener Methods
kafkaListenerConfiguration: Configuring @KafkaListener beans
kafkaOffsets: Commiting Kafka Offsets
+ kafkaSeek:
+ title: Assigning Kafka Offsets
+ manuallyAssigningOffsetsToAConsumerBean: Manually Assigning Offsets to a Consumer Bean
+ manualOffsetsWithMultipleTopics: Manual Offsets with Multiple Topics
+ manuallyAssigningOffsetsFromAConsumerMethod: Manually Assigning Offsets from a Consumer Method
+ creatingKafkaSeekOperations: Creating Kafka Seek Operations
kafkaListenerBatch: Kafka Batch Processing
kafkaSendTo: Forwarding Messages with @SendTo
kafkaErrors: Handling Consumer Exceptions
@@ -28,4 +34,4 @@ kafkaStreams:
title: Kafka Streams
kafkaStreamInteractiveQuery: Interactive Query Service
kafkaStreamHealth: Kafka Stream Health Checks
-repository: Repository
\ No newline at end of file
+repository: Repository
diff --git a/test-suite-groovy/build.gradle.kts b/test-suite-groovy/build.gradle.kts
index 86ad0232e..c84dff671 100644
--- a/test-suite-groovy/build.gradle.kts
+++ b/test-suite-groovy/build.gradle.kts
@@ -6,6 +6,7 @@ plugins {
dependencies {
testImplementation(platform(mn.micronaut.core.bom))
testCompileOnly(mn.micronaut.inject.groovy)
+ testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.spock)
testRuntimeOnly(mnLogging.logback.classic)
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/Product.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/Product.groovy
new file mode 100644
index 000000000..feb388ba4
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/Product.groovy
@@ -0,0 +1,11 @@
+package io.micronaut.kafka.docs
+
+import groovy.transform.Canonical
+import io.micronaut.serde.annotation.Serdeable
+
+@Serdeable
+@Canonical
+class Product {
+ String name
+ int quantity
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/Products.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/Products.groovy
new file mode 100644
index 000000000..0c77731bc
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/Products.groovy
@@ -0,0 +1,7 @@
+package io.micronaut.kafka.docs
+
+class Products {
+
+ static final Product PRODUCT_0 = new Product("Apple", 10)
+ static final Product PRODUCT_1 = new Product("Banana", 20)
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareSpec.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareSpec.groovy
new file mode 100644
index 000000000..017b26161
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareSpec.groovy
@@ -0,0 +1,24 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.context.annotation.Property
+import io.micronaut.kafka.docs.Products
+import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import jakarta.inject.Inject
+import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+@MicronautTest
+@Property(name = "spec.name", value = "ConsumerSeekAwareSpec")
+class ConsumerSeekAwareSpec extends Specification {
+
+ @Inject
+ ProductListener consumer
+
+ void "test product listener"() {
+ expect:
+ new PollingConditions(timeout: 5).eventually {
+ !consumer.processed.contains(Products.PRODUCT_0)
+ consumer.processed.contains(Products.PRODUCT_1)
+ }
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductClient.groovy
new file mode 100644
index 000000000..ca93db245
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductClient.groovy
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+
+@Requires(property = "spec.name", value = "ConsumerSeekAwareSpec")
+@KafkaClient
+interface ProductClient {
+ @Topic("wonderful-products")
+ void produce(Product product)
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductListener.groovy
new file mode 100644
index 000000000..fa2dcb5a2
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductListener.groovy
@@ -0,0 +1,35 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.configuration.kafka.ConsumerSeekAware
+import io.micronaut.configuration.kafka.annotation.*
+import io.micronaut.configuration.kafka.seek.*
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+import org.apache.kafka.common.TopicPartition
+
+@KafkaListener
+@Requires(property = "spec.name", value = "ConsumerSeekAwareSpec")
+class ProductListener implements ConsumerSeekAware { // <1>
+
+    List<Product> processed = []
+
+ ProductListener(ProductListenerConfiguration config) {
+ // ...
+ }
+
+ @Topic("wonderful-products")
+ void receive(Product product) {
+ processed << product
+ }
+
+ @Override
+    void onPartitionsRevoked(Collection<TopicPartition> partitions) { // <2>
+ // save offsets here
+ }
+
+ @Override
+    void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // <3>
+ // seek to offset here
+ partitions.collect { KafkaSeekOperation.seek(it, 1) }.each(seeker.&perform)
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.groovy
new file mode 100644
index 000000000..e0e4ba3f4
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.groovy
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Products
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "spec.name", value = "ConsumerSeekAwareSpec")
+class ProductListenerConfiguration {
+
+ ProductListenerConfiguration(ProductClient producer) {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0)
+ producer.produce(Products.PRODUCT_1)
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsSpec.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsSpec.groovy
new file mode 100644
index 000000000..f6840865c
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsSpec.groovy
@@ -0,0 +1,24 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.context.annotation.Property
+import io.micronaut.kafka.docs.Products
+import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import jakarta.inject.Inject
+import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+@MicronautTest
+@Property(name = "spec.name", value = "KafkaSeekOperationsSpec")
+class KafkaSeekOperationsSpec extends Specification {
+
+ @Inject
+ ProductListener consumer
+
+ void "test product listener"() {
+ expect:
+ new PollingConditions(timeout: 5).eventually {
+ consumer.processed.contains(Products.PRODUCT_0)
+ !consumer.processed.contains(Products.PRODUCT_1)
+ }
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductClient.groovy
new file mode 100644
index 000000000..b37faea3d
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductClient.groovy
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+
+@Requires(property = "spec.name", value = "KafkaSeekOperationsSpec")
+@KafkaClient
+interface ProductClient {
+ @Topic("amazing-products")
+ void produce(Product product)
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductListener.groovy
new file mode 100644
index 000000000..131bee686
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductListener.groovy
@@ -0,0 +1,24 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.configuration.kafka.annotation.*
+import io.micronaut.configuration.kafka.seek.*
+import io.micronaut.context.annotation.*
+import io.micronaut.kafka.docs.Product
+import org.apache.kafka.common.TopicPartition
+
+@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
+@Requires(property = "spec.name", value = "KafkaSeekOperationsSpec")
+class ProductListener {
+
+    List<Product> processed = []
+
+ ProductListener(ProductListenerConfiguration config) {
+ // ...
+ }
+
+ @Topic("amazing-products")
+ void receive(Product product, KafkaSeekOperations ops) { // <1>
+ processed << product
+        ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))) // <2>
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.groovy
new file mode 100644
index 000000000..153d9ba53
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.groovy
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Products
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "spec.name", value = "KafkaSeekOperationsSpec")
+class ProductListenerConfiguration {
+
+ ProductListenerConfiguration(ProductClient producer) {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0)
+ producer.produce(Products.PRODUCT_1)
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerSpec.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerSpec.groovy
new file mode 100644
index 000000000..02a469db9
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerSpec.groovy
@@ -0,0 +1,24 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.context.annotation.Property
+import io.micronaut.kafka.docs.Products
+import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import jakarta.inject.Inject
+import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+@MicronautTest
+@Property(name = "spec.name", value = "ConsumerRebalanceListenerSpec")
+class ConsumerRebalanceListenerSpec extends Specification {
+
+ @Inject
+ ProductListener consumer
+
+ void "test product listener"() {
+ expect:
+ new PollingConditions(timeout: 5).eventually {
+ !consumer.processed.contains(Products.PRODUCT_0)
+ consumer.processed.contains(Products.PRODUCT_1)
+ }
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductClient.groovy
new file mode 100644
index 000000000..be0032c48
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductClient.groovy
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerSpec")
+@KafkaClient
+interface ProductClient {
+ @Topic("awesome-products")
+ void produce(Product product)
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductListener.groovy
new file mode 100644
index 000000000..8f310b461
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductListener.groovy
@@ -0,0 +1,44 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.configuration.kafka.ConsumerAware
+import io.micronaut.configuration.kafka.annotation.*
+import io.micronaut.context.annotation.Requires
+import io.micronaut.core.annotation.NonNull
+import io.micronaut.kafka.docs.Product
+import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.common.TopicPartition
+
+@KafkaListener(offsetReset = OffsetReset.EARLIEST)
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerSpec")
+class ProductListener implements ConsumerRebalanceListener, ConsumerAware {
+
+    List<Product> processed = []
+ private Consumer consumer
+
+ ProductListener(ProductListenerConfiguration config) {
+ // ...
+ }
+
+ @Override
+ void setKafkaConsumer(@NonNull Consumer consumer) { // <1>
+ this.consumer = consumer
+ }
+
+ @Topic("awesome-products")
+ void receive(Product product) {
+ processed << product
+ }
+
+ @Override
+    void onPartitionsRevoked(Collection<TopicPartition> partitions) { // <2>
+ // save offsets here
+ }
+
+ @Override
+    void onPartitionsAssigned(Collection<TopicPartition> partitions) { // <3>
+ // seek to offset here
+ for (TopicPartition partition : partitions) {
+ consumer.seek(partition, 1)
+ }
+ }
+}
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.groovy
new file mode 100644
index 000000000..b5d01ac5e
--- /dev/null
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.groovy
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Products
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerSpec")
+class ProductListenerConfiguration {
+
+ ProductListenerConfiguration(ProductClient producer) {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0)
+ producer.produce(Products.PRODUCT_1)
+ }
+}
diff --git a/test-suite-kotlin/build.gradle.kts b/test-suite-kotlin/build.gradle.kts
index 81d71e5fd..b05d3bc17 100644
--- a/test-suite-kotlin/build.gradle.kts
+++ b/test-suite-kotlin/build.gradle.kts
@@ -7,6 +7,7 @@ plugins {
dependencies {
kaptTest(platform(mn.micronaut.core.bom))
kaptTest(mn.micronaut.inject.java)
+ testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.junit5)
testRuntimeOnly(libs.junit.jupiter.engine)
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/Product.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/Product.kt
new file mode 100644
index 000000000..a923b1d3f
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/Product.kt
@@ -0,0 +1,6 @@
+package io.micronaut.kafka.docs
+
+import io.micronaut.serde.annotation.Serdeable
+
+@Serdeable
+data class Product(val name: String, val quantity: Int)
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/Products.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/Products.kt
new file mode 100644
index 000000000..afe02bc23
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/Products.kt
@@ -0,0 +1,9 @@
+package io.micronaut.kafka.docs
+
+class Products {
+
+ companion object {
+ val PRODUCT_0 = Product("Apple", 10)
+ val PRODUCT_1 = Product("Banana", 20)
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareTest.kt
new file mode 100644
index 000000000..5d2feaedc
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareTest.kt
@@ -0,0 +1,20 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.context.annotation.Property
+import io.micronaut.kafka.docs.Products
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest
+import org.awaitility.Awaitility.await
+import org.junit.jupiter.api.Test
+import java.util.concurrent.TimeUnit
+
+@MicronautTest
+@Property(name = "spec.name", value = "ConsumerSeekAwareTest")
+internal class ConsumerSeekAwareTest {
+ @Test
+ fun testProductListener(consumer: ProductListener) {
+ await().atMost(10, TimeUnit.SECONDS).until {
+ !consumer.processed.contains(Products.PRODUCT_0) &&
+ consumer.processed.contains(Products.PRODUCT_1)
+ }
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductClient.kt
new file mode 100644
index 000000000..74630b231
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductClient.kt
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+
+@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
+@KafkaClient
+interface ProductClient {
+ @Topic("wonderful-products")
+ fun produce(product: Product)
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductListener.kt
new file mode 100644
index 000000000..da3cfd224
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductListener.kt
@@ -0,0 +1,29 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.configuration.kafka.ConsumerSeekAware
+import io.micronaut.configuration.kafka.annotation.*
+import io.micronaut.configuration.kafka.seek.*
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+import org.apache.kafka.common.TopicPartition
+
+@KafkaListener
+@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
+class ProductListener constructor(config: ProductListenerConfiguration) : ConsumerSeekAware { // <1>
+
+    var processed: MutableList<Product> = mutableListOf()
+
+ @Topic("wonderful-products")
+ fun receive(product: Product) {
+ processed.add(product)
+ }
+
+    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) { // <2>
+ // save offsets here
+ }
+
+    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>, seeker: KafkaSeeker) { // <3>
+ // seek to offset here
+ partitions.stream().map { tp -> KafkaSeekOperation.seek(tp, 1) }.forEach(seeker::perform)
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.kt
new file mode 100644
index 000000000..e40234969
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.kt
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.aware
+
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Products
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
+class ProductListenerConfiguration constructor(producer: ProductClient) {
+
+ init {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0)
+ producer.produce(Products.PRODUCT_1)
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsTest.kt
new file mode 100644
index 000000000..12abb7db5
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsTest.kt
@@ -0,0 +1,20 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.context.annotation.Property
+import io.micronaut.kafka.docs.Products
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest
+import org.awaitility.Awaitility.await
+import org.junit.jupiter.api.Test
+import java.util.concurrent.TimeUnit
+
+@MicronautTest
+@Property(name = "spec.name", value = "KafkaSeekOperationsTest")
+internal class KafkaSeekOperationsTest {
+ @Test
+ fun testProductListener(consumer: ProductListener) {
+ await().atMost(10, TimeUnit.SECONDS).until {
+ consumer.processed.contains(Products.PRODUCT_0) &&
+ !consumer.processed.contains(Products.PRODUCT_1)
+ }
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductClient.kt
new file mode 100644
index 000000000..67d822a3e
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductClient.kt
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+
+@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
+@KafkaClient
+interface ProductClient {
+ @Topic("amazing-products")
+ fun produce(product: Product)
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductListener.kt
new file mode 100644
index 000000000..a4fb38967
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductListener.kt
@@ -0,0 +1,20 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.configuration.kafka.annotation.*
+import io.micronaut.configuration.kafka.seek.*
+import io.micronaut.context.annotation.*
+import io.micronaut.kafka.docs.Product
+import org.apache.kafka.common.TopicPartition
+
+@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = [Property(name = "max.poll.records", value = "1")])
+@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
+class ProductListener constructor(config: ProductListenerConfiguration) {
+
+    var processed: MutableList<Product> = mutableListOf()
+
+ @Topic("amazing-products")
+ fun receive(product: Product, ops: KafkaSeekOperations) { // <1>
+ processed.add(product)
+        ops.defer(KafkaSeekOperation.seekToEnd(TopicPartition("amazing-products", 0))) // <2>
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.kt
new file mode 100644
index 000000000..c836ed28b
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.kt
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.ops
+
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Products
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
+class ProductListenerConfiguration constructor(producer: ProductClient) {
+
+ init {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0)
+ producer.produce(Products.PRODUCT_1)
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerTest.kt
new file mode 100644
index 000000000..e085bbcdf
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerTest.kt
@@ -0,0 +1,20 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.context.annotation.Property
+import io.micronaut.kafka.docs.Products
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest
+import org.awaitility.Awaitility.await
+import org.junit.jupiter.api.Test
+import java.util.concurrent.TimeUnit
+
+@MicronautTest
+@Property(name = "spec.name", value = "ConsumerRebalanceListenerTest")
+internal class ConsumerRebalanceListenerTest {
+ @Test
+ fun testProductListener(consumer: ProductListener) {
+ await().atMost(10, TimeUnit.SECONDS).until {
+ !consumer.processed.contains(Products.PRODUCT_0) &&
+ consumer.processed.contains(Products.PRODUCT_1)
+ }
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductClient.kt
new file mode 100644
index 000000000..edd26b839
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductClient.kt
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient
+import io.micronaut.configuration.kafka.annotation.Topic
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
+@KafkaClient
+interface ProductClient {
+ @Topic("awesome-products")
+ fun produce(product: Product)
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductListener.kt
new file mode 100644
index 000000000..7864efdb2
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductListener.kt
@@ -0,0 +1,36 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.configuration.kafka.ConsumerAware
+import io.micronaut.configuration.kafka.annotation.*
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Product
+import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.common.TopicPartition
+
+@KafkaListener(offsetReset = OffsetReset.EARLIEST)
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
+class ProductListener constructor(config: ProductListenerConfiguration) : ConsumerRebalanceListener, ConsumerAware<Any?, Any?> {
+
+    var processed: MutableList<Product> = mutableListOf()
+ private var consumer: Consumer<*, *>? = null
+
+    override fun setKafkaConsumer(consumer: Consumer<Any?, Any?>?) { // <1>
+ this.consumer = consumer
+ }
+
+ @Topic("awesome-products")
+ fun receive(product: Product) {
+ processed.add(product)
+ }
+
+    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) { // <2>
+ // save offsets here
+ }
+
+    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) { // <3>
+ // seek to offset here
+ for (partition in partitions) {
+ consumer!!.seek(partition, 1)
+ }
+ }
+}
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.kt
new file mode 100644
index 000000000..9a39ebcc6
--- /dev/null
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.kt
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.rebalance
+
+import io.micronaut.context.annotation.Requires
+import io.micronaut.kafka.docs.Products
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
+class ProductListenerConfiguration constructor(producer: ProductClient) {
+
+ init {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0)
+ producer.produce(Products.PRODUCT_1)
+ }
+}
diff --git a/test-suite/build.gradle.kts b/test-suite/build.gradle.kts
index a62fad80a..2941ab74d 100644
--- a/test-suite/build.gradle.kts
+++ b/test-suite/build.gradle.kts
@@ -6,6 +6,7 @@ plugins {
dependencies {
testAnnotationProcessor(platform(mn.micronaut.core.bom))
testAnnotationProcessor(mn.micronaut.inject.java)
+ testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.junit5)
testRuntimeOnly(libs.junit.jupiter.engine)
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/Product.java b/test-suite/src/test/java/io/micronaut/kafka/docs/Product.java
new file mode 100644
index 000000000..0fa9923ae
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/Product.java
@@ -0,0 +1,6 @@
+package io.micronaut.kafka.docs;
+
+import io.micronaut.serde.annotation.Serdeable;
+
+@Serdeable
+public record Product(String name, int quantity) { }
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/Products.java b/test-suite/src/test/java/io/micronaut/kafka/docs/Products.java
new file mode 100644
index 000000000..0485e1000
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/Products.java
@@ -0,0 +1,7 @@
+package io.micronaut.kafka.docs;
+
+public class Products {
+
+ public static final Product PRODUCT_0 = new Product("Apple", 10);
+ public static final Product PRODUCT_1 = new Product("Banana", 20);
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareTest.java
new file mode 100644
index 000000000..201b12831
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ConsumerSeekAwareTest.java
@@ -0,0 +1,21 @@
+package io.micronaut.kafka.docs.seek.aware;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.kafka.docs.Products;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+@MicronautTest
+@Property(name = "spec.name", value = "ConsumerSeekAwareTest")
+class ConsumerSeekAwareTest {
+ @Test
+ void testProductListener(ProductListener consumer) {
+ await().atMost(10, SECONDS).until(() ->
+ !consumer.processed.contains(Products.PRODUCT_0) &&
+ consumer.processed.contains(Products.PRODUCT_1)
+ );
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductClient.java
new file mode 100644
index 000000000..f05ecb689
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductClient.java
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.aware;
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient;
+import io.micronaut.configuration.kafka.annotation.Topic;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Product;
+
+@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
+@KafkaClient
+public interface ProductClient {
+ @Topic("wonderful-products")
+ void produce(Product product);
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductListener.java
new file mode 100644
index 000000000..7635d646c
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductListener.java
@@ -0,0 +1,36 @@
+package io.micronaut.kafka.docs.seek.aware;
+
+import io.micronaut.configuration.kafka.ConsumerSeekAware;
+import io.micronaut.configuration.kafka.annotation.*;
+import io.micronaut.configuration.kafka.seek.*;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Product;
+import org.apache.kafka.common.TopicPartition;
+import java.util.*;
+
+@KafkaListener
+@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
+public class ProductListener implements ConsumerSeekAware { // <1>
+
+    List<Product> processed = new ArrayList<>();
+
+ public ProductListener(ProductListenerConfiguration config) {
+ // ...
+ }
+
+ @Topic("wonderful-products")
+ void receive(Product product) {
+ processed.add(product);
+ }
+
+ @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // <2>
+ // save offsets here
+ }
+
+ @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // <3>
+ // seek to offset here
+ partitions.stream().map(tp -> KafkaSeekOperation.seek(tp, 1)).forEach(seeker::perform);
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.java
new file mode 100644
index 000000000..b1581c75c
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/aware/ProductListenerConfiguration.java
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.aware;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Products;
+import jakarta.inject.Singleton;
+
+@Singleton
+@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
+public class ProductListenerConfiguration {
+
+ public ProductListenerConfiguration(ProductClient producer) {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0);
+ producer.produce(Products.PRODUCT_1);
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsTest.java
new file mode 100644
index 000000000..7ea4b7fb4
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/KafkaSeekOperationsTest.java
@@ -0,0 +1,21 @@
+package io.micronaut.kafka.docs.seek.ops;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.kafka.docs.Products;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+@MicronautTest
+@Property(name = "spec.name", value = "KafkaSeekOperationsTest")
+class KafkaSeekOperationsTest {
+ @Test
+ void testProductListener(ProductListener consumer) {
+ await().atMost(10, SECONDS).until(() ->
+ consumer.processed.contains(Products.PRODUCT_0) &&
+ !consumer.processed.contains(Products.PRODUCT_1)
+ );
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductClient.java
new file mode 100644
index 000000000..47ccf53ff
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductClient.java
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.ops;
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient;
+import io.micronaut.configuration.kafka.annotation.Topic;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Product;
+
+@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
+@KafkaClient
+public interface ProductClient {
+ @Topic("amazing-products")
+ void produce(Product product);
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductListener.java
new file mode 100644
index 000000000..49afeff60
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductListener.java
@@ -0,0 +1,25 @@
+package io.micronaut.kafka.docs.seek.ops;
+
+import io.micronaut.configuration.kafka.annotation.*;
+import io.micronaut.configuration.kafka.seek.*;
+import io.micronaut.context.annotation.*;
+import io.micronaut.kafka.docs.Product;
+import org.apache.kafka.common.TopicPartition;
+import java.util.*;
+
+@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
+@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
+public class ProductListener {
+
+    List<Product> processed = new ArrayList<>();
+
+ public ProductListener(ProductListenerConfiguration config) {
+ // ...
+ }
+
+ @Topic("amazing-products")
+ void receive(Product product, KafkaSeekOperations ops) { // <1>
+ processed.add(product);
+ ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))); // <2>
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.java
new file mode 100644
index 000000000..9ba17cf65
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/ops/ProductListenerConfiguration.java
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.ops;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Products;
+import jakarta.inject.Singleton;
+
+@Singleton
+@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
+public class ProductListenerConfiguration {
+
+ public ProductListenerConfiguration(ProductClient producer) {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0);
+ producer.produce(Products.PRODUCT_1);
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerTest.java
new file mode 100644
index 000000000..f3dc207e4
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ConsumerRebalanceListenerTest.java
@@ -0,0 +1,21 @@
+package io.micronaut.kafka.docs.seek.rebalance;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.kafka.docs.Products;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+@MicronautTest
+@Property(name = "spec.name", value = "ConsumerRebalanceListenerTest")
+class ConsumerRebalanceListenerTest {
+ @Test
+ void testProductListener(ProductListener consumer) {
+ await().atMost(10, SECONDS).until(() ->
+ !consumer.processed.contains(Products.PRODUCT_0) &&
+ consumer.processed.contains(Products.PRODUCT_1)
+ );
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductClient.java
new file mode 100644
index 000000000..588bf16db
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductClient.java
@@ -0,0 +1,13 @@
+package io.micronaut.kafka.docs.seek.rebalance;
+
+import io.micronaut.configuration.kafka.annotation.KafkaClient;
+import io.micronaut.configuration.kafka.annotation.Topic;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Product;
+
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
+@KafkaClient
+public interface ProductClient {
+ @Topic("awesome-products")
+ void produce(Product product);
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductListener.java
new file mode 100644
index 000000000..b9e0d50ae
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductListener.java
@@ -0,0 +1,45 @@
+package io.micronaut.kafka.docs.seek.rebalance;
+
+import io.micronaut.configuration.kafka.ConsumerAware;
+import io.micronaut.configuration.kafka.annotation.*;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Product;
+import io.micronaut.core.annotation.NonNull;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import java.util.*;
+
+@KafkaListener(offsetReset = OffsetReset.EARLIEST)
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
+public class ProductListener implements ConsumerRebalanceListener, ConsumerAware {
+
+    List<Product> processed = new ArrayList<>();
+ private Consumer consumer;
+
+ public ProductListener(ProductListenerConfiguration config) {
+ // ...
+ }
+
+ @Override
+ public void setKafkaConsumer(@NonNull Consumer consumer) { // <1>
+ this.consumer = consumer;
+ }
+
+ @Topic("awesome-products")
+ void receive(Product product) {
+ processed.add(product);
+ }
+
+ @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // <2>
+ // save offsets here
+ }
+
+ @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // <3>
+ // seek to offset here
+ for (TopicPartition partition : partitions) {
+ consumer.seek(partition, 1);
+ }
+ }
+}
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.java b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.java
new file mode 100644
index 000000000..467fb3581
--- /dev/null
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/seek/rebalance/ProductListenerConfiguration.java
@@ -0,0 +1,16 @@
+package io.micronaut.kafka.docs.seek.rebalance;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.kafka.docs.Products;
+import jakarta.inject.Singleton;
+
+@Singleton
+@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
+public class ProductListenerConfiguration {
+
+ public ProductListenerConfiguration(ProductClient producer) {
+ // Records are produced before ProductListener rebalances
+ producer.produce(Products.PRODUCT_0);
+ producer.produce(Products.PRODUCT_1);
+ }
+}