diff --git a/docs/src/main/asciidoc/resteasy-reactive.adoc b/docs/src/main/asciidoc/resteasy-reactive.adoc index bbc232ca45c4e0..85bc78ead1d7e0 100644 --- a/docs/src/main/asciidoc/resteasy-reactive.adoc +++ b/docs/src/main/asciidoc/resteasy-reactive.adoc @@ -933,6 +933,38 @@ impression that you can set headers or HTTP status codes, which is not true afte response. Exception mappers are also not invoked because part of the response may already have been written. +[TIP] +==== +If you need to set custom HTTP headers and / or the HTTP response, then you can return `org.jboss.resteasy.reactive.RestMulti` instead, like this: + +[source,java] +---- +package org.acme.rest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; +import org.jboss.resteasy.reactive.RestMulti; + +@Path("logs") +public class Endpoint { + + @Inject + @Channel("log-out") + Multi logs; + + @GET + public Multi streamLogs() { + return RestMulti.from(logs).status(222).header("foo", "bar").build(); + } +} +---- +==== + === Server-Sent Event (SSE) support If you want to stream JSON objects in your response, you can use diff --git a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java index 4887dd1ae1a9a7..ff75d278888e03 100644 --- a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java @@ -14,6 +14,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.OBJECT; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PART_TYPE_NAME; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_FORM_PARAM; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; import java.io.Closeable; @@ -204,7 +205,7 @@ public class JaxrsClientReactiveProcessor { private static final DotName NOT_BODY = DotName.createSimple("io.quarkus.rest.client.reactive.NotBody"); - private static final Set ASYNC_RETURN_TYPES = Set.of(COMPLETION_STAGE, UNI, MULTI); + private static final Set ASYNC_RETURN_TYPES = Set.of(COMPLETION_STAGE, UNI, MULTI, REST_MULTI); public static final DotName BYTE = DotName.createSimple(Byte.class.getName()); public static final MethodDescriptor MULTIPART_RESPONSE_DATA_ADD_FILLER = MethodDescriptor .ofMethod(MultipartResponseDataBase.class, "addFiller", void.class, FieldFiller.class); @@ -1937,7 +1938,7 @@ private void handleReturn(ClassInfo restClientInterface, String defaultMediaType if (ASYNC_RETURN_TYPES.contains(paramType.name())) { returnCategory = paramType.name().equals(COMPLETION_STAGE) ? ReturnCategory.COMPLETION_STAGE - : paramType.name().equals(MULTI) + : paramType.name().equals(MULTI) || paramType.name().equals(REST_MULTI) ? ReturnCategory.MULTI : ReturnCategory.UNI; @@ -2738,6 +2739,7 @@ private enum ReturnCategory { COMPLETION_STAGE, UNI, MULTI, + REST_MULTI, COROUTINE } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java index 679f4a3fbc915b..a2187966147cf4 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java @@ -308,6 +308,7 @@ public void handleFieldSecurity(ResteasyReactiveResourceMethodEntriesBuildItem r effectiveReturnType.name().equals(ResteasyReactiveDotNames.UNI) || effectiveReturnType.name().equals(ResteasyReactiveDotNames.COMPLETABLE_FUTURE) || effectiveReturnType.name().equals(ResteasyReactiveDotNames.COMPLETION_STAGE) || + effectiveReturnType.name().equals(ResteasyReactiveDotNames.REST_MULTI) || effectiveReturnType.name().equals(ResteasyReactiveDotNames.MULTI)) { if (effectiveReturnType.kind() != Type.Kind.PARAMETERIZED_TYPE) { continue; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java index b6a32a40456d0b..facd204287c1bf 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java @@ -13,6 +13,7 @@ import jakarta.ws.rs.sse.SseBroadcaster; import jakarta.ws.rs.sse.SseEventSink; +import org.jboss.resteasy.reactive.RestMulti; import org.jboss.resteasy.reactive.RestStreamElementType; import org.jboss.resteasy.reactive.common.util.RestMediaType; @@ -97,7 +98,8 @@ public void sseJson2(Sse sse, SseEventSink sink) throws IOException { @GET @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiJson() { - return Multi.createFrom().items(new Message("hello"), new Message("stef")); + return RestMulti.from(Multi.createFrom().items(new Message("hello"), new Message("stef"))) + .header("foo", "bar").build(); } @Path("json/multi2") @@ -135,9 +137,9 @@ public Multi multiStreamJsonFast() { for (int i = 0; i < 5000; i++) { ids.add(UUID.randomUUID()); } - return Multi.createFrom().items(ids::stream) + return RestMulti.from(Multi.createFrom().items(ids::stream) .onItem().transform(id -> new Message(id.toString())) - .onOverflow().buffer(81920); + .onOverflow().buffer(81920)).header("foo", "bar").build(); } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java index 56b59165cf6c8c..4754235d71973e 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java @@ -158,6 +158,7 @@ private ClassInfo getEffectiveClassInfo(Type type, IndexView indexView) { effectiveType.name().equals(ResteasyReactiveDotNames.UNI) || effectiveType.name().equals(ResteasyReactiveDotNames.COMPLETABLE_FUTURE) || effectiveType.name().equals(ResteasyReactiveDotNames.COMPLETION_STAGE) || + effectiveType.name().equals(ResteasyReactiveDotNames.REST_MULTI) || effectiveType.name().equals(ResteasyReactiveDotNames.MULTI)) { if (effectiveType.kind() != Type.Kind.PARAMETERIZED_TYPE) { return null; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java index 5081617941c735..9c4ddafd41688e 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java @@ -3,6 +3,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETABLE_FUTURE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETION_STAGE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_RESPONSE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; @@ -161,6 +162,7 @@ private Type getNonAsyncReturnType(Type returnType) { || COMPLETABLE_FUTURE.equals(parameterizedType.name()) || UNI.equals(parameterizedType.name()) || MULTI.equals(parameterizedType.name()) + || REST_MULTI.equals(parameterizedType.name()) || REST_RESPONSE.equals(parameterizedType.name())) { return parameterizedType.arguments().get(0); } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java index a758d2a5201485..439bceb1405ff3 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java @@ -7,6 +7,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.DATE_FORMAT; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import java.io.File; import java.io.IOException; @@ -553,6 +554,7 @@ public void accept(EndpointIndexer.ResourceMethodCallbackEntry entry) { if (paramsRequireReflection || MULTI.toString().equals(entry.getResourceMethod().getSimpleReturnType()) || + REST_MULTI.toString().equals(entry.getResourceMethod().getSimpleReturnType()) || PUBLISHER.toString().equals(entry.getResourceMethod().getSimpleReturnType()) || filtersAccessResourceMethod(resourceInterceptorsBuildItem.getResourceInterceptors()) || entry.additionalRegisterClassForReflectionCheck()) { diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java index eb0f85732cff5f..41e19c703830de 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java @@ -7,11 +7,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import org.jboss.resteasy.reactive.ResponseHeader; import org.jboss.resteasy.reactive.ResponseStatus; +import org.jboss.resteasy.reactive.RestMulti; +import org.jboss.resteasy.reactive.RestQuery; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -118,6 +121,40 @@ public void testStringThrowsException() { assertFalse(headers.hasHeaderWithName("Access-Control-Allow-Origin")); } + @Test + public void testReturnRestMulti() { + Map expectedHeaders = Map.of( + "Access-Control-Allow-Origin", "foo", + "Keep-Alive", "bar"); + RestAssured + .given() + .get("/test/rest-multi") + .then() + .statusCode(200) + .headers(expectedHeaders); + } + + @Test + public void testReturnRestMulti2() { + RestAssured + .given() + .get("/test/rest-multi2") + .then() + .statusCode(200) + .headers(Map.of( + "Access-Control-Allow-Origin", "foo", + "Keep-Alive", "bar")); + + RestAssured + .given() + .get("/test/rest-multi2?keepAlive=dummy") + .then() + .statusCode(200) + .headers(Map.of( + "Access-Control-Allow-Origin", "foo", + "Keep-Alive", "dummy")); + } + @Path("/test") public static class TestResource { @@ -190,6 +227,22 @@ public String throwExceptionPlain() { throw createException(); } + @ResponseHeader(name = "Access-Control-Allow-Origin", value = "*") + @ResponseHeader(name = "Keep-Alive", value = "timeout=5, max=997") + @GET + @Path("/rest-multi") + public RestMulti getTestRestMulti() { + return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") + .header("Keep-Alive", "bar").build(); + } + + @GET + @Path("/rest-multi2") + public RestMulti getTestRestMulti2(@DefaultValue("bar") @RestQuery String keepAlive) { + return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") + .header("Keep-Alive", keepAlive).build(); + } + private IllegalArgumentException createException() { IllegalArgumentException result = new IllegalArgumentException(); result.setStackTrace(EMPTY_STACK_TRACE); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java index a8496f9d1d8b9a..6321d4a7598bb1 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java @@ -7,6 +7,7 @@ import jakarta.ws.rs.Path; import org.jboss.resteasy.reactive.ResponseStatus; +import org.jboss.resteasy.reactive.RestMulti; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -93,6 +94,24 @@ public void testStringThrowsException() { .statusCode(500); } + @Test + public void testReturnRestMulti() { + RestAssured + .given() + .get("/test/rest-multi") + .then() + .statusCode(210); + } + + @Test + public void testReturnRestMulti2() { + RestAssured + .given() + .get("/test/rest-multi2") + .then() + .statusCode(211); + } + @Path("/test") public static class TestResource { @@ -154,6 +173,19 @@ public String throwExceptionPlain() { throw createException(); } + @ResponseStatus(202) + @GET + @Path("/rest-multi") + public RestMulti getTestRestMulti() { + return RestMulti.from(Multi.createFrom().item("test")).status(210).build(); + } + + @GET + @Path("/rest-multi2") + public RestMulti getTestRestMulti2() { + return RestMulti.from(Multi.createFrom().item("test")).status(211).build(); + } + private IllegalArgumentException createException() { IllegalArgumentException result = new IllegalArgumentException(); result.setStackTrace(EMPTY_STACK_TRACE); diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java index ae927ba11f923a..e52a18d68850aa 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java @@ -57,6 +57,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_FORM_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_HEADER_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MATRIX_PARAM; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_PATH_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_QUERY_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_RESPONSE; @@ -990,6 +991,7 @@ private Type getNonAsyncReturnType(Type returnType) { || COMPLETABLE_FUTURE.equals(parameterizedType.name()) || UNI.equals(parameterizedType.name()) || MULTI.equals(parameterizedType.name()) + || REST_MULTI.equals(parameterizedType.name()) || REST_RESPONSE.equals(parameterizedType.name())) { return parameterizedType.arguments().get(0); } diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java index dd8e13857d185b..191449b5533108 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java @@ -88,6 +88,7 @@ import org.jboss.resteasy.reactive.RestForm; import org.jboss.resteasy.reactive.RestHeader; import org.jboss.resteasy.reactive.RestMatrix; +import org.jboss.resteasy.reactive.RestMulti; import org.jboss.resteasy.reactive.RestPath; import org.jboss.resteasy.reactive.RestQuery; import org.jboss.resteasy.reactive.RestResponse; @@ -202,6 +203,7 @@ public final class ResteasyReactiveDotNames { public static final DotName UNI = DotName.createSimple(Uni.class.getName()); public static final DotName MULTI = DotName.createSimple(Multi.class.getName()); + public static final DotName REST_MULTI = DotName.createSimple(RestMulti.class.getName()); public static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName()); public static final DotName COMPLETABLE_FUTURE = DotName.createSimple(CompletableFuture.class.getName()); public static final DotName PUBLISHER = DotName.createSimple(Publisher.class.getName()); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java new file mode 100644 index 00000000000000..34108bf1db7505 --- /dev/null +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java @@ -0,0 +1,77 @@ +package org.jboss.resteasy.reactive; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap; +import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * A wrapper around {@link Multi} that gives resource methods a way to specify the HTTP status code and HTTP headers + * when streaming a result. + */ +public class RestMulti extends AbstractMulti { + + private final Multi multi; + private final Integer status; + private final MultivaluedTreeMap headers; + + public static RestMulti.Builder from(Multi multi) { + return new RestMulti.Builder<>(multi); + } + + private RestMulti(Builder builder) { + this.multi = builder.multi; + this.status = builder.status; + this.headers = builder.headers; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + } + + public Integer getStatus() { + return status; + } + + public Map> getHeaders() { + return headers; + } + + public static class Builder { + private final Multi multi; + + private Integer status; + + private final MultivaluedTreeMap headers = new CaseInsensitiveMap<>(); + + private Builder(Multi multi) { + this.multi = Objects.requireNonNull(multi, "multi cannot be null"); + } + + public Builder status(int status) { + this.status = status; + return this; + } + + public Builder header(String name, String value) { + if (value == null) { + headers.remove(name); + return this; + } + headers.add(name, value); + return this; + } + + public RestMulti build() { + return new RestMulti<>(this); + } + } +} diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java index 5cf0e9ce56ad48..53393c76317b64 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java @@ -5,6 +5,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.LEGACY_PUBLISHER; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; import java.util.Collections; @@ -33,7 +34,8 @@ public List scan(MethodInfo method, ClassInfo actualEndp return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } - if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER)) { + if (returnTypeName.equals(MULTI) || returnTypeName.equals(REST_MULTI) || returnTypeName.equals(PUBLISHER) + || returnTypeName.equals(LEGACY_PUBLISHER)) { return Collections.singletonList(new FixedHandlerChainCustomizer(new PublisherResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } @@ -44,7 +46,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp public boolean isMethodSignatureAsync(MethodInfo method) { DotName returnTypeName = method.returnType().name(); return returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE) || - returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || - returnTypeName.equals(LEGACY_PUBLISHER); + returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(REST_MULTI) || + returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER); } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java index edef2f560405d0..c6f1aed6baf717 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java @@ -165,8 +165,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp for (int i = 0; i < customizers.size(); i++) { customizers.get(i).customize(response); } - // FIXME: other headers? - } } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java index 8140b79ca2f394..b32281d6833d7c 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java @@ -88,7 +88,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp for (int i = 0; i < customizers.size(); i++) { customizers.get(i).customize(response); } - // FIXME: other headers? } } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index d10f6fcfdaecd0..98aefaf1c2f9ba 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -3,6 +3,7 @@ import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -17,6 +18,7 @@ import jakarta.ws.rs.sse.OutboundSseEvent; import org.jboss.logging.Logger; +import org.jboss.resteasy.reactive.RestMulti; import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.common.util.ServerMediaType; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; @@ -294,14 +296,37 @@ private boolean requiresChunkedStream(MediaType mediaType) { } private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); + result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, determineCustomizers(result), json)); + } + + private List determineCustomizers(Publisher publisher) { + if (publisher instanceof RestMulti) { + RestMulti restMulti = (RestMulti) publisher; + Map> headers = restMulti.getHeaders(); + Integer status = restMulti.getStatus(); + if (headers.isEmpty() && (status == null)) { + return streamingResponseCustomizers; + } + List result = new ArrayList<>(streamingResponseCustomizers.size() + 2); + result.addAll(streamingResponseCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts + if (!headers.isEmpty()) { + result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers)); + } + if (status != null) { + result.add(new StreamingResponseCustomizer.StatusCustomizer(status)); + } + return result; + } + + return streamingResponseCustomizers; } private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); + result.subscribe(new StreamingMultiSubscriber(requestContext, determineCustomizers(result), json)); } private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher result) { + List streamingResponseCustomizers = determineCustomizers(result); SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers); requestContext.suspend(); requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer() {