From 654c531d43ead22ff01d3d503e37c2853ce32010 Mon Sep 17 00:00:00 2001 From: SzalkoTo Date: Thu, 28 Apr 2022 19:23:07 +0200 Subject: [PATCH] GH-2246: Partition attribute SpEL expression Fixes: gh-2246 When partition attribute is specified as a SpEL expression on PartitionOffset annotation, it doesn not resolve the value from the expression. Fixing this issue. Adding test to verify. **Cherry-pick to 3.0.x** --- ...afkaListenerAnnotationBeanPostProcessor.java | 2 +- .../annotation/EnableKafkaIntegrationTests.java | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 1d6399f195..acc7383fa6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -867,7 +867,7 @@ private List resolveTopicPartitionsList(TopicPartition top for (String partition : partitions) { resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false); } - if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) { + if (partitionOffsets.length == 1 && resolveExpression(partitionOffsets[0].partition()).equals("*")) { result.forEach(tpo -> { tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0])); tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0])); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 7285aba7fc..72539006a3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -175,10 +175,11 @@ * @author Venil Noronha * @author Dimitri Penner * @author Nakul Mishra + * @author Soby Chacko */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3", +@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3", "annotated3x", "annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated8reply", "annotated9", "annotated10", "annotated11", "annotated12", "annotated13", "annotated14", "annotated15", "annotated16", "annotated17", @@ -311,6 +312,10 @@ public void manyTests() throws Exception { assertThat(this.listener.capturedRecord.value()).isEqualTo("foo"); assertThat(this.config.listen3Exception).isNotNull(); + template.send("annotated3x", 0, "foo"); + assertThat(this.listener.latch3x.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.capturedRecord.value()).isEqualTo("foo"); + template.send("annotated4", 0, "foo"); assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.capturedRecord.value()).isEqualTo("foo"); @@ -1832,6 +1837,8 @@ static class Listener implements ConsumerSeekAware { final CountDownLatch latch3 = new CountDownLatch(1); + final CountDownLatch latch3x = new CountDownLatch(1); + final CountDownLatch latch4 = new CountDownLatch(1); final CountDownLatch latch5 = new CountDownLatch(1); @@ -2010,6 +2017,14 @@ public void listen3(ConsumerRecord record) { this.latch3.countDown(); } + @KafkaListener(id = "partitionExpression", topicPartitions = @TopicPartition(topic = "${topicThree:annotated3x}", + partitions = "${zero:0}", + partitionOffsets = @PartitionOffset(partition = "#{'*'}", initialOffset = "0"))) + public void listenPartitionSpelExpression(ConsumerRecord record) { + this.capturedRecord = record; + this.latch3x.countDown(); + } + @KafkaListener(id = "#{'qux'}", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory", containerGroup = "qux#{'Group'}", properties = {