From c640504b49e9b134ad235833fc052a5bf3e3f03e Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 8 May 2023 19:35:29 +0300 Subject: [PATCH] Introduce an async variant of RestMulti Closes: #26523 --- docs/src/main/asciidoc/resteasy-reactive.adoc | 53 +++- .../test/streams/StreamResource.java | 47 ++- .../test/streams/StreamTestCase.java | 14 +- .../test/headers/ResponseHeaderTest.java | 62 +++- .../test/status/ResponseStatusTest.java | 49 ++- .../jboss/resteasy/reactive/RestMulti.java | 280 +++++++++++++++--- .../handlers/PublisherResponseHandler.java | 80 ++--- 7 files changed, 503 insertions(+), 82 deletions(-) diff --git a/docs/src/main/asciidoc/resteasy-reactive.adoc b/docs/src/main/asciidoc/resteasy-reactive.adoc index 85bc78ead1d7e0..2c468c5113f149 100644 --- a/docs/src/main/asciidoc/resteasy-reactive.adoc +++ b/docs/src/main/asciidoc/resteasy-reactive.adoc @@ -933,8 +933,7 @@ impression that you can set headers or HTTP status codes, which is not true afte response. Exception mappers are also not invoked because part of the response may already have been written. -[TIP] -==== +==== Customizing headers and status If you need to set custom HTTP headers and / or the HTTP response, then you can return `org.jboss.resteasy.reactive.RestMulti` instead, like this: [source,java] @@ -959,11 +958,52 @@ public class Endpoint { @GET public Multi streamLogs() { - return RestMulti.from(logs).status(222).header("foo", "bar").build(); + return RestMulti.fromMultiData(logs).status(222).header("foo", "bar").build(); + } +} +---- + +In more advanced cases where the headers and / or status can only be obtained from the results of an async call, the `RestMulti.fromUniResponse` needs to be used. +Here is an example of such a use case: + +[source,java] +---- +package org.acme.rest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import java.util.List;import java.util.Map;import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; +import org.jboss.resteasy.reactive.RestMulti; + +@Path("logs") +public class Endpoint { + + interface SomeService { + Uni get(); + } + + interface SomeResponse { + Multi data; + + String myHeader(); + } + + private final SomeService someService; + + public Endpoint(SomeService someService) { + this.someService = someService; + } + + @GET + public Multi streamLogs() { + return RestMulti.fromUniResponse(someService.get(), SomeResponse::data, (r -> Map.of("MyHeader", List.of(r.myHeader())))); } } ---- -==== === Server-Sent Event (SSE) support @@ -1054,6 +1094,11 @@ public class Endpoint { <3> Set the event name, i.e. the value of the `event` field of a SSE message. <4> Set the data, i.e. the value of the `data` field of a SSE message. +[WARNING] +==== +Manipulation of the returned HTTP headers and status code is not possible via `RestMulti.fromUniResponse` because when returning SSE responses the headers and status code cannot be delayed until the response becomes available. +==== + === Controlling HTTP Caching features RESTEasy Reactive provides the link:{resteasy-reactive-common-api}/org/jboss/resteasy/reactive/Cache.html[`@Cache`] diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java index facd204287c1bf..dfc905597b3384 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamResource.java @@ -1,8 +1,10 @@ package io.quarkus.resteasy.reactive.jackson.deployment.test.streams; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import jakarta.ws.rs.GET; @@ -19,6 +21,7 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; @Path("streams") public class StreamResource { @@ -98,7 +101,7 @@ public void sseJson2(Sse sse, SseEventSink sink) throws IOException { @GET @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiJson() { - return RestMulti.from(Multi.createFrom().items(new Message("hello"), new Message("stef"))) + return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef"))) .header("foo", "bar").build(); } @@ -112,11 +115,24 @@ public Multi multiDefaultElementType() { @Path("ndjson/multi") @GET @Produces(RestMediaType.APPLICATION_NDJSON) - @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiNdJson() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } + @Path("ndjson/multi2") + @GET + @Produces(RestMediaType.APPLICATION_NDJSON) + public Multi multiNdJson2() { + + return RestMulti.fromUniResponse( + Uni.createFrom().item( + () -> new Wrapper(Multi.createFrom().items(new Message("hello"), new Message("stef")), + new AbstractMap.SimpleEntry<>("foo", "bar"), 222)), + Wrapper::getData, + Wrapper::getHeaders, + Wrapper::getStatus); + } + @Path("stream-json/multi") @GET @Produces(RestMediaType.APPLICATION_STREAM_JSON) @@ -137,9 +153,34 @@ public Multi multiStreamJsonFast() { for (int i = 0; i < 5000; i++) { ids.add(UUID.randomUUID()); } - return RestMulti.from(Multi.createFrom().items(ids::stream) + return RestMulti.fromMultiData(Multi.createFrom().items(ids::stream) .onItem().transform(id -> new Message(id.toString())) .onOverflow().buffer(81920)).header("foo", "bar").build(); } + private static final class Wrapper { + public final Multi data; + + public final Map> headers; + private final Integer status; + + public Wrapper(Multi data, Map.Entry header, Integer status) { + this.data = data; + this.status = status; + this.headers = Map.of(header.getKey(), List.of(header.getValue())); + } + + public Multi getData() { + return data; + } + + public Map> getHeaders() { + return headers; + } + + public Integer getStatus() { + return status; + } + } + } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java index 7414d57504e3ae..8c7704d24c472f 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/streams/StreamTestCase.java @@ -128,6 +128,18 @@ public void testNdJsonMultiFromMulti() { .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON)); } + @Test + public void testNdJsonMultiFromMulti2() { + when().get(uri.toString() + "streams/ndjson/multi2") + .then().statusCode(222) + // @formatter:off + .body(is("{\"name\":\"hello\"}\n" + + "{\"name\":\"stef\"}\n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON)) + .header("foo", "bar"); + } + @Test public void testStreamJsonMultiFromMulti() { when().get(uri.toString() + "streams/stream-json/multi") @@ -141,7 +153,7 @@ public void testStreamJsonMultiFromMulti() { private void testJsonMulti(String path) { Client client = ClientBuilder.newBuilder().register(new JacksonBasicMessageBodyReader(new ObjectMapper())).build(); - ; + WebTarget target = client.target(uri.toString() + path); Multi multi = target.request().rx(MultiInvoker.class).get(Message.class); List list = multi.collect().asList().await().atMost(Duration.ofSeconds(30)); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java index 41e19c703830de..6b9774f5c16716 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/headers/ResponseHeaderTest.java @@ -1,8 +1,10 @@ package io.quarkus.resteasy.reactive.server.test.headers; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; +import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -10,6 +12,7 @@ import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; import org.jboss.resteasy.reactive.ResponseHeader; import org.jboss.resteasy.reactive.ResponseStatus; @@ -155,6 +158,27 @@ public void testReturnRestMulti2() { "Keep-Alive", "dummy")); } + @Test + public void testReturnRestMulti3() { + RestAssured + .given() + .get("/test/rest-multi3") + .then() + .statusCode(200) + .headers(Map.of( + "header1", "foo", + "header2", "bar")); + + RestAssured + .given() + .get("/test/rest-multi3?h1=h1&h2=h2") + .then() + .statusCode(200) + .headers(Map.of( + "header1", "h1", + "header2", "h2")); + } + @Path("/test") public static class TestResource { @@ -232,21 +256,53 @@ public String throwExceptionPlain() { @GET @Path("/rest-multi") public RestMulti getTestRestMulti() { - return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") + return RestMulti.fromMultiData(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") .header("Keep-Alive", "bar").build(); } @GET @Path("/rest-multi2") public RestMulti getTestRestMulti2(@DefaultValue("bar") @RestQuery String keepAlive) { - return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") + return RestMulti.fromMultiData(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo") .header("Keep-Alive", keepAlive).build(); } + @GET + @Path("/rest-multi3") + @Produces("application/octet-stream") + public RestMulti getTestRestMulti3(@DefaultValue("foo") @RestQuery("h1") String header1, + @DefaultValue("bar") @RestQuery("h2") String header2) { + return RestMulti.fromUniResponse(getWrapper(header1, header2), Wrapper::getData, Wrapper::getHeaders); + } + private IllegalArgumentException createException() { IllegalArgumentException result = new IllegalArgumentException(); result.setStackTrace(EMPTY_STACK_TRACE); return result; } + + private Uni getWrapper(String header1, String header2) { + return Uni.createFrom().item( + () -> new Wrapper(Multi.createFrom().item("test".getBytes(StandardCharsets.UTF_8)), header1, header2)); + } + + private static final class Wrapper { + public final Multi data; + + public final Map> headers; + + public Wrapper(Multi data, String header1, String header2) { + this.data = data; + this.headers = Map.of("header1", List.of(header1), "header2", List.of(header2)); + } + + public Multi getData() { + return data; + } + + public Map> getHeaders() { + return headers; + } + } } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java index 6321d4a7598bb1..d41dbd0aea057a 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/status/ResponseStatusTest.java @@ -3,11 +3,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import org.jboss.resteasy.reactive.ResponseStatus; import org.jboss.resteasy.reactive.RestMulti; +import org.jboss.resteasy.reactive.RestQuery; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -112,6 +114,36 @@ public void testReturnRestMulti2() { .statusCode(211); } + @Test + public void testReturnRestMulti3() { + RestAssured + .given() + .get("/test/rest-multi3") + .then() + .statusCode(200); + + RestAssured + .given() + .get("/test/rest-multi3?status=212") + .then() + .statusCode(212); + } + + @Test + public void testReturnRestMulti4() { + RestAssured + .given() + .get("/test/rest-multi4") + .then() + .statusCode(200); + + RestAssured + .given() + .get("/test/rest-multi4") + .then() + .statusCode(200); + } + @Path("/test") public static class TestResource { @@ -177,13 +209,26 @@ public String throwExceptionPlain() { @GET @Path("/rest-multi") public RestMulti getTestRestMulti() { - return RestMulti.from(Multi.createFrom().item("test")).status(210).build(); + return RestMulti.fromMultiData(Multi.createFrom().item("test")).status(210).build(); } @GET @Path("/rest-multi2") public RestMulti getTestRestMulti2() { - return RestMulti.from(Multi.createFrom().item("test")).status(211).build(); + return RestMulti.fromMultiData(Multi.createFrom().item("test")).status(211).build(); + } + + @GET + @Path("/rest-multi3") + public RestMulti getTestRestMulti3(@DefaultValue("200") @RestQuery Integer status) { + return RestMulti.fromUniResponse(Uni.createFrom().item("unused"), s -> Multi.createFrom().item("test"), null, + s -> status); + } + + @GET + @Path("/rest-multi4") + public RestMulti getTestRestMulti4() { + return RestMulti.fromUniResponse(Uni.createFrom().item("unused"), s -> Multi.createFrom().item("test")); } private IllegalArgumentException createException() { diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java index 34108bf1db7505..b9fc7ed4228c0c 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestMulti.java @@ -1,77 +1,291 @@ package org.jboss.resteasy.reactive; +import static io.smallrye.mutiny.helpers.ParameterValidation.MAPPER_RETURNED_NULL; +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; + +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap; import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap; +import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.EmptyUniSubscription; +import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.operators.AbstractUni; +import io.smallrye.mutiny.subscription.ContextSupport; import io.smallrye.mutiny.subscription.MultiSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; +import io.smallrye.mutiny.subscription.UniSubscription; /** * A wrapper around {@link Multi} that gives resource methods a way to specify the HTTP status code and HTTP headers * when streaming a result. */ -public class RestMulti extends AbstractMulti { +@SuppressWarnings("ReactiveStreamsUnusedPublisher") +public abstract class RestMulti extends AbstractMulti { - private final Multi multi; - private final Integer status; - private final MultivaluedTreeMap headers; + public abstract Integer getStatus(); - public static RestMulti.Builder from(Multi multi) { - return new RestMulti.Builder<>(multi); - } + public abstract Map> getHeaders(); - private RestMulti(Builder builder) { - this.multi = builder.multi; - this.status = builder.status; - this.headers = builder.headers; + public static RestMulti.SyncRestMulti.Builder fromMultiData(Multi multi) { + return new RestMulti.SyncRestMulti.Builder<>(multi); } - @Override - public void subscribe(MultiSubscriber subscriber) { - multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + public static RestMulti fromUniResponse(Uni uni, + Function> dataExtractor) { + return fromUniResponse(uni, dataExtractor, null, null); } - public Integer getStatus() { - return status; + public static RestMulti fromUniResponse(Uni uni, + Function> dataExtractor, + Function>> headersExtractor) { + return fromUniResponse(uni, dataExtractor, headersExtractor, null); } - public Map> getHeaders() { - return headers; + public static RestMulti fromUniResponse(Uni uni, + Function> dataExtractor, + Function>> headersExtractor, + Function statusExtractor) { + Function> actualDataExtractor = Infrastructure + .decorate(nonNull(dataExtractor, "dataExtractor")); + return (RestMulti) Infrastructure.onMultiCreation(new AsyncRestMulti<>(uni, actualDataExtractor, + headersExtractor, statusExtractor)); } - public static class Builder { + public static class SyncRestMulti extends RestMulti { + private final Multi multi; + private final Integer status; + private final MultivaluedTreeMap headers; - private Integer status; + @Override + public void subscribe(MultiSubscriber subscriber) { + multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + } - private final MultivaluedTreeMap headers = new CaseInsensitiveMap<>(); + private SyncRestMulti(Builder builder) { + this.multi = builder.multi; + this.status = builder.status; + this.headers = builder.headers; + } - private Builder(Multi multi) { - this.multi = Objects.requireNonNull(multi, "multi cannot be null"); + @Override + public Integer getStatus() { + return status; } - public Builder status(int status) { - this.status = status; - return this; + @Override + public Map> getHeaders() { + return headers; } - public Builder header(String name, String value) { - if (value == null) { - headers.remove(name); + public static class Builder { + private final Multi multi; + + private Integer status; + + private final MultivaluedTreeMap headers = new CaseInsensitiveMap<>(); + + private Builder(Multi multi) { + this.multi = Objects.requireNonNull(multi, "multi cannot be null"); + } + + public Builder status(int status) { + this.status = status; return this; } - headers.add(name, value); - return this; + + public Builder header(String name, String value) { + if (value == null) { + headers.remove(name); + return this; + } + headers.add(name, value); + return this; + } + + public RestMulti build() { + return new SyncRestMulti<>(this); + } + } + } + + // Copied from: io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni while adding header and status extraction + public static class AsyncRestMulti extends RestMulti { + + private final Function> dataExtractor; + private final Function statusExtractor; + private final Function>> headersExtractor; + private final AtomicReference status; + private final AtomicReference>> headers; + private final Uni upstream; + + public AsyncRestMulti(Uni upstream, + Function> dataExtractor, + Function>> headersExtractor, + Function statusExtractor) { + this.upstream = upstream; + this.dataExtractor = dataExtractor; + this.statusExtractor = statusExtractor; + this.headersExtractor = headersExtractor; + this.status = new AtomicReference<>(null); + this.headers = new AtomicReference<>(Collections.emptyMap()); + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + if (subscriber == null) { + throw new NullPointerException("The subscriber must not be `null`"); + } + AbstractUni.subscribe(upstream, new FlatMapPublisherSubscriber<>(subscriber, dataExtractor, statusExtractor, status, + headersExtractor, headers)); + } + + @Override + public Integer getStatus() { + return status.get(); + } + + @Override + public Map> getHeaders() { + return headers.get(); } - public RestMulti build() { - return new RestMulti<>(this); + static final class FlatMapPublisherSubscriber + implements Flow.Subscriber, UniSubscriber, Flow.Subscription, ContextSupport { + + private final AtomicReference secondUpstream; + private final AtomicReference firstUpstream; + private final Flow.Subscriber downstream; + private final Function> dataExtractor; + private final Function statusExtractor; + private final AtomicReference status; + private final Function>> headersExtractor; + private final AtomicReference>> headers; + private final AtomicLong requested = new AtomicLong(); + + public FlatMapPublisherSubscriber(Flow.Subscriber downstream, + Function> dataExtractor, + Function statusExtractor, + AtomicReference status, + Function>> headersExtractor, + AtomicReference>> headers) { + this.downstream = downstream; + this.dataExtractor = dataExtractor; + this.statusExtractor = statusExtractor; + this.status = status; + this.headersExtractor = headersExtractor; + this.headers = headers; + this.firstUpstream = new AtomicReference<>(); + this.secondUpstream = new AtomicReference<>(); + } + + @Override + public void onNext(O item) { + downstream.onNext(item); + } + + @Override + public void onError(Throwable failure) { + downstream.onError(failure); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void request(long n) { + Subscriptions.requestIfNotNullOrAccumulate(secondUpstream, requested, n); + } + + @Override + public void cancel() { + UniSubscription subscription = firstUpstream.getAndSet(EmptyUniSubscription.CANCELLED); + if (subscription != null && subscription != EmptyUniSubscription.CANCELLED) { + subscription.cancel(); + } + Subscriptions.cancel(secondUpstream); + } + + @Override + public Context context() { + if (downstream instanceof ContextSupport) { + return ((ContextSupport) downstream).context(); + } else { + return Context.empty(); + } + } + + /** + * Called when we get the subscription from the upstream UNI + * + * @param subscription the subscription allowing to cancel the computation. + */ + @Override + public void onSubscribe(UniSubscription subscription) { + if (firstUpstream.compareAndSet(null, subscription)) { + downstream.onSubscribe(this); + } + } + + /** + * Called after we produced the {@link Flow.Publisher} and subscribe on it. + * + * @param subscription the subscription from the produced {@link Flow.Publisher} + */ + @Override + public void onSubscribe(Flow.Subscription subscription) { + if (secondUpstream.compareAndSet(null, subscription)) { + long r = requested.getAndSet(0L); + if (r != 0L) { + subscription.request(r); + } + } + } + + @Override + public void onItem(I item) { + Multi publisher; + + try { + publisher = dataExtractor.apply(item); + if (publisher == null) { + throw new NullPointerException(MAPPER_RETURNED_NULL); + } + if (headersExtractor != null) { + headers.set(headersExtractor.apply(item)); + } + if (statusExtractor != null) { + status.set(statusExtractor.apply(item)); + } + } catch (Throwable ex) { + downstream.onError(ex); + return; + } + + publisher.subscribe(this); + } + + @Override + public void onFailure(Throwable failure) { + downstream.onError(failure); + } } + } + } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index 98aefaf1c2f9ba..1791a388da2751 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -48,8 +48,8 @@ public void setStreamingResponseCustomizers(List st private static class SseMultiSubscriber extends AbstractMultiSubscriber { - SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List customizers) { - super(requestContext, customizers); + SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List staticCustomizers) { + super(requestContext, staticCustomizers); } @Override @@ -60,7 +60,7 @@ public void onNext(Object item) { } else { event = new OutboundSseEventImpl.BuilderImpl().data(item).build(); } - SseUtil.send(requestContext, event, customizers).whenComplete(new BiConsumer() { + SseUtil.send(requestContext, event, staticCustomizers).whenComplete(new BiConsumer() { @Override public void accept(Object v, Throwable t) { if (t != null) { @@ -83,8 +83,8 @@ private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubsc private boolean isFirstItem = true; ChunkedStreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, - List customizers, boolean json) { - super(requestContext, customizers, json); + List staticCustomizers, Publisher publisher, boolean json) { + super(requestContext, staticCustomizers, publisher, json); } @Override @@ -112,9 +112,13 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { private String nextJsonPrefix; private boolean hadItem; - StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, List customizers, + private final Publisher publisher; + + StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, + List staticCustomizers, Publisher publisher, boolean json) { - super(requestContext, customizers); + super(requestContext, staticCustomizers); + this.publisher = publisher; this.json = json; this.nextJsonPrefix = "["; this.hadItem = false; @@ -122,6 +126,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { + List customizers = determineCustomizers(!hadItem); hadItem = true; StreamingUtil.send(requestContext, customizers, item, messagePrefix()) .handle(new BiFunction() { @@ -146,10 +151,34 @@ public Object apply(Object v, Throwable t) { }); } + private List determineCustomizers(boolean isFirst) { + // we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data + // at this point no matter the type of RestMulti we can safely obtain the headers and status + if (isFirst && (publisher instanceof RestMulti)) { + RestMulti restMulti = (RestMulti) publisher; + Map> headers = restMulti.getHeaders(); + Integer status = restMulti.getStatus(); + if (headers.isEmpty() && (status == null)) { + return staticCustomizers; + } + List result = new ArrayList<>(staticCustomizers.size() + 2); + result.addAll(staticCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts + if (!headers.isEmpty()) { + result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers)); + } + if (status != null) { + result.add(new StreamingResponseCustomizer.StatusCustomizer(status)); + } + return result; + } + + return staticCustomizers; + } + @Override public void onComplete() { if (!hadItem) { - StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), customizers); + StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), staticCustomizers); } if (json) { String postfix = onCompleteText(); @@ -161,6 +190,7 @@ public void onComplete() { } else { super.onComplete(); } + } protected String onCompleteText() { @@ -184,12 +214,13 @@ protected String messagePrefix() { static abstract class AbstractMultiSubscriber implements Subscriber { protected Subscription subscription; protected ResteasyReactiveRequestContext requestContext; - protected List customizers; + protected List staticCustomizers; private boolean weClosed = false; - AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext, List customizers) { + AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext, + List staticCustomizers) { this.requestContext = requestContext; - this.customizers = customizers; + this.staticCustomizers = staticCustomizers; // let's make sure we never restart by accident, also make sure we're not marked as completed requestContext.restart(AWOL, true); requestContext.serverResponse().addCloseHandler(() -> { @@ -296,37 +327,14 @@ private boolean requiresChunkedStream(MediaType mediaType) { } private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, determineCustomizers(result), json)); - } - - private List determineCustomizers(Publisher publisher) { - if (publisher instanceof RestMulti) { - RestMulti restMulti = (RestMulti) publisher; - Map> headers = restMulti.getHeaders(); - Integer status = restMulti.getStatus(); - if (headers.isEmpty() && (status == null)) { - return streamingResponseCustomizers; - } - List result = new ArrayList<>(streamingResponseCustomizers.size() + 2); - result.addAll(streamingResponseCustomizers); // these are added first so that the result specific values will take precedence if there are conflicts - if (!headers.isEmpty()) { - result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers)); - } - if (status != null) { - result.add(new StreamingResponseCustomizer.StatusCustomizer(status)); - } - return result; - } - - return streamingResponseCustomizers; + result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json)); } private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { - result.subscribe(new StreamingMultiSubscriber(requestContext, determineCustomizers(result), json)); + result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, result, json)); } private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher result) { - List streamingResponseCustomizers = determineCustomizers(result); SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers); requestContext.suspend(); requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer() {