From 71b571c03d29f39544494d15791a36d49a2256b7 Mon Sep 17 00:00:00 2001 From: Wouter Coekaerts Date: Thu, 12 Dec 2024 16:32:54 +0100 Subject: [PATCH] GH-3660: Fix EmbeddedKafkaBroker seekToEnd (#3661) Fixes: #3660 Problem: consumeFromEmbeddedTopics calls Consumer.seekToEnd, which "evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called". This means that the consumer may or may not see future messages, depending on timing. This fix calls `position` so that the seek happens before consumeFromEmbeddedTopics returns. That was it is ensured that any message sent to the topic after the call to consumeFromEmbeddedTopics are seen by the consumer. Issue: https://github.com/spring-projects/spring-kafka/issues/3660 --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 5 ++++ .../kafka/test/EmbeddedKafkaZKBroker.java | 6 +++++ .../test/EmbeddedKafkaKraftBrokerTests.java | 24 +++++++++++++++++ .../test/EmbeddedKafkaZKBrokerTests.java | 26 +++++++++++++++++++ 4 files changed, 61 insertions(+) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 9d09a4487b..1721be71c7 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -69,6 +69,9 @@ * @author Nakul Mishra * @author Pawel Lozinski * @author Adrian Chlebosz + * @author Soby Chacko + * @author Sanghyeok An + * @author Wouter Coekaerts * * @since 3.1 */ @@ -553,6 +556,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index 7d1ab4ae22..27fe067e4d 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -86,6 +86,10 @@ * @author Nakul Mishra * @author Pawel Lozinski * @author Adrian Chlebosz + * @author Soby Chacko + * @author Sanghyeok An + * @author Borahm Lee + * @author Wouter Coekaerts * * @since 2.2 */ @@ -755,6 +759,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java index f1be0ac0ba..4a44c61b9b 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -16,14 +16,21 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.StringUtils; import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 3.1 * */ @@ -37,4 +44,21 @@ void testUpDown() { kafka.destroy(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic"); + kafka.afterPropertiesSet(); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java index 7d0fa1cd86..5506c0433a 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java @@ -16,12 +16,20 @@ package org.springframework.kafka.test; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Wouter Coekaerts * @since 2.3 * */ @@ -42,4 +50,22 @@ void testUpDown() { assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1); + kafka.afterPropertiesSet(); + kafka.addTopics("seekTestTopic"); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + }