diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 9d09a4487..1721be71c 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -69,6 +69,9 @@ * @author Nakul Mishra * @author Pawel Lozinski * @author Adrian Chlebosz + * @author Soby Chacko + * @author Sanghyeok An + * @author Wouter Coekaerts * * @since 3.1 */ @@ -553,6 +556,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index 7d1ab4ae2..27fe067e4 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -86,6 +86,10 @@ * @author Nakul Mishra * @author Pawel Lozinski * @author Adrian Chlebosz + * @author Soby Chacko + * @author Sanghyeok An + * @author Borahm Lee + * @author Wouter Coekaerts * * @since 2.2 */ @@ -755,6 +759,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java index f1be0ac0b..4a44c61b9 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -16,14 +16,21 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.StringUtils; import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 3.1 * */ @@ -37,4 +44,21 @@ void testUpDown() { kafka.destroy(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic"); + kafka.afterPropertiesSet(); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java index 7d0fa1cd8..5506c0433 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java @@ -16,12 +16,20 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 2.3 * */ @@ -42,4 +50,22 @@ void testUpDown() { assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1); + kafka.afterPropertiesSet(); + kafka.addTopics("seekTestTopic"); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + }