diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 41bd32b196..35f80813c6 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -71,6 +71,7 @@ * @author Adrian Chlebosz * @author Soby Chacko * @author Sanghyeok An + * @author Wouter Coekaerts * * @since 3.1 */ @@ -560,6 +561,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index a4e3761acd..a9b070a7d2 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -89,6 +89,7 @@ * @author Soby Chacko * @author Sanghyeok An * @author Borahm Lee + * @author Wouter Coekaerts * * @since 2.2 */ @@ -763,6 +764,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java index 50fe009357..cc1b5e9412 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -16,14 +16,21 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.StringUtils; import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 3.1 * */ @@ -37,4 +44,21 @@ void testUpDown() { kafka.destroy(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic"); + kafka.afterPropertiesSet(); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java index 4688e38e2d..c01891556f 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java @@ -16,12 +16,20 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 2.3 * */ @@ -42,4 +50,22 @@ void testUpDown() { assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1); + kafka.afterPropertiesSet(); + kafka.addTopics("seekTestTopic"); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + }