From 6c1e4dc23949a4bb4c879b5849ba18cb42f9971c Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 28 Aug 2024 09:07:06 -0400 Subject: [PATCH] GH-3463: Allow `@DltHandler` on super class Fixes: #3463 Currently, a `@DltHandler`-annotated method must be in the same class as the corresponding `@KafkaListener` annotation. Some logic might be really the same for different `@KafkaListener` services. * Use `MethodIntrospector` in the `RetryableTopicAnnotationProcessor` to be able to process methods from super classes as well --- .../RetryableTopicAnnotationProcessor.java | 8 +++- .../RetryTopicClassLevelIntegrationTests.java | 47 ++++++++----------- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 24b5888f39..8304db9809 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -34,6 +34,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.expression.BeanFactoryResolver; import org.springframework.context.expression.StandardBeanExpressionResolver; +import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.kafka.core.KafkaOperations; @@ -64,6 +65,7 @@ * @author Gary Russell * @author Adrian Chlebosz * @author Wang Zhiyang + * @author Artem Bilan * * @since 2.7 * @@ -228,8 +230,10 @@ private Map>> createDltRoutingSpecFromAnn } private EndpointHandlerMethod getDltProcessor(Class clazz, Object bean) { - return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz)) - .filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null) + ReflectionUtils.MethodFilter selector = + (method) -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null; + return MethodIntrospector.selectMethods(clazz, selector) + .stream() .map(method -> RetryTopicConfigurer.createHandlerMethodWith(bean, method)) .findFirst() .orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java index 5339093485..359f53cb1d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicClassLevelIntegrationTests.java @@ -93,6 +93,7 @@ * Test class level non-blocking retries. * * @author Wang Zhiyang + * @author Artem Bilan * * @since 3.2 */ @@ -227,7 +228,6 @@ void shouldRetryFifthTopicWithTwoListenersAndManualAssignment(@Autowired FifthTo assertThat(awaitLatch(latchContainer.countDownLatch51)).isTrue(); assertThat(awaitLatch(latchContainer.countDownLatch52)).isTrue(); assertThat(awaitLatch(latchContainer.countDownLatchDltThree)).isTrue(); - assertThat(awaitLatch(latchContainer.countDownLatchDltFour)).isTrue(); assertThat(listener1.topics).containsExactly(TWO_LISTENERS_TOPIC, TWO_LISTENERS_TOPIC + "-listener1-0", TWO_LISTENERS_TOPIC + "-listener1-1", TWO_LISTENERS_TOPIC + "-listener1-2", TWO_LISTENERS_TOPIC + "-listener1-dlt"); @@ -387,6 +387,21 @@ public void shouldNotGetHere() { } } + static class AbstractFifthTopicListener { + + final List topics = Collections.synchronizedList(new ArrayList<>()); + + @Autowired + CountDownLatchContainer container; + + @DltHandler + public void annotatedDltMethod(ConsumerRecord record) { + this.topics.add(record.topic()); + container.countDownLatchDltThree.countDown(); + } + + } + @RetryableTopic(attempts = "4", backoff = @Backoff(250), numPartitions = "2", @@ -397,12 +412,7 @@ public void shouldNotGetHere() { @KafkaListener(id = "fifthTopicId1", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC, partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) - static class FifthTopicListener1 { - - final List topics = Collections.synchronizedList(new ArrayList<>()); - - @Autowired - CountDownLatchContainer container; + static class FifthTopicListener1 extends AbstractFifthTopicListener { @KafkaHandler public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -411,12 +421,6 @@ public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_T throw new RuntimeException("Annotated woooops... " + receivedTopic); } - @DltHandler - public void annotatedDltMethod(ConsumerRecord record) { - this.topics.add(record.topic()); - container.countDownLatchDltThree.countDown(); - } - } @RetryableTopic(attempts = "4", @@ -429,12 +433,7 @@ public void annotatedDltMethod(ConsumerRecord record) { @KafkaListener(id = "fifthTopicId2", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))}, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) - static class FifthTopicListener2 { - - final List topics = Collections.synchronizedList(new ArrayList<>()); - - @Autowired - CountDownLatchContainer container; + static class FifthTopicListener2 extends AbstractFifthTopicListener { @KafkaHandler public void listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -443,12 +442,6 @@ public void listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_ throw new RuntimeException("Annotated woooops... " + receivedTopic); } - @DltHandler - public void annotatedDltMethod(ConsumerRecord record) { - this.topics.add(record.topic()); - container.countDownLatchDltFour.countDown(); - } - } @Component @@ -575,9 +568,7 @@ static class CountDownLatchContainer { CountDownLatch countDownLatchDltTwo = new CountDownLatch(1); - CountDownLatch countDownLatchDltThree = new CountDownLatch(1); - - CountDownLatch countDownLatchDltFour = new CountDownLatch(1); + CountDownLatch countDownLatchDltThree = new CountDownLatch(2); CountDownLatch countDownLatchReuseOne = new CountDownLatch(2);