Skip to content

Commit

Permalink
Merge pull request #18182 from ntrp/main
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Jun 30, 2021
2 parents 7b1cbe2 + a6176f9 commit 9dda157
Show file tree
Hide file tree
Showing 6 changed files with 369 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.vertx.web.runtime.MultiJsonArraySupport;
import io.quarkus.vertx.web.runtime.MultiNdjsonSupport;
import io.quarkus.vertx.web.runtime.MultiSseSupport;
import io.quarkus.vertx.web.runtime.MultiSupport;
import io.quarkus.vertx.web.runtime.RouteHandler;
Expand Down Expand Up @@ -114,6 +115,15 @@ class Methods {
"subscribeObject",
Void.TYPE, Multi.class, RoutingContext.class);

static final MethodDescriptor IS_NDJSON = MethodDescriptor.ofMethod(MultiNdjsonSupport.class, "isNdjson", Boolean.TYPE,
Multi.class);
static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiNdjsonSupport.class,
"subscribeString",
Void.TYPE, Multi.class, RoutingContext.class);
static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiNdjsonSupport.class,
"subscribeObject",
Void.TYPE, Multi.class, RoutingContext.class);

static final MethodDescriptor IS_JSON_ARRAY = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, "isJsonArray",
Boolean.TYPE, Multi.class);
static final MethodDescriptor MULTI_JSON_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiJsonArraySupport.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,13 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met
isSSE.close();

BytecodeCreator isNotSSE = isItSSE.falseBranch();
BranchResult isItJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_JSON_ARRAY, res));
BranchResult isItNdJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_NDJSON, res));
BytecodeCreator isNdjson = isItNdJson.trueBranch();
handleNdjsonMulti(descriptor, isNdjson, routingContext, res);
isNdjson.close();

BytecodeCreator isNotNdjson = isItNdJson.falseBranch();
BranchResult isItJson = isNotNdjson.ifTrue(isNotNdjson.invokeStaticMethod(Methods.IS_JSON_ARRAY, res));
BytecodeCreator isJson = isItJson.trueBranch();
handleJsonArrayMulti(descriptor, isJson, routingContext, res);
isJson.close();
Expand Down Expand Up @@ -918,6 +924,28 @@ private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer
}
}

private void handleNdjsonMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc,
ResultHandle res) {
// The method returns a Multi that needs to be written as server-sent event.
// We subscribe to this Multi and write the provided items (one by one) in the HTTP response.
// On completion, we "end" the response
// If the method returned null, we fail
// If the provided item is null we fail
// If the multi is empty, and the method return a Multi<Void>, we reply with a 204 - NO CONTENT (as regular)
// If the produced item is a string or buffer, the response.write method is used to write the events in the response
// If the produced item is an object, the item is mapped to JSON and included in the `data` section of the event.

if (Methods.isNoContent(descriptor)) { // Multi<Void> - so return a 204.
writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc);
} else if (descriptor.isContentTypeString()) {
writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_STRING, res, rc);
} else if (descriptor.isContentTypeBuffer() || descriptor.isContentTypeMutinyBuffer()) {
writer.invokeStaticMethod(Methods.MULTI_JSON_FAIL, rc);
} else { // Multi<Object> - encode to json.
writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_OBJECT, res, rc);
}
}

private void handleJsonArrayMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc,
ResultHandle res) {
// The method returns a Multi that needs to be written as JSON Array.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package io.quarkus.vertx.web.mutiny;

import static io.restassured.RestAssured.when;
import static org.hamcrest.Matchers.*;

import java.io.IOException;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.Route;
import io.smallrye.mutiny.Multi;
import io.vertx.core.http.HttpHeaders;
import io.vertx.ext.web.RoutingContext;

public class NdjsonMultiRouteTest {

public static final String CONTENT_TYPE_NDJSON = "application/x-ndjson";
public static final String CONTENT_TYPE_STREAM_JSON = "application/stream+json";

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class));

