Skip to content

Commit

Permalink
GH-3277 : Fix coroutine detection logic in MessagingMessageListenerAd…
Browse files Browse the repository at this point in the history
…apter

Fixes: #3277

* Fix coroutine detection logic in `MessagingMessageListenerAdapter` by continuation class type

* Use proper `@since 3.2.1` for new API

**Auto-cherry-pick to `3.2.x`**
  • Loading branch information
huisam authored May 30, 2024
1 parent 12ed737 commit abaa465
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*
* @author Gary Russell
* @author Wang Zhiyang
* @author Huijin Hong
* @since 2.5
*
*/
Expand Down Expand Up @@ -124,4 +125,13 @@ public static boolean isCompletableFuture(Class<?> resultType) {
return CompletableFuture.class.isAssignableFrom(resultType);
}

/**
* Return the true when type is {@code Continuation}.
* @param parameterType {@code MethodParameter} parameter type.
* @return type is {@code Continuation}.
* @since 3.2.1
*/
public static boolean isKotlinContinuation(Class<?> parameterType) {
return "kotlin.coroutines.Continuation".equals(parameterType.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* but for regular {@link HandlerMethodArgumentResolver} contract.
*
* @author Wang Zhiyang
* @author Huijin Hong
*
* @since 3.2
*
Expand All @@ -39,7 +40,7 @@ public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodA

@Override
public boolean supportsParameter(MethodParameter parameter) {
return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName());
return AdapterUtils.isKotlinContinuation(parameter.getParameterType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.kafka.common.TopicPartition;

import org.springframework.context.expression.MapAccessor;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
Expand Down Expand Up @@ -90,6 +89,7 @@
* @author Venil Noronha
* @author Nathan Xu
* @author Wang ZhiYang
* @author Huijin Hong
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {

Expand Down Expand Up @@ -763,8 +763,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
isNotConvertible |= isAck;
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
isNotConvertible |= isConsumer;
boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType());
isNotConvertible |= isCoroutines;
boolean isKotlinContinuation = AdapterUtils.isKotlinContinuation(methodParameter.getParameterType());
isNotConvertible |= isKotlinContinuation;
boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class);
this.hasMetadataParameter |= isMeta;
isNotConvertible |= isMeta;
Expand All @@ -783,7 +783,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
break;
}
}
else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
allowedBatchParameters++;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.*
import org.springframework.kafka.support.converter.JsonMessageConverter
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext
Expand All @@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit

/**
* @author Gary Russell
* @author Huijin Hong
* @since 2.2
*/

Expand All @@ -63,7 +65,7 @@ class EnableKafkaKotlinTests {

@Test
fun `test listener`() {
this.template.send("kotlinTestTopic1", "foo")
this.template.send("kotlinTestTopic1", "{\"data\":\"foo\"}")
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.received).isEqualTo("foo")
}
Expand Down Expand Up @@ -173,6 +175,7 @@ class EnableKafkaKotlinTests {
= ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = kcf()
factory.setCommonErrorHandler(eh)
factory.setRecordMessageConverter(JsonMessageConverter())
return factory
}

Expand All @@ -186,9 +189,11 @@ class EnableKafkaKotlinTests {
return factory
}

data class TestKafkaMessage(val data: String)

@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
fun listen(value: String) {
this.received = value
fun listen(value: TestKafkaMessage) {
this.received = value.data
this.latch1.countDown()
}

Expand Down

0 comments on commit abaa465

Please sign in to comment.