diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/deployment/src/main/java/io/quarkus/resteasy/reactive/kotlin/deployment/KotlinCoroutineIntegrationProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/deployment/src/main/java/io/quarkus/resteasy/reactive/kotlin/deployment/KotlinCoroutineIntegrationProcessor.java index 922314bd4d383..5649b99283e79 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/deployment/src/main/java/io/quarkus/resteasy/reactive/kotlin/deployment/KotlinCoroutineIntegrationProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-kotlin/deployment/src/main/java/io/quarkus/resteasy/reactive/kotlin/deployment/KotlinCoroutineIntegrationProcessor.java @@ -69,6 +69,14 @@ public List scan(MethodInfo method, ClassInfo actualEndp method.declaringClass(), method, (BuildProducer) methodContext.get(GeneratedClassBuildItem.class.getName()), recorder)); + if (methodContext.containsKey(EndpointIndexer.METHOD_CONTEXT_CUSTOM_RETURN_TYPE_KEY)) { + Type methodReturnType = (Type) methodContext.get(EndpointIndexer.METHOD_CONTEXT_CUSTOM_RETURN_TYPE_KEY); + if (methodReturnType != null) { + if (methodReturnType.name().equals(FLOW)) { + return List.of(processor, flowCustomizer()); + } + } + } return Collections.singletonList(processor); } return Collections.emptyList(); @@ -172,9 +180,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp Map methodContext) { DotName returnTypeName = method.returnType().name(); if (returnTypeName.equals(FLOW)) { - return Collections.singletonList(new FixedHandlersChainCustomizer( - List.of(new FlowToPublisherHandler(), new PublisherResponseHandler()), - HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); + return Collections.singletonList(flowCustomizer()); } return Collections.emptyList(); } @@ -185,4 +191,10 @@ public boolean isMethodSignatureAsync(MethodInfo info) { } }); } + + private static HandlerChainCustomizer flowCustomizer() { + return new FixedHandlersChainCustomizer( + List.of(new FlowToPublisherHandler(), new PublisherResponseHandler()), + HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE); + } } diff --git a/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt b/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt index 339caf171586c..f5c99f697c2a5 100644 --- a/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt +++ b/integration-tests/resteasy-reactive-kotlin/standard/src/main/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResource.kt @@ -1,6 +1,7 @@ package io.quarkus.it.resteasy.reactive.kotlin import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import org.jboss.resteasy.reactive.RestSseElementType import javax.ws.rs.GET @@ -21,6 +22,19 @@ class FlowResource(private val uppercaseService: UppercaseService) { emit(uppercaseService.convert("Flow")) } + @GET + @Path("suspendStr") + @Produces(MediaType.SERVER_SENT_EVENTS) + suspend fun suspendSseStrings(): Flow { + delay(100) + return flow { + emit(uppercaseService.convert("Hello")) + emit(uppercaseService.convert("From")) + emit(uppercaseService.convert("Kotlin")) + emit(uppercaseService.convert("Flow")) + } + } + @GET @Path("json") @Produces(MediaType.SERVER_SENT_EVENTS) diff --git a/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt b/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt index fd1adfefa40af..d78b94ae76bf2 100644 --- a/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt +++ b/integration-tests/resteasy-reactive-kotlin/standard/src/test/kotlin/io/quarkus/it/resteasy/reactive/kotlin/FlowResourceTest.kt @@ -18,12 +18,19 @@ class FlowResourceTest { var flowPath: String? = null @Test - fun testSeeStrings() { + fun testSseStrings() { testSse("str", 5) { assertThat(it).containsExactly("HELLO", "FROM", "KOTLIN", "FLOW") } } + @Test + fun testSuspendSseStrings() { + testSse("suspendStr", 5) { + assertThat(it).containsExactly("HELLO", "FROM", "KOTLIN", "FLOW") + } + } + @Test fun testSeeJson() { testSse("json", 10) {