@Test
public void testNdjsonMultiRoute() {
when().get("/hello").then().statusCode(200)
.body(is("\"Hello world!\"\n"))
.header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON);

when().get("/hellos").then().statusCode(200)
.body(containsString(
// @formatter:off
"\"hello\"\n"
+ "\"world\"\n"
+ "\"!\"\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON);

when().get("/no-hello").then().statusCode(200).body(hasLength(0))
.header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON);

// We get the item followed by the exception
when().get("/hello-and-fail").then().statusCode(200)
.body(containsString("\"Hello\""))
.body(containsString("boom"));

when().get("/void").then().statusCode(204).body(hasLength(0));

when().get("/people").then().statusCode(200)
.body(is(
// @formatter:off
"{\"name\":\"superman\",\"id\":1}\n" +
"{\"name\":\"batman\",\"id\":2}\n" +
"{\"name\":\"spiderman\",\"id\":3}\n"
))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_NDJSON);

when().get("/people-content-type").then().statusCode(200)
.body(is(
// @formatter:off
"{\"name\":\"superman\",\"id\":1}\n" +
"{\"name\":\"batman\",\"id\":2}\n" +
"{\"name\":\"spiderman\",\"id\":3}\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE.toString(), is(CONTENT_TYPE_NDJSON + ";charset=utf-8"));

when().get("/people-content-type-stream-json").then().statusCode(200)
.body(is(
// @formatter:off
"{\"name\":\"superman\",\"id\":1}\n" +
"{\"name\":\"batman\",\"id\":2}\n" +
"{\"name\":\"spiderman\",\"id\":3}\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_STREAM_JSON);

when().get("/failure").then().statusCode(500).body(containsString("boom"));
when().get("/null").then().statusCode(500).body(containsString("null"));
when().get("/sync-failure").then().statusCode(500).body(containsString("null"));
}

static class SimpleBean {

@Route(path = "hello")
Multi<String> hello(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().item("Hello world!"));
}

@Route(path = "hellos")
Multi<String> hellos(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().items("hello", "world", "!"));
}

@Route(path = "no-hello")
Multi<String> noHello(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().empty());
}

@Route(path = "hello-and-fail")
Multi<String> helloAndFail(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createBy().concatenating().streams(
Multi.createFrom().item("Hello"),
Multi.createFrom().failure(() -> new IOException("boom"))));
}

@Route(path = "void")
Multi<Void> multiVoid(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().range(0, 200)
.onItem().ignore());
}

@Route(path = "/people")
Multi<Person> people(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
new Person("superman", 1),
new Person("batman", 2),
new Person("spiderman", 3)));
}

@Route(path = "/people-content-type")
Multi<Person> peopleWithContentType(RoutingContext context) {
context.response().putHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_NDJSON + ";charset=utf-8");
return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
new Person("superman", 1),
new Person("batman", 2),
new Person("spiderman", 3)));
}

@Route(path = "/people-content-type-stream-json", produces = { CONTENT_TYPE_STREAM_JSON })
Multi<Person> peopleWithContentTypeStreamJson(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
new Person("superman", 1),
new Person("batman", 2),
new Person("spiderman", 3)));
}

@Route(path = "/failure")
Multi<Person> fail(RoutingContext context) {
return ReactiveRoutes.asJsonStream(Multi.createFrom().failure(new IOException("boom")));
}

@Route(path = "/sync-failure")
Multi<Person> failSync(RoutingContext context) {
throw new IllegalStateException("boom");
}

@Route(path = "/null")
Multi<String> uniNull(RoutingContext context) {
return null;
}
}

