From 6648510d38f0dcb728836292157ae94aa5c79780 Mon Sep 17 00:00:00 2001 From: Jose Date: Thu, 20 Jan 2022 07:41:50 +0100 Subject: [PATCH] Add JSON streaming for RESTEasy Reactive Jsonb and Jackson We now support json streaming. Example: ``` @Path("stream-json/multi") @GET @Produces(SseMediaType.APPLICATION_STREAM_JSON) @RestSseElementType(MediaType.APPLICATION_JSON) public Multi multiStreamJson() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } ``` We support "application/x-ndjson" and "application/stream+json". Moreover, I've added a new utility class called `SseMediaType` to be used by users. The implementation is similar to `MediaType`. --- .../deployment/pom.xml | 10 ++ .../ResteasyReactiveJacksonProcessor.java | 10 +- .../jackson/deployment/test/sse/Message.java | 13 ++ .../deployment/test/sse/SseResource.java | 125 +++++++++++++++ .../deployment/test/sse/SseTestCase.java | 146 ++++++++++++++++++ .../ResteasyReactiveJsonbProcessor.java | 3 +- .../deployment/test/sse/SseResource.java | 17 ++ .../deployment/test/sse/SseTestCase.java | 28 ++++ .../reactive/common/util/RestMediaType.java | 18 +++ .../handlers/PublisherResponseHandler.java | 87 +++++++++-- 10 files changed, 439 insertions(+), 18 deletions(-) create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java create mode 100644 independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml index d89c9b3b3e4c2..87da39c8b69fc 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml @@ -40,6 +40,16 @@ quarkus-hibernate-validator-deployment test + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-jaxrs-client-reactive-deployment + test + diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java index 2df5d371d6ccd..4aeaac1a2d00e 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java @@ -23,6 +23,7 @@ import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.common.model.ResourceMethod; import org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.server.util.MethodId; import com.fasterxml.jackson.annotation.JsonView; @@ -137,15 +138,18 @@ void additionalProviders(List jacksonFeatureBuildItems, additionalWriters .produce(new MessageBodyWriterBuildItem(getJacksonMessageBodyWriter(applicationNeedsSpecialJacksonFeatures), Object.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + RestMediaType.APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(VertxJsonArrayMessageBodyWriter.class.getName(), JsonArray.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + RestMediaType.APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(VertxJsonObjectMessageBodyWriter.class.getName(), JsonObject.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + RestMediaType.APPLICATION_STREAM_JSON))); } private String getJacksonMessageBodyWriter(boolean applicationNeedsSpecialJacksonFeatures) { diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java new file mode 100644 index 0000000000000..c09c92acb9672 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java @@ -0,0 +1,13 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.sse; + +public class Message { + public String name; + + public Message(String name) { + this.name = name; + } + + // for Jsonb + public Message() { + } +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java new file mode 100644 index 0000000000000..f1622e35051a3 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java @@ -0,0 +1,125 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.sse; + +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseEventSink; + +import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; + +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Multi; + +@Path("sse") +public class SseResource { + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sse(Sse sse, SseEventSink sink) { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + sseBroadcaster.register(sink); + sseBroadcaster.broadcast(sse.newEventBuilder().data("hello").build()) + .thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data("stef").build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Path("multi") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi multiText() { + return Multi.createFrom().items("hello", "stef"); + } + + @Path("json") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestSseElementType(MediaType.APPLICATION_JSON) + public void sseJson(Sse sse, SseEventSink sink) throws IOException { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + sseBroadcaster.register(sink); + sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build()) + .thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Blocking + @Path("blocking/json") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestSseElementType(MediaType.APPLICATION_JSON) + public void blockingSseJson(Sse sse, SseEventSink sink) throws IOException { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + sseBroadcaster.register(sink); + sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build()) + .thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Path("json2") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseJson2(Sse sse, SseEventSink sink) throws IOException { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + // Same as sseJson but set mediaType in builder + sseBroadcaster.register(sink); + sseBroadcaster + .broadcast(sse.newEventBuilder().data(new Message("hello")).mediaType(MediaType.APPLICATION_JSON_TYPE).build()) + .thenCompose(v -> sseBroadcaster.broadcast( + sse.newEventBuilder().mediaType(MediaType.APPLICATION_JSON_TYPE).data(new Message("stef")).build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Path("json/multi") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("json/multi2") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi multiDefaultElementType() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("ndjson/multi") + @GET + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiNdJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("stream-json/multi") + @GET + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiStreamJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java new file mode 100644 index 0000000000000..1f29b42de959c --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java @@ -0,0 +1,146 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.sse; + +import static io.restassured.RestAssured.when; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; + +import org.apache.http.HttpStatus; +import org.jboss.resteasy.reactive.client.impl.MultiInvoker; +import org.jboss.resteasy.reactive.common.util.RestMediaType; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.smallrye.mutiny.Multi; + +public class SseTestCase { + + @TestHTTPResource + URI uri; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(SseResource.class, Message.class)); + + @Test + public void testSseFromSse() throws Exception { + testSse("sse"); + } + + @Test + public void testSseFromMulti() throws Exception { + testSse("sse/multi"); + } + + private void testSse(String path) throws Exception { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + path); + // do not reconnect + try (SseEventSource eventSource = SseEventSource.target(target).reconnectingEvery(Integer.MAX_VALUE, TimeUnit.SECONDS) + .build()) { + CompletableFuture> res = new CompletableFuture<>(); + List collect = Collections.synchronizedList(new ArrayList<>()); + eventSource.register(new Consumer() { + @Override + public void accept(InboundSseEvent inboundSseEvent) { + collect.add(inboundSseEvent.readData()); + } + }, new Consumer() { + @Override + public void accept(Throwable throwable) { + res.completeExceptionally(throwable); + } + }, () -> { + res.complete(collect); + }); + eventSource.open(); + assertThat(res.get(5, TimeUnit.SECONDS)).containsExactly("hello", "stef"); + } + } + + @Test + public void testMultiFromSse() { + testMulti("sse"); + } + + @Test + public void testMultiFromMulti() { + testMulti("sse/multi"); + } + + private void testMulti(String path) { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + path); + Multi multi = target.request().rx(MultiInvoker.class).get(String.class); + List list = multi.collect().asList().await().atMost(Duration.ofSeconds(30)); + assertThat(list).containsExactly("hello", "stef"); + } + + @Test + public void testJsonMultiFromSse() { + testJsonMulti("sse/json"); + testJsonMulti("sse/json2"); + testJsonMulti("sse/blocking/json"); + } + + @Test + public void testJsonMultiFromMulti() { + testJsonMulti("sse/json/multi"); + } + + @Test + public void testJsonMultiFromMultiWithDefaultElementType() { + testJsonMulti("sse/json/multi2"); + } + + @Test + public void testNdJsonMultiFromMulti() { + when().get(uri.toString() + "sse/ndjson/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON)); + } + + @Test + public void testStreamJsonMultiFromMulti() { + when().get(uri.toString() + "sse/stream-json/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_STREAM_JSON)); + } + + private void testJsonMulti(String path) { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + path); + Multi multi = target.request().rx(MultiInvoker.class).get(Message.class); + List list = multi.collect().asList().await().atMost(Duration.ofSeconds(30)); + assertThat(list).extracting("name").containsExactly("hello", "stef"); + } +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java index 44d18f62cdbc7..d9af2a611b97b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java @@ -5,6 +5,7 @@ import javax.ws.rs.core.MediaType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.server.jsonb.JsonbMessageBodyReader; import org.jboss.resteasy.reactive.server.jsonb.JsonbMessageBodyWriter; @@ -53,7 +54,7 @@ void additionalProviders(BuildProducer additionalBean, additionalReaders.produce(new MessageBodyReaderBuildItem(JsonbMessageBodyReader.class.getName(), Object.class.getName(), Collections.singletonList(MediaType.APPLICATION_JSON))); additionalWriters.produce(new MessageBodyWriterBuildItem(JsonbMessageBodyWriter.class.getName(), Object.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, RestMediaType.APPLICATION_STREAM_JSON))); } @BuildStep diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java index 3a3d4619a81d0..eff18d0e526d7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java @@ -11,6 +11,7 @@ import javax.ws.rs.sse.SseEventSink; import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Multi; @@ -105,4 +106,20 @@ public Multi multiDefaultElementType() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } + @Path("ndjson/multi") + @GET + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiNdJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("stream-json/multi") + @GET + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiStreamJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java index 31df8d75355c6..9e00b014c544e 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java @@ -1,6 +1,9 @@ package io.quarkus.resteasy.reactive.jsonb.deployment.test.sse; +import static io.restassured.RestAssured.when; import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import java.net.URI; import java.time.Duration; @@ -14,10 +17,13 @@ import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; +import org.apache.http.HttpStatus; import org.jboss.resteasy.reactive.client.impl.MultiInvoker; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -106,6 +112,28 @@ public void testJsonMultiFromMultiWithDefaultElementType() { testJsonMulti("sse/json/multi2"); } + @Test + public void testNdJsonMultiFromMulti() { + when().get(uri.toString() + "sse/ndjson/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON)); + } + + @Test + public void testStreamJsonMultiFromMulti() { + when().get(uri.toString() + "sse/stream-json/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_STREAM_JSON)); + } + private void testJsonMulti(String path) { Client client = ClientBuilder.newBuilder().build(); WebTarget target = client.target(uri.toString() + path); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java new file mode 100644 index 0000000000000..f2a034bb46558 --- /dev/null +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java @@ -0,0 +1,18 @@ +package org.jboss.resteasy.reactive.common.util; + +import javax.ws.rs.core.MediaType; + +/** + * Extended media types in Resteasy Reactive. + */ +public final class RestMediaType { + + public static final String APPLICATION_NDJSON = "application/x-ndjson"; + public static final MediaType APPLICATION_NDJSON_TYPE = new MediaType("application", "x-ndjson"); + public static final String APPLICATION_STREAM_JSON = "application/stream+json"; + public static final MediaType APPLICATION_STREAM_JSON_TYPE = new MediaType("application", "stream+json"); + + private RestMediaType() { + + } +} diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index aa474d8ec2cff..43ddb8b91cf78 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -7,6 +7,7 @@ import java.util.function.BiFunction; import javax.ws.rs.core.MediaType; import org.jboss.logging.Logger; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import org.jboss.resteasy.reactive.server.core.SseUtil; import org.jboss.resteasy.reactive.server.core.StreamingUtil; @@ -25,6 +26,8 @@ */ public class PublisherResponseHandler implements ServerRestHandler { + private static final String JSON = "json"; + private List streamingResponseCustomizers = Collections.emptyList(); public void setStreamingResponseCustomizers(List streamingResponseCustomizers) { @@ -57,6 +60,35 @@ public Object apply(Object v, Throwable t) { } } + private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubscriber { + + private static final String LINE_SEPARATOR = "/n"; + + private boolean isFirstItem = true; + + ChunkedStreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, + List customizers, boolean json) { + super(requestContext, customizers, json); + } + + @Override + protected String messagePrefix() { + // When message is chunked, we don't need to add prefixes at first + if (isFirstItem) { + isFirstItem = false; + return null; + } + + // If it's not the first message, we need to append the messages with end of line delimiter. + return LINE_SEPARATOR; + } + + @Override + protected String onCompleteText() { + return LINE_SEPARATOR; + } + } + private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { // Huge hack to stream valid json @@ -75,7 +107,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { hadItem = true; - StreamingUtil.send(requestContext, customizers, item, json ? nextJsonPrefix : null) + StreamingUtil.send(requestContext, customizers, item, messagePrefix()) .handle(new BiFunction() { @Override public Object apply(Object v, Throwable t) { @@ -104,13 +136,7 @@ public void onComplete() { StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), customizers); } if (json) { - String postfix; - // check if we never sent the open prefix - if (!hadItem) { - postfix = "[]"; - } else { - postfix = "]"; - } + String postfix = onCompleteText(); byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII); requestContext.serverResponse().write(postfixBytes).handle((v, t) -> { super.onComplete(); @@ -120,6 +146,23 @@ public void onComplete() { super.onComplete(); } } + + protected String onCompleteText() { + String postfix; + // check if we never sent the open prefix + if (!hadItem) { + postfix = "[]"; + } else { + postfix = "]"; + } + + return postfix; + } + + protected String messagePrefix() { + // if it's json, the message prefix starts with `[`. + return json ? nextJsonPrefix : null; + } } static abstract class AbstractMultiSubscriber implements Subscriber { @@ -197,24 +240,40 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti // media type negotiation and fixed entity writer set up, perhaps it's better than // cancelling the normal route? // or make this SSE produce build-time - MediaType[] mediaTypes = requestContext.getTarget().getProduces().getSortedMediaTypes(); - if (mediaTypes.length != 1) + MediaType[] mediaTypes = requestContext.getTarget().getProduces().getSortedOriginalMediaTypes(); + if (mediaTypes.length != 1) { throw new IllegalStateException( "Negotiation or dynamic media type not supported yet for Multi: please use a single @Produces annotation"); - requestContext.setResponseContentType(mediaTypes[0]); + } + + MediaType mediaType = mediaTypes[0]; + requestContext.setResponseContentType(mediaType); // this is the non-async return type requestContext.setGenericReturnType(requestContext.getTarget().getReturnType()); // we have several possibilities here, but in all we suspend requestContext.suspend(); - if (mediaTypes[0].isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) { + if (mediaType.isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) { handleSse(requestContext, result); } else { - boolean json = mediaTypes[0].isCompatible(MediaType.APPLICATION_JSON_TYPE); - handleStreaming(requestContext, result, json); + boolean json = mediaType.toString().contains(JSON); + if (requiresChunkedStream(mediaType)) { + handleChunkedStreaming(requestContext, result, json); + } else { + handleStreaming(requestContext, result, json); + } } } } + private boolean requiresChunkedStream(MediaType mediaType) { + return mediaType.isCompatible(RestMediaType.APPLICATION_NDJSON_TYPE) + || mediaType.isCompatible(RestMediaType.APPLICATION_STREAM_JSON_TYPE); + } + + private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { + result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); + } + private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); }