From f4cc1687c43beae3bb9db4f49559b289c5ff24b7 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 8 May 2023 14:55:58 +0300 Subject: [PATCH] Introduce a way to set headers and status code for streaming response Relates to: #33130 --- docs/src/main/asciidoc/resteasy-reactive.adoc | 32 ++++++++ .../JaxrsClientReactiveProcessor.java | 6 +- .../ResteasyReactiveJacksonProcessor.java | 1 + .../test/streams/StreamResource.java | 8 +- .../ResteasyReactiveJaxbProcessor.java | 1 + .../deployment/LinksContainerFactory.java | 2 + .../deployment/ResteasyReactiveProcessor.java | 2 + .../test/headers/ResponseHeaderTest.java | 53 +++++++++++++ .../test/status/ResponseStatusTest.java | 32 ++++++++ .../common/processor/EndpointIndexer.java | 2 + .../processor/ResteasyReactiveDotNames.java | 2 + .../jboss/resteasy/reactive/RestMulti.java | 77 +++++++++++++++++++ .../scanning/AsyncReturnTypeScanner.java | 8 +- .../reactive/server/core/SseUtil.java | 2 - .../reactive/server/core/StreamingUtil.java | 1 - .../handlers/PublisherResponseHandler.java | 29 ++++++- 16 files changed, 245 insertions(+), 13 deletions(-) create mode 100644 independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java diff --git a/docs/src/main/asciidoc/resteasy-reactive.adoc b/docs/src/main/asciidoc/resteasy-reactive.adoc index bbc232ca45c4e..85bc78ead1d7e 100644 --- a/docs/src/main/asciidoc/resteasy-reactive.adoc +++ b/docs/src/main/asciidoc/resteasy-reactive.adoc @@ -933,6 +933,38 @@ impression that you can set headers or HTTP status codes, which is not true afte response. Exception mappers are also not invoked because part of the response may already have been written. +[TIP] +==== +If you need to set custom HTTP headers and / or the HTTP response, then you can return `org.jboss.resteasy.reactive.RestMulti` instead, like this: + +[source,java] +---- +package org.acme.rest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; +import org.jboss.resteasy.reactive.RestMulti; + +@Path("logs") +public class Endpoint { + + @Inject + @Channel("log-out") + Multi logs; + + @GET + public Multi streamLogs() { + return RestMulti.from(logs).status(222).header("foo", "bar").build(); + } +} +---- +==== + === Server-Sent Event (SSE) support If you want to stream JSON objects in your response, you can use diff --git a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java index 4887dd1ae1a9a..ff75d278888e0 100644 --- a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java @@ -14,6 +14,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.OBJECT; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PART_TYPE_NAME; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_FORM_PARAM; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; import java.io.Closeable; @@ -204,7 +205,7 @@ public class JaxrsClientReactiveProcessor { private static final DotName NOT_BODY = DotName.createSimple("io.quarkus.rest.client.reactive.NotBody"); - private static final Set ASYNC_RETURN_TYPES = Set.of(COMPLETION_STAGE, UNI, MULTI); + private static final Set ASYNC_RETURN_TYPES = Set.of(COMPLETION_STAGE, UNI, MULTI, REST_MULTI); public static final DotName BYTE = DotName.createSimple(Byte.class.getName()); public static final MethodDescriptor MULTIPART_RESPONSE_DATA_ADD_FILLER = MethodDescriptor .ofMethod(MultipartResponseDataBase.class, "addFiller", void.class, FieldFiller.class); @@ -1937,7 +1938,7 @@ private void handleReturn(ClassInfo restClientInterface, String defaultMediaType if (ASYNC_RETURN_TYPES.contains(paramType.name())) { returnCategory = paramType.name().equals(COMPLETION_STAGE) ? ReturnCategory.COMPLETION_STAGE - : paramType.name().equals(MULTI) + : paramType.name().equals(MULTI) || paramType.name().equals(REST_MULTI) ? ReturnCategory.MULTI : ReturnCategory.UNI; @@ -2738,6 +2739,7 @@ private enum ReturnCategory { COMPLETION_STAGE, UNI, MULTI, + REST_MULTI, COROUTINE } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java index 679f4a3fbc915..a2187966147cf 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java @@ -308,6 +308,7 @@ public void handleFieldSecurity(ResteasyReactiveResourceMethodEntriesBuildItem r effectiveReturnType.name().equals(ResteasyReactiveDotNames.UNI) || effectiveReturnType.name().equals(ResteasyReactiveDotNames.COMPLETABLE_FUTURE) || effectiveReturnType.name().equals(ResteasyReactiveDotNames.COMPLETION_STAGE) || + effectiveReturnType.name().equals(ResteasyReactiveDotNames.REST_MULTI) || effectiveReturnType.name().equals(ResteasyReactiveDotNames.MULTI)) { if (effectiveReturnType.kind() != Type.Kind.PARAMETERIZED_TYPE) { continue; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java index b6a32a40456d0..facd204287c1b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java @@ -13,6 +13,7 @@ import jakarta.ws.rs.sse.SseBroadcaster; import jakarta.ws.rs.sse.SseEventSink; +import org.jboss.resteasy.reactive.RestMulti; import org.jboss.resteasy.reactive.RestStreamElementType; import org.jboss.resteasy.reactive.common.util.RestMediaType; @@ -97,7 +98,8 @@ public void sseJson2(Sse sse, SseEventSink sink) throws IOException { @GET @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiJson() { - return Multi.createFrom().items(new Message("hello"), new Message("stef")); + return RestMulti.from(Multi.createFrom().items(new Message("hello"), new Message("stef"))) + .header("foo", "bar").build(); } @Path("json/multi2") @@ -135,9 +137,9 @@ public Multi multiStreamJsonFast() { for (int i = 0; i < 5000; i++) { ids.add(UUID.randomUUID()); } - return Multi.createFrom().items(ids::stream) + return RestMulti.from(Multi.createFrom().items(ids::stream) .onItem().transform(id -> new Message(id.toString())) - .onOverflow().buffer(81920); + .onOverflow().buffer(81920)).header("foo", "bar").build(); } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java index 56b59165cf6c8..4754235d71973 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/main/java/io/quarkus/resteasy/reactive/jaxb/deployment/ResteasyReactiveJaxbProcessor.java @@ -158,6 +158,7 @@ private ClassInfo getEffectiveClassInfo(Type type, IndexView indexView) { effectiveType.name().equals(ResteasyReactiveDotNames.UNI) || effectiveType.name().equals(ResteasyReactiveDotNames.COMPLETABLE_FUTURE) || effectiveType.name().equals(ResteasyReactiveDotNames.COMPLETION_STAGE) || + effectiveType.name().equals(ResteasyReactiveDotNames.REST_MULTI) || effectiveType.name().equals(ResteasyReactiveDotNames.MULTI)) { if (effectiveType.kind() != Type.Kind.PARAMETERIZED_TYPE) { return null; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java index 5081617941c73..9c4ddafd41688 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-links/deployment/src/main/java/io/quarkus/resteasy/reactive/links/deployment/LinksContainerFactory.java @@ -3,6 +3,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETABLE_FUTURE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETION_STAGE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_RESPONSE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; @@ -161,6 +162,7 @@ private Type getNonAsyncReturnType(Type returnType) { || COMPLETABLE_FUTURE.equals(parameterizedType.name()) || UNI.equals(parameterizedType.name()) || MULTI.equals(parameterizedType.name()) + || REST_MULTI.equals(parameterizedType.name()) || REST_RESPONSE.equals(parameterizedType.name())) { return parameterizedType.arguments().get(0); } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java index a758d2a520148..439bceb1405ff 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java @@ -7,6 +7,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.DATE_FORMAT; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import java.io.File; import java.io.IOException; @@ -553,6 +554,7 @@ public void accept(EndpointIndexer.ResourceMethodCallbackEntry entry) { if (paramsRequireReflection || MULTI.toString().equals(entry.getResourceMethod().getSimpleReturnType()) || + REST_MULTI.toString().equals(entry.getResourceMethod().getSimpleReturnType()) || PUBLISHER.toString().equals(entry.getResourceMethod().getSimpleReturnType()) || filtersAccessResourceMethod(resourceInterceptorsBuildItem.getResourceInterceptors()) || entry.additionalRegisterClassForReflectionCheck()) { diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java index eb0f85732cff5..41e19c703830d 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java @@ -7,11 +7,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import org.jboss.resteasy.reactive.ResponseHeader; import org.jboss.resteasy.reactive.ResponseStatus; +import org.jboss.resteasy.reactive.RestMulti; +import org.jboss.resteasy.reactive.RestQuery; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -118,6 +121,40 @@ public void testStringThrowsException() { assertFalse(headers.hasHeaderWithName("Access-Control-Allow-Origin")); } + @Test + public void testReturnRestMulti() { + Map expectedHeaders = Map.of( + "Access-Control-Allow-Origin", "foo", + "Keep-Alive", "bar"); + RestAssured + .given() + .get("/test/rest-multi") + .then() + .statusCode(200) + .headers(expectedHeaders); + } + + @Test + public void testReturnRestMulti2() { + RestAssured + .given() + .get("/test/rest-multi2") + .then() + .statusCode(200) + .headers(Map.of( + "Access-Control-Allow-Origin", "foo", + "Keep-Alive", "bar")); + + RestAssured + .given() + .get("/test/rest-multi2?keepAlive=dummy") + .then() + .statusCode(200) + .headers(Map.of( + "Access-Control-Allow-Origin", "foo", + "Keep-Alive", "dummy")); + } + @Path("/test") public static class TestResource { @@ -190,6 +227,22 @@ public String throwExceptionPlain() { throw createException(); } + @ResponseHeader(name = "Access-Control-Allow-Origin", value = "*") + @ResponseHeader(name = "Keep-Alive", value = "timeout=5, max=997") + @GET + @Path("/rest-multi") + public RestMulti getTestRestMulti() { + return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") + .header("Keep-Alive", "bar").build(); + } + + @GET + @Path("/rest-multi2") + public RestMulti getTestRestMulti2(@DefaultValue("bar") @RestQuery String keepAlive) { + return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") + .header("Keep-Alive", keepAlive).build(); + } + private IllegalArgumentException createException() { IllegalArgumentException result = new IllegalArgumentException(); result.setStackTrace(EMPTY_STACK_TRACE); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java index a8496f9d1d8b9..6321d4a7598bb 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java @@ -7,6 +7,7 @@ import jakarta.ws.rs.Path; import org.jboss.resteasy.reactive.ResponseStatus; +import org.jboss.resteasy.reactive.RestMulti; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -93,6 +94,24 @@ public void testStringThrowsException() { .statusCode(500); } + @Test + public void testReturnRestMulti() { + RestAssured + .given() + .get("/test/rest-multi") + .then() + .statusCode(210); + } + + @Test + public void testReturnRestMulti2() { + RestAssured + .given() + .get("/test/rest-multi2") + .then() + .statusCode(211); + } + @Path("/test") public static class TestResource { @@ -154,6 +173,19 @@ public String throwExceptionPlain() { throw createException(); } + @ResponseStatus(202) + @GET + @Path("/rest-multi") + public RestMulti getTestRestMulti() { + return RestMulti.from(Multi.createFrom().item("test")).status(210).build(); + } + + @GET + @Path("/rest-multi2") + public RestMulti getTestRestMulti2() { + return RestMulti.from(Multi.createFrom().item("test")).status(211).build(); + } + private IllegalArgumentException createException() { IllegalArgumentException result = new IllegalArgumentException(); result.setStackTrace(EMPTY_STACK_TRACE); diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java index ae927ba11f923..e52a18d68850a 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java @@ -57,6 +57,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_FORM_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_HEADER_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MATRIX_PARAM; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_PATH_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_QUERY_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_RESPONSE; @@ -990,6 +991,7 @@ private Type getNonAsyncReturnType(Type returnType) { || COMPLETABLE_FUTURE.equals(parameterizedType.name()) || UNI.equals(parameterizedType.name()) || MULTI.equals(parameterizedType.name()) + || REST_MULTI.equals(parameterizedType.name()) || REST_RESPONSE.equals(parameterizedType.name())) { return parameterizedType.arguments().get(0); } diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java index dd8e13857d185..191449b553310 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java @@ -88,6 +88,7 @@ import org.jboss.resteasy.reactive.RestForm; import org.jboss.resteasy.reactive.RestHeader; import org.jboss.resteasy.reactive.RestMatrix; +import org.jboss.resteasy.reactive.RestMulti; import org.jboss.resteasy.reactive.RestPath; import org.jboss.resteasy.reactive.RestQuery; import org.jboss.resteasy.reactive.RestResponse; @@ -202,6 +203,7 @@ public final class ResteasyReactiveDotNames { public static final DotName UNI = DotName.createSimple(Uni.class.getName()); public static final DotName MULTI = DotName.createSimple(Multi.class.getName()); + public static final DotName REST_MULTI = DotName.createSimple(RestMulti.class.getName()); public static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName()); public static final DotName COMPLETABLE_FUTURE = DotName.createSimple(CompletableFuture.class.getName()); public static final DotName PUBLISHER = DotName.createSimple(Publisher.class.getName()); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java new file mode 100644 index 0000000000000..34108bf1db750 --- /dev/null +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java @@ -0,0 +1,77 @@ +package org.jboss.resteasy.reactive; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap; +import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * A wrapper around {@link Multi} that gives resource methods a way to specify the HTTP status code and HTTP headers + * when streaming a result. + */ +public class RestMulti extends AbstractMulti { + + private final Multi multi; + private final Integer status; + private final MultivaluedTreeMap headers; + + public static RestMulti.Builder from(Multi multi) { + return new RestMulti.Builder<>(multi); + } + + private RestMulti(Builder builder) { + this.multi = builder.multi; + this.status = builder.status; + this.headers = builder.headers; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + } + + public Integer getStatus() { + return status; + } + + public Map> getHeaders() { + return headers; + } + + public static class Builder { + private final Multi multi; + + private Integer status; + + private final MultivaluedTreeMap headers = new CaseInsensitiveMap<>(); + + private Builder(Multi multi) { + this.multi = Objects.requireNonNull(multi, "multi cannot be null"); + } + + public Builder status(int status) { + this.status = status; + return this; + } + + public Builder header(String name, String value) { + if (value == null) { + headers.remove(name); + return this; + } + headers.add(name, value); + return this; + } + + public RestMulti build() { + return new RestMulti<>(this); + } + } +} diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java index 5cf0e9ce56ad4..53393c76317b6 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java @@ -5,6 +5,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.LEGACY_PUBLISHER; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; import java.util.Collections; @@ -33,7 +34,8 @@ public List scan(MethodInfo method, ClassInfo actualEndp return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } - if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER)) { + if (returnTypeName.equals(MULTI) || returnTypeName.equals(REST_MULTI) || returnTypeName.equals(PUBLISHER) + || returnTypeName.equals(LEGACY_PUBLISHER)) { return Collections.singletonList(new FixedHandlerChainCustomizer(new PublisherResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } @@ -44,7 +46,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp public boolean isMethodSignatureAsync(MethodInfo method) { DotName returnTypeName = method.returnType().name(); return returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE) || - returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || - returnTypeName.equals(LEGACY_PUBLISHER); + returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(REST_MULTI) || + returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER); } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java index edef2f560405d..c6f1aed6baf71 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java @@ -165,8 +165,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp for (int i = 0; i < customizers.size(); i++) { customizers.get(i).customize(response); } - // FIXME: other headers? - } } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java index 8140b79ca2f39..b32281d6833d7 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/StreamingUtil.java @@ -88,7 +88,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp for (int i = 0; i < customizers.size(); i++) { customizers.get(i).customize(response); } - // FIXME: other headers? } } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index d10f6fcfdaecd..98aefaf1c2f9b 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -3,6 +3,7 @@ import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -17,6 +18,7 @@ import jakarta.ws.rs.sse.OutboundSseEvent; import org.jboss.logging.Logger; +import org.jboss.resteasy.reactive.RestMulti; import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.common.util.ServerMediaType; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; @@ -294,14 +296,37 @@ private boolean requiresChunkedStream(MediaType mediaType) { } private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); + result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, determineCustomizers(result), json)); + } + + private List determineCustomizers(Publisher publisher) { + if (publisher instanceof RestMulti) { + RestMulti restMulti = (RestMulti) publisher; + Map> headers = restMulti.getHeaders(); + Integer status = restMulti.getStatus(); + if (headers.isEmpty() && (status == null)) { + return streamingResponseCustomizers; + } + List result = new ArrayList<>(streamingResponseCustomizers.size() + 2); + result.addAll(streamingResponseCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts + if (!headers.isEmpty()) { + result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers)); + } + if (status != null) { + result.add(new StreamingResponseCustomizer.StatusCustomizer(status)); + } + return result; + } + + return streamingResponseCustomizers; } private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); + result.subscribe(new StreamingMultiSubscriber(requestContext, determineCustomizers(result), json)); } private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher result) { + List streamingResponseCustomizers = determineCustomizers(result); SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers); requestContext.suspend(); requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer() {