From abaa465b1c777db64d2492e79bec82e314a54b8e Mon Sep 17 00:00:00 2001 From: Huijin Hong Date: Thu, 30 May 2024 23:28:01 +0900 Subject: [PATCH] GH-3277 : Fix coroutine detection logic in MessagingMessageListenerAdapter Fixes: #3277 * Fix coroutine detection logic in `MessagingMessageListenerAdapter` by continuation class type * Use proper `@since 3.2.1` for new API **Auto-cherry-pick to `3.2.x`** --- .../kafka/listener/adapter/AdapterUtils.java | 10 ++++++++++ .../ContinuationHandlerMethodArgumentResolver.java | 3 ++- .../adapter/MessagingMessageListenerAdapter.java | 8 ++++---- .../kafka/listener/EnableKafkaKotlinTests.kt | 13 +++++++++---- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java index bd83b9664f..de9e0d7d2c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java @@ -35,6 +35,7 @@ * * @author Gary Russell * @author Wang Zhiyang + * @author Huijin Hong * @since 2.5 * */ @@ -124,4 +125,13 @@ public static boolean isCompletableFuture(Class resultType) { return CompletableFuture.class.isAssignableFrom(resultType); } + /** + * Return the true when type is {@code Continuation}. + * @param parameterType {@code MethodParameter} parameter type. + * @return type is {@code Continuation}. + * @since 3.2.1 + */ + public static boolean isKotlinContinuation(Class parameterType) { + return "kotlin.coroutines.Continuation".equals(parameterType.getName()); + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java index f475f72a58..23da8298d6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ContinuationHandlerMethodArgumentResolver.java @@ -30,6 +30,7 @@ * but for regular {@link HandlerMethodArgumentResolver} contract. * * @author Wang Zhiyang + * @author Huijin Hong * * @since 3.2 * @@ -39,7 +40,7 @@ public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodA @Override public boolean supportsParameter(MethodParameter parameter) { - return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName()); + return AdapterUtils.isKotlinContinuation(parameter.getParameterType()); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 76ae8f1453..dcf17e0d0f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.context.expression.MapAccessor; -import org.springframework.core.KotlinDetector; import org.springframework.core.MethodParameter; import org.springframework.core.log.LogAccessor; import org.springframework.expression.BeanResolver; @@ -90,6 +89,7 @@ * @author Venil Noronha * @author Nathan Xu * @author Wang ZhiYang + * @author Huijin Hong */ public abstract class MessagingMessageListenerAdapter implements ConsumerSeekAware, AsyncRepliesAware { @@ -763,8 +763,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity isNotConvertible |= isAck; boolean isConsumer = parameterIsType(parameterType, Consumer.class); isNotConvertible |= isConsumer; - boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType()); - isNotConvertible |= isCoroutines; + boolean isKotlinContinuation = AdapterUtils.isKotlinContinuation(methodParameter.getParameterType()); + isNotConvertible |= isKotlinContinuation; boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class); this.hasMetadataParameter |= isMeta; isNotConvertible |= isMeta; @@ -783,7 +783,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity break; } } - else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) { + else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupId(methodParameter)) { allowedBatchParameters++; } } diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt index 9a112cb801..11c46c01c4 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory import org.springframework.kafka.listener.* +import org.springframework.kafka.support.converter.JsonMessageConverter import org.springframework.kafka.test.EmbeddedKafkaBroker import org.springframework.kafka.test.context.EmbeddedKafka import org.springframework.test.annotation.DirtiesContext @@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit /** * @author Gary Russell + * @author Huijin Hong * @since 2.2 */ @@ -63,7 +65,7 @@ class EnableKafkaKotlinTests { @Test fun `test listener`() { - this.template.send("kotlinTestTopic1", "foo") + this.template.send("kotlinTestTopic1", "{\"data\":\"foo\"}") assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue() assertThat(this.config.received).isEqualTo("foo") } @@ -173,6 +175,7 @@ class EnableKafkaKotlinTests { = ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = kcf() factory.setCommonErrorHandler(eh) + factory.setRecordMessageConverter(JsonMessageConverter()) return factory } @@ -186,9 +189,11 @@ class EnableKafkaKotlinTests { return factory } + data class TestKafkaMessage(val data: String) + @KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory") - fun listen(value: String) { - this.received = value + fun listen(value: TestKafkaMessage) { + this.received = value.data this.latch1.countDown() }