static class Person {
public String name;
public int id;

public Person(String name, int id) {
this.name = name;
this.id = id;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Objects;

import io.quarkus.vertx.web.runtime.JsonArrayMulti;
import io.quarkus.vertx.web.runtime.NdjsonMulti;
import io.quarkus.vertx.web.runtime.SSEMulti;
import io.smallrye.mutiny.Multi;

Expand Down Expand Up @@ -33,7 +34,7 @@ private ReactiveRoutes() {
* {@link ServerSentEvent#event()}.
* <p>
* Example of usage:
*
*
* <pre>
* &#64;Route(path = "/people")
* Multi&lt;Person&gt; people(RoutingContext context) {
Expand All @@ -52,6 +53,43 @@ public static <T> Multi<T> asEventStream(Multi<T> multi) {
return new SSEMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
}

/**
* Indicates the the given stream should be written as a Json stream in the response.
* Returning a {@code multi} wrapped using this method produces a {@code application/x-ndjson} response. Each item
* is written as an serialized json on a new line in the response. The response automatically enables the chunked
* encoding and set the content type.
* <p>
* If the item is a String, the content will be wrapped in quotes and written.
* If the item is an Object, then the JSON representation of this object will be written.
* <p>
* Example of usage:
*
* <pre>
* &#64;Route(path = "/people")
* Multi&lt;Person&gt; people(RoutingContext context) {
* return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
* new Person("superman", 1),
* new Person("batman", 2),
* new Person("spiderman", 3)));
* }
* </pre>
*
* This example produces:
*
* <pre>
* {"name":"superman", "id":1}
* {...}
* {...}
* </pre>
*
* @param multi the multi to be written
* @param <T> the type of item, can be string, object
* @return the wrapped multi
*/
public static <T> Multi<T> asJsonStream(Multi<T> multi) {
return new NdjsonMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
}

/**
* Indicates the the given stream should be written as a <em>chunked</em> JSON array in the response.
* Returning a {@code multi} wrapped using this method produces a {@code application/json} response. Each item
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.quarkus.vertx.web.runtime;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.Multi;
import io.vertx.core.AsyncResult;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;

@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
public class MultiNdjsonSupport {

private MultiNdjsonSupport() {
// Avoid direct instantiation.
}

private static void initialize(HttpServerResponse response, RoutingContext rc) {
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (headers.get(HttpHeaders.CONTENT_TYPE) == null) {
if (rc.getAcceptableContentType() == null) {
headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
} else {
headers.set(HttpHeaders.CONTENT_TYPE, rc.getAcceptableContentType());
}
}
response.setChunked(true);
}
}

public static void subscribeString(Multi<String> multi, RoutingContext rc) {
write(multi.map(s -> Buffer.buffer("\"" + s + "\"\n")), rc);
}

public static void subscribeObject(Multi<Object> multi, RoutingContext rc) {
write(multi.map(o -> Buffer.buffer(Json.encode(o) + "\n")), rc);
}

private static void onWriteDone(Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) {
if (ar.failed()) {
rc.fail(ar.cause());
} else {
subscription.request(1);
}
}

public static void write(Multi<Buffer> multi, RoutingContext rc) {
HttpServerResponse response = rc.response();
multi.subscribe().withSubscriber(new Subscriber<Buffer>() {
Subscription upstream;

@Override
public void onSubscribe(Subscription subscription) {
this.upstream = subscription;
this.upstream.request(1);
}

@Override
public void onNext(Buffer item) {
initialize(response, rc);
response.write(item, ar -> onWriteDone(upstream, ar, rc));
}

@Override
public void onError(Throwable throwable) {
rc.fail(throwable);
}

@Override
public void onComplete() {
endOfStream(response, rc);
}
});
}

private static void endOfStream(HttpServerResponse response, RoutingContext rc) {
if (response.bytesWritten() == 0) { // No item
MultiMap headers = response.headers();
if (headers.get(HttpHeaders.CONTENT_TYPE) == null) {
if (rc.getAcceptableContentType() == null) {
headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
} else {
headers.set(HttpHeaders.CONTENT_TYPE, rc.getAcceptableContentType());
}
}
}
response.end();
}

public static boolean isNdjson(Multi<?> multi) {
return multi instanceof NdjsonMulti;
}
}
Loading

0 comments on commit 9dda157

Please sign in to comment.