Skip to content

Commit

Permalink
Permit the use of suspend on a method returning Flow
Browse files Browse the repository at this point in the history
Fixes: quarkusio#20345
(cherry picked from commit 1486dff)
  • Loading branch information
geoand committed Sep 28, 2021
1 parent 20f34ad commit 3724f87
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndp
method.declaringClass(), method,
(BuildProducer<GeneratedClassBuildItem>) methodContext.get(GeneratedClassBuildItem.class.getName()),
recorder));
if (methodContext.containsKey(EndpointIndexer.METHOD_CONTEXT_CUSTOM_RETURN_TYPE_KEY)) {
Type methodReturnType = (Type) methodContext.get(EndpointIndexer.METHOD_CONTEXT_CUSTOM_RETURN_TYPE_KEY);
if (methodReturnType != null) {
if (methodReturnType.name().equals(FLOW)) {
return List.of(processor, flowCustomizer());
}
}
}
return Collections.singletonList(processor);
}
return Collections.emptyList();
Expand Down Expand Up @@ -172,9 +180,7 @@ public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndp
Map<String, Object> methodContext) {
DotName returnTypeName = method.returnType().name();
if (returnTypeName.equals(FLOW)) {
return Collections.singletonList(new FixedHandlersChainCustomizer(
List.of(new FlowToPublisherHandler(), new PublisherResponseHandler()),
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE));
return Collections.singletonList(flowCustomizer());
}
return Collections.emptyList();
}
Expand All @@ -185,4 +191,10 @@ public boolean isMethodSignatureAsync(MethodInfo info) {
}
});
}

private static HandlerChainCustomizer flowCustomizer() {
return new FixedHandlersChainCustomizer(
List.of(new FlowToPublisherHandler(), new PublisherResponseHandler()),
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.it.resteasy.reactive.kotlin

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.jboss.resteasy.reactive.RestSseElementType
import javax.ws.rs.GET
Expand All @@ -21,6 +22,19 @@ class FlowResource(private val uppercaseService: UppercaseService) {
emit(uppercaseService.convert("Flow"))
}

@GET
@Path("suspendStr")
@Produces(MediaType.SERVER_SENT_EVENTS)
suspend fun suspendSseStrings(): Flow<String> {
delay(100)
return flow {
emit(uppercaseService.convert("Hello"))
emit(uppercaseService.convert("From"))
emit(uppercaseService.convert("Kotlin"))
emit(uppercaseService.convert("Flow"))
}
}

@GET
@Path("json")
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ class FlowResourceTest {
var flowPath: String? = null

@Test
fun testSeeStrings() {
fun testSseStrings() {
testSse("str", 5) {
assertThat(it).containsExactly("HELLO", "FROM", "KOTLIN", "FLOW")
}
}

@Test
fun testSuspendSseStrings() {
testSse("suspendStr", 5) {
assertThat(it).containsExactly("HELLO", "FROM", "KOTLIN", "FLOW")
}
}

@Test
fun testSeeJson() {
testSse("json", 10) {
Expand Down

0 comments on commit 3724f87

Please sign in to comment.