diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java index b122c6ea278ca..778f8f15b382c 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java @@ -27,6 +27,7 @@ import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; import io.quarkus.vertx.web.runtime.MultiJsonArraySupport; +import io.quarkus.vertx.web.runtime.MultiNdjsonSupport; import io.quarkus.vertx.web.runtime.MultiSseSupport; import io.quarkus.vertx.web.runtime.MultiSupport; import io.quarkus.vertx.web.runtime.RouteHandler; @@ -114,6 +115,15 @@ class Methods { "subscribeObject", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor IS_NDJSON = MethodDescriptor.ofMethod(MultiNdjsonSupport.class, "isNdjson", Boolean.TYPE, + Multi.class); + static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiNdjsonSupport.class, + "subscribeString", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiNdjsonSupport.class, + "subscribeObject", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor IS_JSON_ARRAY = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, "isJsonArray", Boolean.TYPE, Multi.class); static final MethodDescriptor MULTI_JSON_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java index ff8ec96ac6349..ce3919449b246 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java @@ -785,7 +785,13 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met isSSE.close(); BytecodeCreator isNotSSE = isItSSE.falseBranch(); - BranchResult isItJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_JSON_ARRAY, res)); + BranchResult isItNdJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_NDJSON, res)); + BytecodeCreator isNdjson = isItNdJson.trueBranch(); + handleNdjsonMulti(descriptor, isNdjson, routingContext, res); + isNdjson.close(); + + BytecodeCreator isNotNdjson = isItNdJson.falseBranch(); + BranchResult isItJson = isNotNdjson.ifTrue(isNotNdjson.invokeStaticMethod(Methods.IS_JSON_ARRAY, res)); BytecodeCreator isJson = isItJson.trueBranch(); handleJsonArrayMulti(descriptor, isJson, routingContext, res); isJson.close(); @@ -918,6 +924,28 @@ private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer } } + private void handleNdjsonMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc, + ResultHandle res) { + // The method returns a Multi that needs to be written as server-sent event. + // We subscribe to this Multi and write the provided items (one by one) in the HTTP response. + // On completion, we "end" the response + // If the method returned null, we fail + // If the provided item is null we fail + // If the multi is empty, and the method return a Multi, we reply with a 204 - NO CONTENT (as regular) + // If the produced item is a string or buffer, the response.write method is used to write the events in the response + // If the produced item is an object, the item is mapped to JSON and included in the `data` section of the event. + + if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc); + } else if (descriptor.isContentTypeString()) { + writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_STRING, res, rc); + } else if (descriptor.isContentTypeBuffer() || descriptor.isContentTypeMutinyBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_JSON_FAIL, rc); + } else { // Multi - encode to json. + writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_OBJECT, res, rc); + } + } + private void handleJsonArrayMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc, ResultHandle res) { // The method returns a Multi that needs to be written as JSON Array. diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/NdjsonMultiRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/NdjsonMultiRouteTest.java new file mode 100644 index 0000000000000..97e31d960907c --- /dev/null +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/NdjsonMultiRouteTest.java @@ -0,0 +1,168 @@ +package io.quarkus.vertx.web.mutiny; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.*; + +import java.io.IOException; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.web.ReactiveRoutes; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Multi; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.RoutingContext; + +public class NdjsonMultiRouteTest { + + public static final String CONTENT_TYPE_NDJSON = "application/x-ndjson"; + public static final String CONTENT_TYPE_STREAM_JSON = "application/stream+json"; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); + + @Test + public void testNdjsonMultiRoute() { + when().get("/hello").then().statusCode(200) + .body(is("\"Hello world!\"\n")) + .header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON); + + when().get("/hellos").then().statusCode(200) + .body(containsString( + // @formatter:off + "\"hello\"\n" + + "\"world\"\n" + + "\"!\"\n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON); + + when().get("/no-hello").then().statusCode(200).body(hasLength(0)) + .header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON); + + // We get the item followed by the exception + when().get("/hello-and-fail").then().statusCode(200) + .body(containsString("\"Hello\"")) + .body(containsString("boom")); + + when().get("/void").then().statusCode(204).body(hasLength(0)); + + when().get("/people").then().statusCode(200) + .body(is( + // @formatter:off + "{\"name\":\"superman\",\"id\":1}\n" + + "{\"name\":\"batman\",\"id\":2}\n" + + "{\"name\":\"spiderman\",\"id\":3}\n" + )) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON); + + when().get("/people-content-type").then().statusCode(200) + .body(is( + // @formatter:off + "{\"name\":\"superman\",\"id\":1}\n" + + "{\"name\":\"batman\",\"id\":2}\n" + + "{\"name\":\"spiderman\",\"id\":3}\n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE.toString(), is(CONTENT_TYPE_NDJSON + ";charset=utf-8")); + + when().get("/people-content-type-stream-json").then().statusCode(200) + .body(is( + // @formatter:off + "{\"name\":\"superman\",\"id\":1}\n" + + "{\"name\":\"batman\",\"id\":2}\n" + + "{\"name\":\"spiderman\",\"id\":3}\n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_STREAM_JSON); + + when().get("/failure").then().statusCode(500).body(containsString("boom")); + when().get("/null").then().statusCode(500).body(containsString("null")); + when().get("/sync-failure").then().statusCode(500).body(containsString("null")); + } + + static class SimpleBean { + + @Route(path = "hello") + Multi hello(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().item("Hello world!")); + } + + @Route(path = "hellos") + Multi hellos(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().items("hello", "world", "!")); + } + + @Route(path = "no-hello") + Multi noHello(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().empty()); + } + + @Route(path = "hello-and-fail") + Multi helloAndFail(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createBy().concatenating().streams( + Multi.createFrom().item("Hello"), + Multi.createFrom().failure(() -> new IOException("boom")))); + } + + @Route(path = "void") + Multi multiVoid(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().range(0, 200) + .onItem().ignore()); + } + + @Route(path = "/people") + Multi people(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/people-content-type") + Multi peopleWithContentType(RoutingContext context) { + context.response().putHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_NDJSON + ";charset=utf-8"); + return ReactiveRoutes.asJsonStream(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/people-content-type-stream-json", produces = { CONTENT_TYPE_STREAM_JSON }) + Multi peopleWithContentTypeStreamJson(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/failure") + Multi fail(RoutingContext context) { + return ReactiveRoutes.asJsonStream(Multi.createFrom().failure(new IOException("boom"))); + } + + @Route(path = "/sync-failure") + Multi failSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "/null") + Multi uniNull(RoutingContext context) { + return null; + } + } + + static class Person { + public String name; + public int id; + + public Person(String name, int id) { + this.name = name; + this.id = id; + } + } + +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java index 15cce699bd710..f63b98e925038 100644 --- a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java @@ -3,6 +3,7 @@ import java.util.Objects; import io.quarkus.vertx.web.runtime.JsonArrayMulti; +import io.quarkus.vertx.web.runtime.NdjsonMulti; import io.quarkus.vertx.web.runtime.SSEMulti; import io.smallrye.mutiny.Multi; @@ -33,7 +34,7 @@ private ReactiveRoutes() { * {@link ServerSentEvent#event()}. *

* Example of usage: - * + * *

      * @Route(path = "/people")
      * Multi<Person> people(RoutingContext context) {
@@ -52,6 +53,43 @@ public static  Multi asEventStream(Multi multi) {
         return new SSEMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
     }
 
+    /**
+     * Indicates the the given stream should be written as a Json stream in the response.
+     * Returning a {@code multi} wrapped using this method produces a {@code application/x-ndjson} response. Each item
+     * is written as an serialized json on a new line in the response. The response automatically enables the chunked
+     * encoding and set the content type.
+     * 

+ * If the item is a String, the content will be wrapped in quotes and written. + * If the item is an Object, then the JSON representation of this object will be written. + *

+ * Example of usage: + * + *

+     * @Route(path = "/people")
+     * Multi<Person> people(RoutingContext context) {
+     *     return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
+     *             new Person("superman", 1),
+     *             new Person("batman", 2),
+     *             new Person("spiderman", 3)));
+     * }
+     * 
+ * + * This example produces: + * + *
+     *  {"name":"superman", "id":1}
+     *  {...}
+     *  {...}
+     * 
+ * + * @param multi the multi to be written + * @param the type of item, can be string, object + * @return the wrapped multi + */ + public static Multi asJsonStream(Multi multi) { + return new NdjsonMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`")); + } + /** * Indicates the the given stream should be written as a chunked JSON array in the response. * Returning a {@code multi} wrapped using this method produces a {@code application/json} response. Each item diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java new file mode 100644 index 0000000000000..4d7da7b2fc5c8 --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java @@ -0,0 +1,98 @@ +package io.quarkus.vertx.web.runtime; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.smallrye.mutiny.Multi; +import io.vertx.core.AsyncResult; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; +import io.vertx.ext.web.RoutingContext; + +@SuppressWarnings("ReactiveStreamsSubscriberImplementation") +public class MultiNdjsonSupport { + + private MultiNdjsonSupport() { + // Avoid direct instantiation. + } + + private static void initialize(HttpServerResponse response, RoutingContext rc) { + if (response.bytesWritten() == 0) { + MultiMap headers = response.headers(); + if (headers.get(HttpHeaders.CONTENT_TYPE) == null) { + if (rc.getAcceptableContentType() == null) { + headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson"); + } else { + headers.set(HttpHeaders.CONTENT_TYPE, rc.getAcceptableContentType()); + } + } + response.setChunked(true); + } + } + + public static void subscribeString(Multi multi, RoutingContext rc) { + write(multi.map(s -> Buffer.buffer("\"" + s + "\"\n")), rc); + } + + public static void subscribeObject(Multi multi, RoutingContext rc) { + write(multi.map(o -> Buffer.buffer(Json.encode(o) + "\n")), rc); + } + + private static void onWriteDone(Subscription subscription, AsyncResult ar, RoutingContext rc) { + if (ar.failed()) { + rc.fail(ar.cause()); + } else { + subscription.request(1); + } + } + + public static void write(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + this.upstream.request(1); + } + + @Override + public void onNext(Buffer item) { + initialize(response, rc); + response.write(item, ar -> onWriteDone(upstream, ar, rc)); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + endOfStream(response, rc); + } + }); + } + + private static void endOfStream(HttpServerResponse response, RoutingContext rc) { + if (response.bytesWritten() == 0) { // No item + MultiMap headers = response.headers(); + if (headers.get(HttpHeaders.CONTENT_TYPE) == null) { + if (rc.getAcceptableContentType() == null) { + headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson"); + } else { + headers.set(HttpHeaders.CONTENT_TYPE, rc.getAcceptableContentType()); + } + } + } + response.end(); + } + + public static boolean isNdjson(Multi multi) { + return multi instanceof NdjsonMulti; + } +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/NdjsonMulti.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/NdjsonMulti.java new file mode 100644 index 0000000000000..dd82ff9089a61 --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/NdjsonMulti.java @@ -0,0 +1,25 @@ +package io.quarkus.vertx.web.runtime; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * Just a wrapped to capture the fact that the items must be written with newline delimited JSON. + * + * @param the type of item. + */ +public class NdjsonMulti extends AbstractMulti { + + private final Multi multi; + + public NdjsonMulti(Multi multi) { + this.multi = multi; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + } +}