From 7225af6aa695cd1577821697d745614435ada20d Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 25 Jun 2020 16:25:12 +0200 Subject: [PATCH 1/5] Add support for Uni and synchronous result in reactive routes --- .../web/deployment/HandlerDescriptor.java | 76 +++++ .../quarkus/vertx/web/deployment/Methods.java | 93 ++++++ .../web/deployment/VertxWebProcessor.java | 301 +++++++++++++----- .../vertx/web/mutiny/MutinyRouteTest.java | 197 ++++++++++++ 4 files changed, 586 insertions(+), 81 deletions(-) create mode 100644 extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java create mode 100644 extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java create mode 100644 extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java new file mode 100644 index 0000000000000..09f3821a6c769 --- /dev/null +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java @@ -0,0 +1,76 @@ +package io.quarkus.vertx.web.deployment; + +import org.jboss.jandex.DotName; +import org.jboss.jandex.MethodInfo; +import org.jboss.jandex.Type; + +import io.smallrye.mutiny.Uni; +import io.vertx.core.buffer.Buffer; + +/** + * Describe a request handler. + */ +public class HandlerDescriptor { + + private final MethodInfo method; + + public HandlerDescriptor(MethodInfo method) { + this.method = method; + } + + public Type getReturnType() { + return method.returnType(); + } + + public boolean isReturningVoid() { + return method.returnType().kind().equals(Type.Kind.VOID); + } + + public boolean isReturningUni() { + return method.returnType().name().equals(DotName.createSimple(Uni.class.getName())); + } + + public Type getContentType() { + if (isReturningVoid()) { + return null; + } + if (isReturningUni()) { + return getReturnType().asParameterizedType().arguments().get(0); + } + return getReturnType(); + } + + public boolean isContentTypeString() { + Type type = getContentType(); + if (type == null) { + return false; + } + return type.name().equals(DotName.createSimple(String.class.getName())); + } + + public boolean isContentTypeBuffer() { + Type type = getContentType(); + if (type == null) { + return false; + } + return type.name().equals(DotName.createSimple(Buffer.class.getName())); + } + + public boolean isContentTypeRxBuffer() { + Type type = getContentType(); + if (type == null) { + return false; + } + return type.name() + .equals(DotName.createSimple(io.vertx.reactivex.core.buffer.Buffer.class.getName())); + } + + public boolean isContentTypeMutinyBuffer() { + Type type = getContentType(); + if (type == null) { + return false; + } + return type.name().equals(DotName.createSimple(io.vertx.mutiny.core.buffer.Buffer.class.getName())); + } + +} diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java new file mode 100644 index 0000000000000..5e518f4a627c7 --- /dev/null +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java @@ -0,0 +1,93 @@ +package io.quarkus.vertx.web.deployment; + +import java.util.function.Consumer; + +import org.jboss.jandex.DotName; + +import io.quarkus.gizmo.BytecodeCreator; +import io.quarkus.gizmo.MethodDescriptor; +import io.quarkus.gizmo.ResultHandle; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.groups.UniSubscribe; +import io.smallrye.mutiny.subscription.Cancellable; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; +import io.vertx.ext.web.RoutingContext; + +public class Methods { + + static final MethodDescriptor GET_HEADERS = MethodDescriptor + .ofMethod(HttpServerResponse.class, "headers", MultiMap.class); + static final MethodDescriptor MULTIMAP_GET = MethodDescriptor + .ofMethod(MultiMap.class, "get", String.class, String.class); + static final MethodDescriptor MULTIMAP_SET = MethodDescriptor + .ofMethod(MultiMap.class, "set", MultiMap.class, String.class, String.class); + + static final MethodDescriptor RESPONSE = MethodDescriptor + .ofMethod(RoutingContext.class, "response", HttpServerResponse.class); + + static final MethodDescriptor FAIL = MethodDescriptor + .ofMethod(RoutingContext.class, "fail", Void.TYPE, Throwable.class); + static final MethodDescriptor SUBSCRIBE = MethodDescriptor.ofMethod(Uni.class, "subscribe", UniSubscribe.class); + + static final MethodDescriptor SUBSCRIBE_WITH = MethodDescriptor + .ofMethod(UniSubscribe.class, "with", Cancellable.class, Consumer.class, Consumer.class); + static final MethodDescriptor END = MethodDescriptor.ofMethod(HttpServerResponse.class, "end", Void.TYPE); + + static final MethodDescriptor END_WITH_STRING = MethodDescriptor + .ofMethod(HttpServerResponse.class, "end", Void.TYPE, String.class); + static final MethodDescriptor END_WITH_BUFFER = MethodDescriptor + .ofMethod(HttpServerResponse.class, "end", Void.TYPE, Buffer.class); + static final MethodDescriptor SET_STATUS = MethodDescriptor + .ofMethod(HttpServerResponse.class, "setStatusCode", HttpServerResponse.class, Integer.TYPE); + static final MethodDescriptor RX_GET_DELEGATE = MethodDescriptor + .ofMethod(io.vertx.reactivex.core.buffer.Buffer.class, "getDelegate", Buffer.class); + + static final MethodDescriptor MUTINY_GET_DELEGATE = MethodDescriptor + .ofMethod(io.vertx.mutiny.core.buffer.Buffer.class, "getDelegate", Buffer.class); + static final MethodDescriptor JSON_ENCODE = MethodDescriptor + .ofMethod(Json.class, "encode", String.class, Object.class); + + private Methods() { + // Avoid direct instantiation + } + + static void fail(BytecodeCreator creator, ResultHandle rc, ResultHandle exception) { + creator.invokeInterfaceMethod(FAIL, rc, exception); + } + + public static void returnAndClose(BytecodeCreator creator) { + creator.returnValue(null); + creator.close(); + } + + static boolean isNoContent(HandlerDescriptor descriptor) { + return descriptor.getContentType().name() + .equals(DotName.createSimple(Void.class.getName())); + } + + static ResultHandle createNpeBecauseItemIfNull(BytecodeCreator writer) { + return writer.newInstance( + MethodDescriptor.ofConstructor(NullPointerException.class, String.class), + writer.load("Invalid value returned by Uni: `null`")); + } + + static MethodDescriptor getEndMethodForContentType(HandlerDescriptor descriptor) { + if (descriptor.isContentTypeBuffer() || descriptor.isContentTypeRxBuffer() || descriptor + .isContentTypeMutinyBuffer()) { + return END_WITH_BUFFER; + } + return END_WITH_STRING; + } + + static void setContentTypeToJson(ResultHandle response, BytecodeCreator invoke) { + ResultHandle ct = invoke.load("Content-Type"); + ResultHandle headers = invoke.invokeInterfaceMethod(GET_HEADERS, response); + ResultHandle current = invoke.invokeInterfaceMethod(MULTIMAP_GET, headers, ct); + BytecodeCreator branch = invoke.ifNull(current).trueBranch(); + branch.invokeInterfaceMethod(MULTIMAP_SET, headers, ct, branch.load("application/json")); + branch.close(); + } +} diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java index 56e7f24d5dcf5..4db4e9e0213b1 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java @@ -4,18 +4,9 @@ import static org.objectweb.asm.Opcodes.ACC_PRIVATE; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import javax.enterprise.context.ContextNotActiveException; @@ -24,34 +15,15 @@ import javax.enterprise.context.spi.CreationalContext; import javax.inject.Singleton; -import org.jboss.jandex.AnnotationInstance; -import org.jboss.jandex.AnnotationValue; -import org.jboss.jandex.ClassInfo; -import org.jboss.jandex.DotName; -import org.jboss.jandex.IndexView; -import org.jboss.jandex.MethodInfo; -import org.jboss.jandex.Type; +import org.jboss.jandex.*; import org.jboss.logging.Logger; -import io.quarkus.arc.Arc; -import io.quarkus.arc.ArcContainer; -import io.quarkus.arc.InjectableBean; -import io.quarkus.arc.InjectableContext; -import io.quarkus.arc.InjectableReferenceProvider; -import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem; -import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem; -import io.quarkus.arc.deployment.CustomScopeAnnotationsBuildItem; -import io.quarkus.arc.deployment.UnremovableBeanBuildItem; +import io.quarkus.arc.*; +import io.quarkus.arc.deployment.*; import io.quarkus.arc.deployment.UnremovableBeanBuildItem.BeanClassAnnotationExclusion; -import io.quarkus.arc.deployment.ValidationPhaseBuildItem; import io.quarkus.arc.deployment.ValidationPhaseBuildItem.ValidationErrorBuildItem; import io.quarkus.arc.impl.CreationalContextImpl; -import io.quarkus.arc.processor.AnnotationStore; -import io.quarkus.arc.processor.AnnotationsTransformer; -import io.quarkus.arc.processor.BeanInfo; -import io.quarkus.arc.processor.BuildExtension; -import io.quarkus.arc.processor.BuiltinScope; -import io.quarkus.arc.processor.DotNames; +import io.quarkus.arc.processor.*; import io.quarkus.deployment.Feature; import io.quarkus.deployment.GeneratedClassGizmoAdaptor; import io.quarkus.deployment.annotations.BuildProducer; @@ -63,14 +35,7 @@ import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.util.HashUtil; -import io.quarkus.gizmo.AssignableResultHandle; -import io.quarkus.gizmo.BytecodeCreator; -import io.quarkus.gizmo.ClassCreator; -import io.quarkus.gizmo.ClassOutput; -import io.quarkus.gizmo.FieldCreator; -import io.quarkus.gizmo.MethodCreator; -import io.quarkus.gizmo.MethodDescriptor; -import io.quarkus.gizmo.ResultHandle; +import io.quarkus.gizmo.*; import io.quarkus.vertx.http.deployment.FilterBuildItem; import io.quarkus.vertx.http.deployment.RequireBodyHandlerBuildItem; import io.quarkus.vertx.http.deployment.RouteBuildItem; @@ -79,10 +44,8 @@ import io.quarkus.vertx.web.RouteBase; import io.quarkus.vertx.web.RouteFilter; import io.quarkus.vertx.web.RoutingExchange; -import io.quarkus.vertx.web.runtime.RouteHandler; -import io.quarkus.vertx.web.runtime.RouteMatcher; -import io.quarkus.vertx.web.runtime.RoutingExchangeImpl; -import io.quarkus.vertx.web.runtime.VertxWebRecorder; +import io.quarkus.vertx.web.runtime.*; +import io.smallrye.mutiny.Uni; import io.vertx.core.Handler; import io.vertx.core.http.HttpMethod; import io.vertx.ext.web.Router; @@ -112,9 +75,11 @@ class VertxWebProcessor { private static final String VALUE_ORDER = "order"; private static final String SLASH = "/"; - private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor.ofMethod(Arc.class, "container", ArcContainer.class); - private static final MethodDescriptor ARC_CONTAINER_GET_ACTIVE_CONTEXT = MethodDescriptor.ofMethod(ArcContainer.class, - "getActiveContext", InjectableContext.class, Class.class); + private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor + .ofMethod(Arc.class, "container", ArcContainer.class); + private static final MethodDescriptor ARC_CONTAINER_GET_ACTIVE_CONTEXT = MethodDescriptor + .ofMethod(ArcContainer.class, + "getActiveContext", InjectableContext.class, Class.class); private static final MethodDescriptor ARC_CONTAINER_BEAN = MethodDescriptor.ofMethod(ArcContainer.class, "bean", InjectableBean.class, String.class); private static final MethodDescriptor BEAN_GET_SCOPE = MethodDescriptor.ofMethod(InjectableBean.class, "getScope", @@ -122,16 +87,19 @@ class VertxWebProcessor { private static final MethodDescriptor CONTEXT_GET = MethodDescriptor.ofMethod(Context.class, "get", Object.class, Contextual.class, CreationalContext.class); - private static final MethodDescriptor CONTEXT_GET_IF_PRESENT = MethodDescriptor.ofMethod(Context.class, "get", Object.class, - Contextual.class); + private static final MethodDescriptor CONTEXT_GET_IF_PRESENT = MethodDescriptor + .ofMethod(Context.class, "get", Object.class, + Contextual.class); private static final MethodDescriptor INJECTABLE_REF_PROVIDER_GET = MethodDescriptor.ofMethod( InjectableReferenceProvider.class, "get", Object.class, CreationalContext.class); - private static final MethodDescriptor INJECTABLE_BEAN_DESTROY = MethodDescriptor.ofMethod(InjectableBean.class, "destroy", - void.class, Object.class, - CreationalContext.class); + private static final MethodDescriptor INJECTABLE_BEAN_DESTROY = MethodDescriptor + .ofMethod(InjectableBean.class, "destroy", + void.class, Object.class, + CreationalContext.class); static final MethodDescriptor OBJECT_CONSTRUCTOR = MethodDescriptor.ofConstructor(Object.class); + public static final DotName DOTNAME_UNI = DotName.createSimple(Uni.class.getName()); @BuildStep FeatureBuildItem feature() { @@ -221,8 +189,8 @@ void addAdditionalRoutes( Map matchers = new HashMap<>(); for (AnnotatedRouteHandlerBuildItem businessMethod : routeHandlerBusinessMethods) { - - String handlerClass = generateHandler(businessMethod.getBean(), businessMethod.getMethod(), classOutput); + String handlerClass = generateHandler(new HandlerDescriptor(businessMethod.getMethod()), + businessMethod.getBean(), businessMethod.getMethod(), classOutput); reflectiveClasses.produce(new ReflectiveClassBuildItem(false, false, handlerClass)); Handler routingHandler = recorder.createHandler(handlerClass); @@ -320,7 +288,8 @@ void addAdditionalRoutes( } for (AnnotatedRouteFilterBuildItem filterMethod : routeFilterBusinessMethods) { - String handlerClass = generateHandler(filterMethod.getBean(), filterMethod.getMethod(), classOutput); + String handlerClass = generateHandler(new HandlerDescriptor(filterMethod.getMethod()), + filterMethod.getBean(), filterMethod.getMethod(), classOutput); reflectiveClasses.produce(new ReflectiveClassBuildItem(false, false, handlerClass)); Handler routingHandler = recorder.createHandler(handlerClass); AnnotationValue priorityValue = filterMethod.getRouteFilter().value(); @@ -360,10 +329,16 @@ public void transform(TransformationContext context) { } private void validateRouteMethod(BeanInfo bean, MethodInfo method, DotName[] validParamTypes) { - if (!method.returnType().kind().equals(Type.Kind.VOID)) { - throw new IllegalStateException( - String.format("Route handler business method must return void [method: %s, bean: %s]", method, bean)); + if (method.returnType().name().equals(DOTNAME_UNI)) { + List types = method.returnType().asParameterizedType().arguments(); + if (types.isEmpty()) { + throw new IllegalStateException( + String.format( + "Route handler business returning a `Uni` must have a generic parameter [method: %s, bean: %s]", + method, bean)); + } } + List params = method.parameters(); boolean hasInvalidParam = true; if (params.size() == 1) { @@ -381,7 +356,7 @@ private void validateRouteMethod(BeanInfo bean, MethodInfo method, DotName[] val } } - private String generateHandler(BeanInfo bean, MethodInfo method, ClassOutput classOutput) { + private String generateHandler(HandlerDescriptor desc, BeanInfo bean, MethodInfo method, ClassOutput classOutput) { String baseName; if (bean.getImplClazz().enclosingClass() != null) { @@ -418,13 +393,14 @@ private String generateHandler(BeanInfo bean, MethodInfo method, ClassOutput cla } implementConstructor(bean, invokerCreator, beanField, contextField, containerField); - implementInvoke(bean, method, invokerCreator, beanField, contextField, containerField); + implementInvoke(desc, bean, method, invokerCreator, beanField, contextField, containerField); invokerCreator.close(); return generatedName.replace('/', '.'); } - void implementConstructor(BeanInfo bean, ClassCreator invokerCreator, FieldCreator beanField, FieldCreator contextField, + void implementConstructor(BeanInfo bean, ClassCreator invokerCreator, FieldCreator beanField, + FieldCreator contextField, FieldCreator containerField) { MethodCreator constructor = invokerCreator.getMethodCreator("", void.class); // Invoke super() @@ -450,9 +426,10 @@ void implementConstructor(BeanInfo bean, ClassCreator invokerCreator, FieldCreat constructor.returnValue(null); } - void implementInvoke(BeanInfo bean, MethodInfo method, ClassCreator invokerCreator, FieldCreator beanField, + void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo method, ClassCreator invokerCreator, + FieldCreator beanField, FieldCreator contextField, FieldCreator containerField) { - // The descriptor is: void invoke(RoutingContext context) + // The descriptor is: void invoke(RoutingContext rc) MethodCreator invoke = invokerCreator.getMethodCreator("invoke", void.class, RoutingContext.class); ResultHandle beanHandle = invoke.readInstanceField(beanField.getFieldDescriptor(), invoke.getThis()); AssignableResultHandle beanInstanceHandle = invoke.createVariable(Object.class); @@ -496,25 +473,63 @@ void implementInvoke(BeanInfo bean, MethodInfo method, ClassCreator invokerCreat ResultHandle paramHandle; MethodDescriptor methodDescriptor; + String returnType = descriptor.getReturnType().name().toString(); + + // TODO Make Routing Context optional, allow injected Response and Request individually. + ResultHandle rc = invoke.getMethodParam(0); if (method.parameters().get(0).name().equals(ROUTING_CONTEXT)) { - paramHandle = invoke.getMethodParam(0); - methodDescriptor = MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class, - RoutingContext.class); + paramHandle = rc; + methodDescriptor = MethodDescriptor + .ofMethod(bean.getImplClazz().name().toString(), method.name(), returnType, + RoutingContext.class); } else if (method.parameters().get(0).name().equals(RX_ROUTING_CONTEXT)) { paramHandle = invoke.newInstance( - MethodDescriptor.ofConstructor(io.vertx.reactivex.ext.web.RoutingContext.class, RoutingContext.class), - invoke.getMethodParam(0)); - methodDescriptor = MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class, - io.vertx.reactivex.ext.web.RoutingContext.class); + MethodDescriptor + .ofConstructor(io.vertx.reactivex.ext.web.RoutingContext.class, RoutingContext.class), + rc); + methodDescriptor = MethodDescriptor + .ofMethod(bean.getImplClazz().name().toString(), method.name(), returnType, + io.vertx.reactivex.ext.web.RoutingContext.class); } else { - paramHandle = invoke.newInstance(MethodDescriptor.ofConstructor(RoutingExchangeImpl.class, RoutingContext.class), - invoke.getMethodParam(0)); - methodDescriptor = MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class, - RoutingExchange.class); + paramHandle = invoke + .newInstance(MethodDescriptor.ofConstructor(RoutingExchangeImpl.class, RoutingContext.class), + rc); + methodDescriptor = MethodDescriptor + .ofMethod(bean.getImplClazz().name().toString(), method.name(), returnType, + RoutingExchange.class); } // Invoke the business method handler - invoke.invokeVirtualMethod(methodDescriptor, beanInstanceHandle, paramHandle); + ResultHandle res = invoke.invokeVirtualMethod(methodDescriptor, beanInstanceHandle, paramHandle); + + // Get the response: HttpServerResponse response = rc.response() + ResultHandle response = invoke.invokeInterfaceMethod(Methods.RESPONSE, rc); + MethodDescriptor end = Methods.getEndMethodForContentType(descriptor); + if (descriptor.isReturningUni()) { + // The method returns a Uni. + // We subscribe to this Uni and write the provided item in the HTTP response + // If the method returned null, we fail + // If the provided item is null and the method does not return a Uni, we fail + // If the provided item is null, and the method return a Uni, we reply with a 204 - NO CONTENT + // If the provided item is not null, if it's a string or buffer, the response.end method is used to write the response + // If the provided item is not null, and it's an object, the item is mapped to JSON and written into the response + + FunctionCreator successCallback = getUniOnItemCallback(descriptor, invoke, rc, end, response); + FunctionCreator failureCallback = getUniOnFailureCallback(invoke, rc); + + ResultHandle sub = invoke.invokeInterfaceMethod(Methods.SUBSCRIBE, res); + invoke.invokeVirtualMethod(Methods.SUBSCRIBE_WITH, sub, successCallback.getInstance(), + failureCallback.getInstance()); + } else if (descriptor.getContentType() != null) { + // The method returns "something" in a synchronous manner, write it into the response + + // If the method returned null, we fail + // If the method returns string or buffer, the response.end method is used to write the response + // If the method returns an object, the result is mapped to JSON and written into the response + + ResultHandle content = getContentToWrite(descriptor, response, res, invoke); + invoke.invokeInterfaceMethod(end, response, content); + } // Destroy dependent instance afterwards if (BuiltinScope.DEPENDENT.is(bean.getScope())) { @@ -524,6 +539,126 @@ void implementInvoke(BeanInfo bean, MethodInfo method, ClassCreator invokerCreat invoke.returnValue(null); } + /** + * Generates the following function depending on the payload type + * + * If the method returns a {@code Uni} + * + *
+     *     item -> rc.response().setStatusCode(204).end();
+     * 
+ * + * If the method returns a {@code Uni}: + * + *
+     *     item -> {
+     *       if (item != null) {
+     *          Buffer buffer = getBuffer(item); // Manage RX and Mutiny buffer
+     *          rc.response().end(buffer);
+     *       } else {
+     *           rc.fail(new NullPointerException(...);
+     *       }
+     *     }
+     * 
+ * + * If the method returns a {@code Uni} : + * + *
+     *     item -> {
+     *       if (item != null) {
+     *          rc.response().end(item);
+     *       } else {
+     *           rc.fail(new NullPointerException(...);
+     *       }
+     *     }
+     * 
+ * + * If the method returns a {@code Uni} : + * + *
+     *     item -> {
+     *       if (item != null) {
+     *          String json = Json.encode(item);
+     *          rc.response().end(json);
+     *       } else {
+     *           rc.fail(new NullPointerException(...);
+     *       }
+     *     }
+     * 
+ * + * This last version also set the {@code content-type} header to {@code application/json }if not set. + * + * @param descriptor the method descriptor + * @param invoke the main bytecode writer + * @param rc the reference to the routing context variable + * @param end the end method to use + * @param response the reference to the response variable + * @return the function creator + */ + private FunctionCreator getUniOnItemCallback(HandlerDescriptor descriptor, MethodCreator invoke, ResultHandle rc, + MethodDescriptor end, ResultHandle response) { + FunctionCreator callback = invoke.createFunction(Consumer.class); + BytecodeCreator creator = callback.getBytecode(); + if (Methods.isNoContent(descriptor)) { // Uni - so return a 204. + creator.invokeInterfaceMethod(Methods.SET_STATUS, response, creator.load(204)); + creator.invokeInterfaceMethod(Methods.END, response); + } else { + // Check if the item is null + ResultHandle item = creator.getMethodParam(0); + BranchResult isItemNull = creator.ifNull(item); + + BytecodeCreator itemIfNotNull = isItemNull.falseBranch(); + ResultHandle content = getContentToWrite(descriptor, response, item, itemIfNotNull); + itemIfNotNull.invokeInterfaceMethod(end, response, content); + itemIfNotNull.close(); + + BytecodeCreator resultNull = isItemNull.trueBranch(); + ResultHandle npe = Methods.createNpeBecauseItemIfNull(resultNull); + resultNull.invokeInterfaceMethod(Methods.FAIL, rc, npe); + resultNull.close(); + } + Methods.returnAndClose(creator); + return callback; + } + + /** + * Generates the following function: + * + *
+     *     throwable -> rc.fail(throwable);
+     * 
+ * + * @param writer the bytecode writer + * @param rc the reference to the RoutingContext variable + * @return the function creator. + */ + private FunctionCreator getUniOnFailureCallback(MethodCreator writer, ResultHandle rc) { + FunctionCreator callback = writer.createFunction(Consumer.class); + BytecodeCreator creator = callback.getBytecode(); + Methods.fail(creator, rc, creator.getMethodParam(0)); + Methods.returnAndClose(creator); + return callback; + } + + private ResultHandle getContentToWrite(HandlerDescriptor descriptor, ResultHandle response, ResultHandle res, + BytecodeCreator writer) { + if (descriptor.isContentTypeString() || descriptor.isContentTypeBuffer()) { + return res; + } + + if (descriptor.isContentTypeRxBuffer()) { + return writer.invokeVirtualMethod(Methods.RX_GET_DELEGATE, res); + } + + if (descriptor.isContentTypeMutinyBuffer()) { + return writer.invokeVirtualMethod(Methods.MUTINY_GET_DELEGATE, res); + } + + // Encode to Json + Methods.setContentTypeToJson(response, writer); + return writer.invokeStaticMethod(Methods.JSON_ENCODE, res); + } + private static String dashify(String value) { StringBuilder ret = new StringBuilder(); char[] chars = value.toCharArray(); @@ -543,7 +678,8 @@ private void detectConflictingRoutes(Map matchers) { } // First we need to group matchers that could potentially match the same request Set> groups = new HashSet<>(); - for (Iterator> iterator = matchers.entrySet().iterator(); iterator.hasNext();) { + for (Iterator> iterator = matchers.entrySet().iterator(); iterator + .hasNext();) { Entry entry = iterator.next(); LinkedHashSet group = new LinkedHashSet<>(); group.add(entry.getKey()); @@ -603,15 +739,18 @@ static boolean canMatchSameRequest(RouteMatcher m1, RouteMatcher m2) { } } // methods not matching - if (m1.getMethods().length > 0 && m2.getMethods().length > 0 && !Arrays.equals(m1.getMethods(), m2.getMethods())) { + if (m1.getMethods().length > 0 && m2.getMethods().length > 0 && !Arrays + .equals(m1.getMethods(), m2.getMethods())) { return false; } // produces not matching - if (m1.getProduces().length > 0 && m2.getProduces().length > 0 && !Arrays.equals(m1.getProduces(), m2.getProduces())) { + if (m1.getProduces().length > 0 && m2.getProduces().length > 0 && !Arrays + .equals(m1.getProduces(), m2.getProduces())) { return false; } // consumes not matching - if (m1.getConsumes().length > 0 && m2.getConsumes().length > 0 && !Arrays.equals(m1.getConsumes(), m2.getConsumes())) { + if (m1.getConsumes().length > 0 && m2.getConsumes().length > 0 && !Arrays + .equals(m1.getConsumes(), m2.getConsumes())) { return false; } return true; diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java new file mode 100644 index 0000000000000..b761e73e33e03 --- /dev/null +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java @@ -0,0 +1,197 @@ +package io.quarkus.vertx.web.mutiny; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.*; + +import java.io.IOException; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; + +public class MutinyRouteTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); + + @Test + public void testUni() { + when().get("/hello").then().statusCode(200).body(is("Hello world!")); + when().get("/hello-buffer").then().statusCode(200).body(is("Buffer")); + when().get("/hello-on-pool").then().statusCode(200).body(is("Pool")); + when().get("/hello-rx-buffer").then().statusCode(200).body(is("RX Buffer")); + when().get("/hello-mutiny-buffer").then().statusCode(200).body(is("Mutiny Buffer")); + + when().get("/person").then().statusCode(200) + .body("name", is("neo")) + .body("id", is(12345)) + .header("content-type", "application/json"); + + when().get("/person-content-type-set").then().statusCode(200) + .body("name", is("neo")) + .body("id", is(12345)) + .header("content-type", "application/json;charset=utf-8"); + + when().get("/failure").then().statusCode(500).body(containsString("boom")); + when().get("/sync-failure").then().statusCode(500).body(containsString("boom")); + + when().get("/null").then().statusCode(500).body(containsString("null")); + when().get("/uni-null").then().statusCode(500); + when().get("/void").then().statusCode(204).body(hasLength(0)); + } + + @Test + public void testSync() { + when().get("hello-sync").then().statusCode(200) + .body(is("Sync Hello world")) + .header("content-type", is(nullValue())); + + when().get("hello-buffer-sync").then().statusCode(200).body(is("Sync Buffer")) + .header("content-type", is(nullValue())); + + when().get("hello-buffer-rx-sync").then().statusCode(200).body(is("Sync RX Buffer")) + .header("content-type", is(nullValue())); + + when().get("hello-buffer-mutiny-sync").then().statusCode(200).body(is("Sync Mutiny Buffer")) + .header("content-type", is(nullValue())); + + when().get("/person-sync").then().statusCode(200) + .body("name", is("neo")) + .body("id", is(12345)) + .header("content-type", "application/json"); + + when().get("/person-sync-content-type-set").then().statusCode(200) + .body("name", is("neo")) + .body("id", is(12345)) + .header("content-type", "application/json;charset=utf-8"); + + when().get("/fail-sync") + .then().statusCode(500) + .body(containsString("boom")); + } + + static class SimpleBean { + + @Route(path = "hello") + Uni hello(RoutingContext context) { + return Uni.createFrom().item("Hello world!"); + } + + @Route(path = "hello-sync") + String helloSync(RoutingContext context) { + return "Sync Hello world"; + } + + @Route(path = "hello-buffer-sync") + Buffer helloBufferSync(RoutingContext context) { + return Buffer.buffer("Sync Buffer"); + } + + @Route(path = "hello-buffer-rx-sync") + io.vertx.reactivex.core.buffer.Buffer helloRxBufferSync(RoutingContext context) { + return io.vertx.reactivex.core.buffer.Buffer.buffer("Sync RX Buffer"); + } + + @Route(path = "hello-buffer-mutiny-sync") + io.vertx.mutiny.core.buffer.Buffer helloMutinyBufferSync(RoutingContext context) { + return io.vertx.mutiny.core.buffer.Buffer.buffer("Sync Mutiny Buffer"); + } + + @Route(path = "hello-buffer") + Uni helloWithBuffer(RoutingContext context) { + return Uni.createFrom().item(Buffer.buffer("Buffer")); + } + + @Route(path = "hello-rx-buffer") + Uni helloWithRxBuffer(RoutingContext context) { + return Uni.createFrom().item(io.vertx.reactivex.core.buffer.Buffer.buffer("RX Buffer")); + } + + @Route(path = "hello-mutiny-buffer") + Uni helloWithMutinyBuffer(RoutingContext context) { + return Uni.createFrom().item(io.vertx.mutiny.core.buffer.Buffer.buffer("Mutiny Buffer")); + } + + @Route(path = "hello-on-pool") + Uni helloOnPool(RoutingContext context) { + return Uni.createFrom().item("Pool") + .emitOn(Infrastructure.getDefaultExecutor()); + } + + @Route(path = "failure") + Uni fail(RoutingContext context) { + return Uni.createFrom(). failure(new IOException("boom")) + .emitOn(Infrastructure.getDefaultExecutor()); + } + + @Route(path = "fail-sync") + String failSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "sync-failure") + Uni failUniSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "null") + Uni uniNull(RoutingContext context) { + return null; + } + + @Route(path = "void") + Uni uniOfVoid(RoutingContext context) { + return Uni.createFrom().nullItem(); + } + + @Route(path = "uni-null") + Uni produceNull(RoutingContext context) { + return Uni.createFrom().nullItem(); + } + + @Route(path = "person-sync", produces = "application/json") + Person getPerson(RoutingContext context) { + return new Person("neo", 12345); + } + + @Route(path = "person", produces = "application/json") + Uni getPersonAsUni(RoutingContext context) { + return Uni.createFrom().item(() -> new Person("neo", 12345)).emitOn(Infrastructure.getDefaultExecutor()); + } + + @Route(path = "person-sync-content-type-set", produces = "application/json") + Person getPersonUtf8(RoutingContext context) { + context.response().putHeader("content-type", "application/json;charset=utf-8"); + return new Person("neo", 12345); + } + + @Route(path = "person-content-type-set", produces = "application/json") + Uni getPersonAsUniUtf8(RoutingContext context) { + return Uni.createFrom().item(() -> new Person("neo", 12345)) + .onItem() + .invoke(x -> context.response().putHeader("content-type", "application/json;charset=utf-8")) + .emitOn(Infrastructure.getDefaultExecutor()); + } + + } + + static class Person { + public String name; + public int id; + + public Person(String name, int id) { + this.name = name; + this.id = id; + } + } + +} From 5c748c5f2b6163053033bcc57612981a04166933 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 29 Jun 2020 14:08:34 +0200 Subject: [PATCH 2/5] Add support for Multi to Reactive Routes --- .../web/deployment/HandlerDescriptor.java | 12 +- .../quarkus/vertx/web/deployment/Methods.java | 21 +- .../web/deployment/VertxWebProcessor.java | 28 ++- .../vertx/web/mutiny/MultiRouteTest.java | 176 +++++++++++++++ .../vertx/web/mutiny/SyncRouteTest.java | 102 +++++++++ ...MutinyRouteTest.java => UniRouteTest.java} | 70 +----- .../vertx/web/runtime/MultiSupport.java | 208 ++++++++++++++++++ 7 files changed, 542 insertions(+), 75 deletions(-) create mode 100644 extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java create mode 100644 extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SyncRouteTest.java rename extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/{MutinyRouteTest.java => UniRouteTest.java} (64%) create mode 100644 extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java index 09f3821a6c769..ada5335530075 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/HandlerDescriptor.java @@ -4,6 +4,7 @@ import org.jboss.jandex.MethodInfo; import org.jboss.jandex.Type; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.vertx.core.buffer.Buffer; @@ -12,6 +13,8 @@ */ public class HandlerDescriptor { + private static final DotName DOT_NAME_UNI = DotName.createSimple(Uni.class.getName()); + private static final DotName DOT_NAME_MULTI = DotName.createSimple(Multi.class.getName()); private final MethodInfo method; public HandlerDescriptor(MethodInfo method) { @@ -27,7 +30,11 @@ public boolean isReturningVoid() { } public boolean isReturningUni() { - return method.returnType().name().equals(DotName.createSimple(Uni.class.getName())); + return method.returnType().name().equals(DOT_NAME_UNI); + } + + public boolean isReturningMulti() { + return method.returnType().name().equals(DOT_NAME_MULTI); } public Type getContentType() { @@ -37,6 +44,9 @@ public Type getContentType() { if (isReturningUni()) { return getReturnType().asParameterizedType().arguments().get(0); } + if (isReturningMulti()) { + return getReturnType().asParameterizedType().arguments().get(0); + } return getReturnType(); } diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java index 5e518f4a627c7..4b7654c28e305 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java @@ -7,6 +7,8 @@ import io.quarkus.gizmo.BytecodeCreator; import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; +import io.quarkus.vertx.web.runtime.MultiSupport; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniSubscribe; import io.smallrye.mutiny.subscription.Cancellable; @@ -30,12 +32,25 @@ public class Methods { static final MethodDescriptor FAIL = MethodDescriptor .ofMethod(RoutingContext.class, "fail", Void.TYPE, Throwable.class); - static final MethodDescriptor SUBSCRIBE = MethodDescriptor.ofMethod(Uni.class, "subscribe", UniSubscribe.class); - static final MethodDescriptor SUBSCRIBE_WITH = MethodDescriptor + static final MethodDescriptor UNI_SUBSCRIBE = MethodDescriptor.ofMethod(Uni.class, "subscribe", UniSubscribe.class); + static final MethodDescriptor UNI_SUBSCRIBE_WITH = MethodDescriptor .ofMethod(UniSubscribe.class, "with", Cancellable.class, Consumer.class, Consumer.class); - static final MethodDescriptor END = MethodDescriptor.ofMethod(HttpServerResponse.class, "end", Void.TYPE); + static final MethodDescriptor MULTI_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiSupport.class, "subscribeVoid", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiSupport.class, "subscribeString", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SUBSCRIBE_BUFFER = MethodDescriptor.ofMethod(MultiSupport.class, "subscribeBuffer", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SUBSCRIBE_RX_BUFFER = MethodDescriptor.ofMethod(MultiSupport.class, "subscribeRxBuffer", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SUBSCRIBE_MUTINY_BUFFER = MethodDescriptor.ofMethod(MultiSupport.class, + "subscribeMutinyBuffer", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiSupport.class, "subscribeObject", + Void.TYPE, Multi.class, RoutingContext.class); + + static final MethodDescriptor END = MethodDescriptor.ofMethod(HttpServerResponse.class, "end", Void.TYPE); static final MethodDescriptor END_WITH_STRING = MethodDescriptor .ofMethod(HttpServerResponse.class, "end", Void.TYPE, String.class); static final MethodDescriptor END_WITH_BUFFER = MethodDescriptor diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java index 4db4e9e0213b1..acd094c74e981 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java @@ -517,16 +517,38 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met FunctionCreator successCallback = getUniOnItemCallback(descriptor, invoke, rc, end, response); FunctionCreator failureCallback = getUniOnFailureCallback(invoke, rc); - ResultHandle sub = invoke.invokeInterfaceMethod(Methods.SUBSCRIBE, res); - invoke.invokeVirtualMethod(Methods.SUBSCRIBE_WITH, sub, successCallback.getInstance(), + ResultHandle sub = invoke.invokeInterfaceMethod(Methods.UNI_SUBSCRIBE, res); + invoke.invokeVirtualMethod(Methods.UNI_SUBSCRIBE_WITH, sub, successCallback.getInstance(), failureCallback.getInstance()); + } else if (descriptor.isReturningMulti()) { + // The method returns a Multi. + // We subscribe to this Multi and write the provided items (one by one) in the HTTP response. + // On completion, we "end" the response + // If the method returned null, we fail + // If the provided item is null we fail + // If the multi is empty, and the method return a Multi, we reply with a 204 - NO CONTENT + // If the produce item is a string or buffer, the response.write method is used to write the response + // If the produce item is an object, the item is mapped to JSON and written into the response. The response is a JSON array. + + if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. + invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc); + } else if (descriptor.isContentTypeBuffer()) { + invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_BUFFER, res, rc); + } else if (descriptor.isContentTypeMutinyBuffer()) { + invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_MUTINY_BUFFER, res, rc); + } else if (descriptor.isContentTypeRxBuffer()) { + invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_RX_BUFFER, res, rc); + } else if (descriptor.isContentTypeString()) { + invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_STRING, res, rc); + } else { // Multi - encode to json. + invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_OBJECT, res, rc); + } } else if (descriptor.getContentType() != null) { // The method returns "something" in a synchronous manner, write it into the response // If the method returned null, we fail // If the method returns string or buffer, the response.end method is used to write the response // If the method returns an object, the result is mapped to JSON and written into the response - ResultHandle content = getContentToWrite(descriptor, response, res, invoke); invoke.invokeInterfaceMethod(end, response, content); } diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java new file mode 100644 index 0000000000000..72c07060ce719 --- /dev/null +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java @@ -0,0 +1,176 @@ +package io.quarkus.vertx.web.mutiny; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.*; + +import java.io.IOException; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Multi; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; + +public class MultiRouteTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); + + @Test + public void testMultiRoute() { + when().get("/hello").then().statusCode(200) + .body(is("[\"Hello world!\"]")) + .header("content-type", "application/json"); + when().get("/hellos").then().statusCode(200) + .body(is("[\"hello\",\"world\",\"!\"]")) + .header("content-type", "application/json"); + when().get("/no-hello").then().statusCode(200).body(is("[]")) + .header("content-type", "application/json"); + // status already sent, but not the end of the array + when().get("/hello-and-fail").then().statusCode(200) + .body(containsString("[\"Hello\"")) + .body(not(containsString("]"))); + + when().get("/buffer").then().statusCode(200).body(is("Buffer")) + .header("content-type", is(nullValue())); + when().get("/buffers").then().statusCode(200).body(is("Buffer Buffer Buffer.")); + when().get("/buffers-and-fail").then().statusCode(200).body(containsString("Buffer")); + + when().get("/rx-buffer").then().statusCode(200).body(is("BufferBuffer")); + when().get("/mutiny-buffer").then().statusCode(200).body(is("BufferBuffer")); + + when().get("/void").then().statusCode(204).body(hasLength(0)); + + when().get("/people").then().statusCode(200) + .body("size()", is(3)) + .body("[0].name", is("superman")) + .body("[1].name", is("batman")) + .body("[2].name", is("spiderman")) + .header("content-type", "application/json"); + + when().get("/people-content-type").then().statusCode(200) + .body("size()", is(3)) + .body("[0].name", is("superman")) + .body("[1].name", is("batman")) + .body("[2].name", is("spiderman")) + .header("content-type", "application/json;charset=utf-8"); + + when().get("/failure").then().statusCode(500).body(containsString("boom")); + when().get("/null").then().statusCode(500).body(containsString("null")); + when().get("/sync-failure").then().statusCode(500).body(containsString("null")); + + } + + static class SimpleBean { + + @Route(path = "hello") + Multi hello(RoutingContext context) { + return Multi.createFrom().item("Hello world!"); + } + + @Route(path = "hellos") + Multi hellos(RoutingContext context) { + return Multi.createFrom().items("hello", "world", "!"); + } + + @Route(path = "no-hello") + Multi noHello(RoutingContext context) { + return Multi.createFrom().empty(); + } + + @Route(path = "hello-and-fail") + Multi helloAndFail(RoutingContext context) { + return Multi.createBy().concatenating().streams( + Multi.createFrom().item("Hello"), + Multi.createFrom().failure(new IOException("boom"))); + } + + @Route(path = "buffer") + Multi buffer(RoutingContext context) { + return Multi.createFrom().item(Buffer.buffer("Buffer")); + } + + @Route(path = "buffers") + Multi buffers(RoutingContext context) { + return Multi.createFrom() + .items(Buffer.buffer("Buffer"), Buffer.buffer(" Buffer"), Buffer.buffer(" Buffer.")); + } + + @Route(path = "buffers-and-fail") + Multi buffersAndFail(RoutingContext context) { + return Multi.createBy().concatenating().collectFailures().streams( + Multi.createFrom().items(Buffer.buffer("Buffer"), Buffer.buffer(" Buffer"), + Buffer.buffer(" Buffer.")), + Multi.createFrom().failure(new IOException("boom"))); + + } + + @Route(path = "rx-buffer") + Multi bufferRx(RoutingContext context) { + return Multi.createFrom().items(io.vertx.reactivex.core.buffer.Buffer.buffer("Buffer"), + io.vertx.reactivex.core.buffer.Buffer.buffer("Buffer")); + } + + @Route(path = "mutiny-buffer") + Multi bufferMutiny(RoutingContext context) { + return Multi.createFrom().items(io.vertx.mutiny.core.buffer.Buffer.buffer("Buffer"), + io.vertx.mutiny.core.buffer.Buffer.buffer("Buffer")); + } + + @Route(path = "void") + Multi multiVoid(RoutingContext context) { + return Multi.createFrom().range(0, 200) + .onItem().ignore(); + } + + @Route(path = "/people") + Multi people(RoutingContext context) { + return Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3)); + } + + @Route(path = "/people-content-type") + Multi peopleWithContentType(RoutingContext context) { + context.response().putHeader("content-type", "application/json;charset=utf-8"); + return Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3)); + } + + @Route(path = "/failure") + Multi fail(RoutingContext context) { + return Multi.createFrom().failure(new IOException("boom")); + } + + @Route(path = "/sync-failure") + Multi failSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "/null") + Multi uniNull(RoutingContext context) { + return null; + } + + } + + static class Person { + public String name; + public int id; + + public Person(String name, int id) { + this.name = name; + this.id = id; + } + } + +} diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SyncRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SyncRouteTest.java new file mode 100644 index 0000000000000..f28ae96e75aa6 --- /dev/null +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SyncRouteTest.java @@ -0,0 +1,102 @@ +package io.quarkus.vertx.web.mutiny; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.*; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.web.Route; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; + +public class SyncRouteTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); + + @Test + public void testSynchronousRoute() { + when().get("hello-sync").then().statusCode(200) + .body(is("Sync Hello world")) + .header("content-type", is(nullValue())); + + when().get("hello-buffer-sync").then().statusCode(200).body(is("Sync Buffer")) + .header("content-type", is(nullValue())); + + when().get("hello-buffer-rx-sync").then().statusCode(200).body(is("Sync RX Buffer")) + .header("content-type", is(nullValue())); + + when().get("hello-buffer-mutiny-sync").then().statusCode(200).body(is("Sync Mutiny Buffer")) + .header("content-type", is(nullValue())); + + when().get("/person-sync").then().statusCode(200) + .body("name", is("neo")) + .body("id", is(12345)) + .header("content-type", "application/json"); + + when().get("/person-sync-content-type-set").then().statusCode(200) + .body("name", is("neo")) + .body("id", is(12345)) + .header("content-type", "application/json;charset=utf-8"); + + when().get("/fail-sync") + .then().statusCode(500) + .body(containsString("boom")); + } + + static class SimpleBean { + + @Route(path = "hello-sync") + String helloSync(RoutingContext context) { + return "Sync Hello world"; + } + + @Route(path = "hello-buffer-sync") + Buffer helloBufferSync(RoutingContext context) { + return Buffer.buffer("Sync Buffer"); + } + + @Route(path = "hello-buffer-rx-sync") + io.vertx.reactivex.core.buffer.Buffer helloRxBufferSync(RoutingContext context) { + return io.vertx.reactivex.core.buffer.Buffer.buffer("Sync RX Buffer"); + } + + @Route(path = "hello-buffer-mutiny-sync") + io.vertx.mutiny.core.buffer.Buffer helloMutinyBufferSync(RoutingContext context) { + return io.vertx.mutiny.core.buffer.Buffer.buffer("Sync Mutiny Buffer"); + } + + @Route(path = "fail-sync") + String failSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "person-sync", produces = "application/json") + Person getPerson(RoutingContext context) { + return new Person("neo", 12345); + } + + @Route(path = "person-sync-content-type-set", produces = "application/json") + Person getPersonUtf8(RoutingContext context) { + context.response().putHeader("content-type", "application/json;charset=utf-8"); + return new Person("neo", 12345); + } + + } + + static class Person { + public String name; + public int id; + + public Person(String name, int id) { + this.name = name; + this.id = id; + } + } + +} diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/UniRouteTest.java similarity index 64% rename from extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java rename to extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/UniRouteTest.java index b761e73e33e03..11f3377ec92e9 100644 --- a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MutinyRouteTest.java +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/UniRouteTest.java @@ -17,14 +17,14 @@ import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.RoutingContext; -public class MutinyRouteTest { +public class UniRouteTest { @RegisterExtension static final QuarkusUnitTest config = new QuarkusUnitTest() .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); @Test - public void testUni() { + public void testUniRoute() { when().get("/hello").then().statusCode(200).body(is("Hello world!")); when().get("/hello-buffer").then().statusCode(200).body(is("Buffer")); when().get("/hello-on-pool").then().statusCode(200).body(is("Pool")); @@ -49,36 +49,6 @@ public void testUni() { when().get("/void").then().statusCode(204).body(hasLength(0)); } - @Test - public void testSync() { - when().get("hello-sync").then().statusCode(200) - .body(is("Sync Hello world")) - .header("content-type", is(nullValue())); - - when().get("hello-buffer-sync").then().statusCode(200).body(is("Sync Buffer")) - .header("content-type", is(nullValue())); - - when().get("hello-buffer-rx-sync").then().statusCode(200).body(is("Sync RX Buffer")) - .header("content-type", is(nullValue())); - - when().get("hello-buffer-mutiny-sync").then().statusCode(200).body(is("Sync Mutiny Buffer")) - .header("content-type", is(nullValue())); - - when().get("/person-sync").then().statusCode(200) - .body("name", is("neo")) - .body("id", is(12345)) - .header("content-type", "application/json"); - - when().get("/person-sync-content-type-set").then().statusCode(200) - .body("name", is("neo")) - .body("id", is(12345)) - .header("content-type", "application/json;charset=utf-8"); - - when().get("/fail-sync") - .then().statusCode(500) - .body(containsString("boom")); - } - static class SimpleBean { @Route(path = "hello") @@ -86,26 +56,6 @@ Uni hello(RoutingContext context) { return Uni.createFrom().item("Hello world!"); } - @Route(path = "hello-sync") - String helloSync(RoutingContext context) { - return "Sync Hello world"; - } - - @Route(path = "hello-buffer-sync") - Buffer helloBufferSync(RoutingContext context) { - return Buffer.buffer("Sync Buffer"); - } - - @Route(path = "hello-buffer-rx-sync") - io.vertx.reactivex.core.buffer.Buffer helloRxBufferSync(RoutingContext context) { - return io.vertx.reactivex.core.buffer.Buffer.buffer("Sync RX Buffer"); - } - - @Route(path = "hello-buffer-mutiny-sync") - io.vertx.mutiny.core.buffer.Buffer helloMutinyBufferSync(RoutingContext context) { - return io.vertx.mutiny.core.buffer.Buffer.buffer("Sync Mutiny Buffer"); - } - @Route(path = "hello-buffer") Uni helloWithBuffer(RoutingContext context) { return Uni.createFrom().item(Buffer.buffer("Buffer")); @@ -133,11 +83,6 @@ Uni fail(RoutingContext context) { .emitOn(Infrastructure.getDefaultExecutor()); } - @Route(path = "fail-sync") - String failSync(RoutingContext context) { - throw new IllegalStateException("boom"); - } - @Route(path = "sync-failure") Uni failUniSync(RoutingContext context) { throw new IllegalStateException("boom"); @@ -158,22 +103,11 @@ Uni produceNull(RoutingContext context) { return Uni.createFrom().nullItem(); } - @Route(path = "person-sync", produces = "application/json") - Person getPerson(RoutingContext context) { - return new Person("neo", 12345); - } - @Route(path = "person", produces = "application/json") Uni getPersonAsUni(RoutingContext context) { return Uni.createFrom().item(() -> new Person("neo", 12345)).emitOn(Infrastructure.getDefaultExecutor()); } - @Route(path = "person-sync-content-type-set", produces = "application/json") - Person getPersonUtf8(RoutingContext context) { - context.response().putHeader("content-type", "application/json;charset=utf-8"); - return new Person("neo", 12345); - } - @Route(path = "person-content-type-set", produces = "application/json") Uni getPersonAsUniUtf8(RoutingContext context) { return Uni.createFrom().item(() -> new Person("neo", 12345)) diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java new file mode 100644 index 0000000000000..9f7c5c7107c7b --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java @@ -0,0 +1,208 @@ +package io.quarkus.vertx.web.runtime; + +import java.util.function.Consumer; +import java.util.function.Function; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.smallrye.mutiny.Multi; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; +import io.vertx.ext.web.RoutingContext; + +@SuppressWarnings("ReactiveStreamsSubscriberImplementation") +public class MultiSupport { + + private MultiSupport() { + // Avoid direct instantiation. + } + + public static void subscribeVoid(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().with( + new Consumer() { + @Override + public void accept(Void item) { + // do nothing + } + }, + new Consumer() { + @Override + public void accept(Throwable failure) { + rc.fail(failure); + } + }, + new Runnable() { + @Override + public void run() { + response.setStatusCode(204).end(); + } + }); + } + + public static void subscribeString(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + this.upstream.request(1); + } + + @Override + public void onNext(String item) { + String toBeWritten; + if (response.bytesWritten() == 0) { + response.setChunked(true); + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "application/json"); + } + toBeWritten = "[\"" + item + "\""; + } else { + toBeWritten = ",\"" + item + "\""; + } + response.write(toBeWritten, new Handler>() { + @Override + public void handle(AsyncResult ar) { + onWriteDone(upstream, ar, rc); + } + }); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + completeJsonArray(response); + } + }); + } + + private static void onWriteDone(Subscription subscription, AsyncResult ar, RoutingContext rc) { + if (ar.failed()) { + rc.fail(ar.cause()); + } else { + subscription.request(1); + } + } + + public static void subscribeBuffer(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + response.setChunked(true); + this.upstream.request(1); + } + + @Override + public void onNext(Buffer item) { + response.write(item, new Handler>() { + @Override + public void handle(AsyncResult ar) { + onWriteDone(upstream, ar, rc); + } + }); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + response.end(); + } + }); + } + + public static void subscribeMutinyBuffer(Multi multi, RoutingContext rc) { + subscribeBuffer(multi.map(new Function() { + @Override + public Buffer apply(io.vertx.mutiny.core.buffer.Buffer b) { + return b.getDelegate(); + } + }), rc); + } + + public static void subscribeRxBuffer(Multi multi, RoutingContext rc) { + subscribeBuffer(multi.map(new Function() { + @Override + public Buffer apply(io.vertx.reactivex.core.buffer.Buffer b) { + return b.getDelegate(); + } + }), rc); + } + + public static void subscribeObject(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + this.upstream.request(1); + } + + @Override + public void onNext(Object item) { + String toBeWritten; + if (response.bytesWritten() == 0) { + response.setChunked(true); + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "application/json"); + } + toBeWritten = "[" + Json.encodeToBuffer(item); + } else { + toBeWritten = "," + Json.encodeToBuffer(item); + } + response.write(toBeWritten, new Handler>() { + @Override + public void handle(AsyncResult ar) { + onWriteDone(upstream, ar, rc); + } + }); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + completeJsonArray(response); + } + }); + } + + private static void completeJsonArray(HttpServerResponse response) { + if (response.bytesWritten() == 0) { // No item + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "application/json"); + } + response.end("[]"); + } else { + response.end("]"); + } + } + +} From 91e4741765370c374a37c99e9192b6b05cf1ca03 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 30 Jun 2020 11:21:20 +0200 Subject: [PATCH 3/5] Add support for SSE response to Reactive Routes --- .../quarkus/vertx/web/deployment/Methods.java | 19 ++ .../web/deployment/VertxWebProcessor.java | 85 +++-- .../vertx/web/mutiny/SSEMultiRouteTest.java | 306 ++++++++++++++++++ .../io/quarkus/vertx/web/ReactiveRoutes.java | 93 ++++++ .../vertx/web/runtime/MultiSseSupport.java | 173 ++++++++++ .../quarkus/vertx/web/runtime/SSEMulti.java | 26 ++ 6 files changed, 680 insertions(+), 22 deletions(-) create mode 100644 extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SSEMultiRouteTest.java create mode 100644 extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java create mode 100644 extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java create mode 100644 extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/SSEMulti.java diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java index 4b7654c28e305..d248c543a047c 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java @@ -7,6 +7,7 @@ import io.quarkus.gizmo.BytecodeCreator; import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; +import io.quarkus.vertx.web.runtime.MultiSseSupport; import io.quarkus.vertx.web.runtime.MultiSupport; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -50,6 +51,24 @@ public class Methods { static final MethodDescriptor MULTI_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiSupport.class, "subscribeObject", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor IS_SSE = MethodDescriptor.ofMethod(MultiSseSupport.class, "isSSE", Boolean.TYPE, Multi.class); + static final MethodDescriptor MULTI_SSE_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiSseSupport.class, "subscribeVoid", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SSE_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiSseSupport.class, + "subscribeString", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SSE_SUBSCRIBE_BUFFER = MethodDescriptor.ofMethod(MultiSseSupport.class, + "subscribeBuffer", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SSE_SUBSCRIBE_RX_BUFFER = MethodDescriptor.ofMethod(MultiSseSupport.class, + "subscribeRxBuffer", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SSE_SUBSCRIBE_MUTINY_BUFFER = MethodDescriptor.ofMethod(MultiSseSupport.class, + "subscribeMutinyBuffer", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_SSE_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiSseSupport.class, + "subscribeObject", + Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor END = MethodDescriptor.ofMethod(HttpServerResponse.class, "end", Void.TYPE); static final MethodDescriptor END_WITH_STRING = MethodDescriptor .ofMethod(HttpServerResponse.class, "end", Void.TYPE, String.class); diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java index acd094c74e981..bbed6893afd94 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java @@ -521,28 +521,17 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met invoke.invokeVirtualMethod(Methods.UNI_SUBSCRIBE_WITH, sub, successCallback.getInstance(), failureCallback.getInstance()); } else if (descriptor.isReturningMulti()) { - // The method returns a Multi. - // We subscribe to this Multi and write the provided items (one by one) in the HTTP response. - // On completion, we "end" the response - // If the method returned null, we fail - // If the provided item is null we fail - // If the multi is empty, and the method return a Multi, we reply with a 204 - NO CONTENT - // If the produce item is a string or buffer, the response.write method is used to write the response - // If the produce item is an object, the item is mapped to JSON and written into the response. The response is a JSON array. - - if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. - invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc); - } else if (descriptor.isContentTypeBuffer()) { - invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_BUFFER, res, rc); - } else if (descriptor.isContentTypeMutinyBuffer()) { - invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_MUTINY_BUFFER, res, rc); - } else if (descriptor.isContentTypeRxBuffer()) { - invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_RX_BUFFER, res, rc); - } else if (descriptor.isContentTypeString()) { - invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_STRING, res, rc); - } else { // Multi - encode to json. - invoke.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_OBJECT, res, rc); - } + + // 2 cases - regular multi vs. sse multi, we need to check the type. + BranchResult branches = invoke.ifTrue(invoke.invokeStaticMethod(Methods.IS_SSE, res)); + BytecodeCreator isSSE = branches.trueBranch(); + handleSSEMulti(descriptor, isSSE, rc, res); + isSSE.close(); + + BytecodeCreator isRegular = branches.falseBranch(); + handleRegularMulti(descriptor, isRegular, rc, res); + isRegular.close(); + } else if (descriptor.getContentType() != null) { // The method returns "something" in a synchronous manner, write it into the response @@ -561,6 +550,58 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met invoke.returnValue(null); } + private void handleRegularMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc, + ResultHandle res) { + // The method returns a Multi. + // We subscribe to this Multi and write the provided items (one by one) in the HTTP response. + // On completion, we "end" the response + // If the method returned null, we fail + // If the provided item is null we fail + // If the multi is empty, and the method return a Multi, we reply with a 204 - NO CONTENT + // If the produce item is a string or buffer, the response.write method is used to write the response + // If the produce item is an object, the item is mapped to JSON and written into the response. The response is a JSON array. + + if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc); + } else if (descriptor.isContentTypeBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_BUFFER, res, rc); + } else if (descriptor.isContentTypeMutinyBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_MUTINY_BUFFER, res, rc); + } else if (descriptor.isContentTypeRxBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_RX_BUFFER, res, rc); + } else if (descriptor.isContentTypeString()) { + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_STRING, res, rc); + } else { // Multi - encode to json. + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_OBJECT, res, rc); + } + } + + private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc, + ResultHandle res) { + // The method returns a Multi that needs to be written as server-sent event. + // We subscribe to this Multi and write the provided items (one by one) in the HTTP response. + // On completion, we "end" the response + // If the method returned null, we fail + // If the provided item is null we fail + // If the multi is empty, and the method return a Multi, we reply with a 204 - NO CONTENT (as regular) + // If the produce item is a string or buffer, the response.write method is used to write the events in the response + // If the produce item is an object, the item is mapped to JSON and included in the `data` section of the event. + + if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. + writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc); + } else if (descriptor.isContentTypeBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_SSE_SUBSCRIBE_BUFFER, res, rc); + } else if (descriptor.isContentTypeMutinyBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_SSE_SUBSCRIBE_MUTINY_BUFFER, res, rc); + } else if (descriptor.isContentTypeRxBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_SSE_SUBSCRIBE_RX_BUFFER, res, rc); + } else if (descriptor.isContentTypeString()) { + writer.invokeStaticMethod(Methods.MULTI_SSE_SUBSCRIBE_STRING, res, rc); + } else { // Multi - encode to json. + writer.invokeStaticMethod(Methods.MULTI_SSE_SUBSCRIBE_OBJECT, res, rc); + } + } + /** * Generates the following function depending on the payload type * diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SSEMultiRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SSEMultiRouteTest.java new file mode 100644 index 0000000000000..ffedf65a83c06 --- /dev/null +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/SSEMultiRouteTest.java @@ -0,0 +1,306 @@ +package io.quarkus.vertx.web.mutiny; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.*; + +import java.io.IOException; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.web.ReactiveRoutes; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Multi; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; + +public class SSEMultiRouteTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); + + @Test + public void testSSEMultiRoute() { + when().get("/hello").then().statusCode(200) + .body(is("data: Hello world!\nid: 0\n\n")) + .header("content-type", "text/event-stream"); + + when().get("/hellos").then().statusCode(200) + .body(containsString( + // @formatter:off + "data: hello\nid: 0\n\n" + + "data: world\nid: 1\n\n" + + "data: !\nid: 2\n\n")) + // @formatter:on + .header("content-type", "text/event-stream"); + + when().get("/no-hello").then().statusCode(200).body(hasLength(0)) + .header("content-type", "text/event-stream"); + + // We get the item followed by the exception + when().get("/hello-and-fail").then().statusCode(200) + .body(containsString("id: 0")) + .body(containsString("boom")); + + when().get("/buffer").then().statusCode(200) + .body(is("data: Buffer\nid: 0\n\n")) + .header("content-type", is("text/event-stream")); + + when().get("/buffers").then().statusCode(200) + .body(is("data: Buffer\nid: 0\n\ndata: Buffer\nid: 1\n\ndata: Buffer.\nid: 2\n\n")) + .header("content-type", is("text/event-stream")); + + when().get("/rx-buffer").then().statusCode(200) + .body(is("data: Buffer\nid: 0\n\ndata: RX\nid: 1\n\n")) + .header("content-type", is("text/event-stream")); + + when().get("/mutiny-buffer").then().statusCode(200) + .body(is("data: Buffer\nid: 0\n\ndata: Mutiny\nid: 1\n\n")) + .header("content-type", is("text/event-stream")); + + when().get("/void").then().statusCode(204).body(hasLength(0)); + + when().get("/people").then().statusCode(200) + .body(is( + // @formatter:off + "data: {\"name\":\"superman\",\"id\":1}\nid: 0\n\n" + + "data: {\"name\":\"batman\",\"id\":2}\nid: 1\n\n" + + "data: {\"name\":\"spiderman\",\"id\":3}\nid: 2\n\n")) + // @formatter:on + .header("content-type", is("text/event-stream")); + + when().get("/people-content-type").then().statusCode(200) + .body(is( + // @formatter:off + "data: {\"name\":\"superman\",\"id\":1}\nid: 0\n\n" + + "data: {\"name\":\"batman\",\"id\":2}\nid: 1\n\n" + + "data: {\"name\":\"spiderman\",\"id\":3}\nid: 2\n\n")) + // @formatter:on + .header("content-type", is("text/event-stream;charset=utf-8")); + + when().get("/people-as-event").then().statusCode(200) + .body(is( + // @formatter:off + "event: person\ndata: {\"name\":\"superman\",\"id\":1}\nid: 1\n\n" + + "event: person\ndata: {\"name\":\"batman\",\"id\":2}\nid: 2\n\n" + + "event: person\ndata: {\"name\":\"spiderman\",\"id\":3}\nid: 3\n\n")) + // @formatter:on + .header("content-type", is("text/event-stream")); + + when().get("/people-as-event-without-id").then().statusCode(200) + .body(is( + // @formatter:off + "event: person\ndata: {\"name\":\"superman\",\"id\":1}\nid: 0\n\n" + + "event: person\ndata: {\"name\":\"batman\",\"id\":2}\nid: 1\n\n" + + "event: person\ndata: {\"name\":\"spiderman\",\"id\":3}\nid: 2\n\n")) + // @formatter:on + .header("content-type", is("text/event-stream")); + + when().get("/people-as-event-without-event").then().statusCode(200) + .body(is( + // @formatter:off + "data: {\"name\":\"superman\",\"id\":1}\nid: 1\n\n" + + "data: {\"name\":\"batman\",\"id\":2}\nid: 2\n\n" + + "data: {\"name\":\"spiderman\",\"id\":3}\nid: 3\n\n")) + // @formatter:on + .header("content-type", is("text/event-stream")); + + when().get("/failure").then().statusCode(500).body(containsString("boom")); + when().get("/null").then().statusCode(500).body(containsString("null")); + when().get("/sync-failure").then().statusCode(500).body(containsString("null")); + + } + + static class SimpleBean { + + @Route(path = "hello") + Multi hello(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().item("Hello world!")); + } + + @Route(path = "hellos") + Multi hellos(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().items("hello", "world", "!")); + } + + @Route(path = "no-hello") + Multi noHello(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().empty()); + } + + @Route(path = "hello-and-fail") + Multi helloAndFail(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createBy().concatenating().streams( + Multi.createFrom().item("Hello"), + Multi.createFrom().failure(() -> new IOException("boom")))); + } + + @Route(path = "buffer") + Multi buffer(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().item(Buffer.buffer("Buffer"))); + } + + @Route(path = "buffers") + Multi buffers(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom() + .items(Buffer.buffer("Buffer"), Buffer.buffer("Buffer"), Buffer.buffer("Buffer."))); + } + + @Route(path = "rx-buffer") + Multi bufferRx(RoutingContext context) { + return ReactiveRoutes + .asEventStream(Multi.createFrom().items(io.vertx.reactivex.core.buffer.Buffer.buffer("Buffer"), + io.vertx.reactivex.core.buffer.Buffer.buffer("RX"))); + } + + @Route(path = "mutiny-buffer") + Multi bufferMutiny(RoutingContext context) { + return ReactiveRoutes + .asEventStream(Multi.createFrom().items(io.vertx.mutiny.core.buffer.Buffer.buffer("Buffer"), + io.vertx.mutiny.core.buffer.Buffer.buffer("Mutiny"))); + } + + @Route(path = "void") + Multi multiVoid(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().range(0, 200) + .onItem().ignore()); + } + + @Route(path = "/people") + Multi people(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/people-as-event") + Multi peopleAsEvent(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().items( + new PersonAsEvent("superman", 1), + new PersonAsEvent("batman", 2), + new PersonAsEvent("spiderman", 3))); + } + + @Route(path = "/people-as-event-without-id") + Multi peopleAsEventWithoutId(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().items( + new PersonAsEventWithoutId("superman", 1), + new PersonAsEventWithoutId("batman", 2), + new PersonAsEventWithoutId("spiderman", 3))); + } + + @Route(path = "/people-as-event-without-event") + Multi peopleAsEventWithoutEvent(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().items( + new PersonAsEventWithoutEvent("superman", 1), + new PersonAsEventWithoutEvent("batman", 2), + new PersonAsEventWithoutEvent("spiderman", 3))); + } + + @Route(path = "/people-content-type") + Multi peopleWithContentType(RoutingContext context) { + context.response().putHeader("content-type", "text/event-stream;charset=utf-8"); + return ReactiveRoutes.asEventStream(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/failure") + Multi fail(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().failure(new IOException("boom"))); + } + + @Route(path = "/sync-failure") + Multi failSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "/null") + Multi uniNull(RoutingContext context) { + return null; + } + + } + + static class Person { + public String name; + public int id; + + public Person(String name, int id) { + this.name = name; + this.id = id; + } + } + + static class PersonAsEvent implements ReactiveRoutes.ServerSentEvent { + public String name; + public int id; + + public PersonAsEvent(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public Person data() { + return new Person(name, id); + } + + @Override + public long id() { + return id; + } + + @Override + public String event() { + return "person"; + } + } + + static class PersonAsEventWithoutId implements ReactiveRoutes.ServerSentEvent { + public String name; + public int id; + + public PersonAsEventWithoutId(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public Person data() { + return new Person(name, id); + } + + @Override + public String event() { + return "person"; + } + } + + static class PersonAsEventWithoutEvent implements ReactiveRoutes.ServerSentEvent { + public String name; + public int id; + + public PersonAsEventWithoutEvent(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public Person data() { + return new Person(name, id); + } + + @Override + public long id() { + return id; + } + } + +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java new file mode 100644 index 0000000000000..038d8a104b231 --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java @@ -0,0 +1,93 @@ +package io.quarkus.vertx.web; + +import java.util.Objects; + +import io.quarkus.vertx.web.runtime.SSEMulti; +import io.smallrye.mutiny.Multi; + +/** + * Provides utility methods, mainly to handle {@code text/event-stream} responses. + */ +public class ReactiveRoutes { + + private ReactiveRoutes() { + // Avoid direct instantiation. + } + + /** + * Indicates the the given stream should be written as server-sent-event in the response. + * Returning a {@code multi} wrapped using this method produces a {@code text/event-stream} response. Each item + * is written as an event in the response. The response automatically enables the chunked encoding and set the + * content type. + *

+ * If the item is a String, the {@code data} part of the event is this string. An {@code id} is automatically + * generated. + * If the item is a Buffer, the {@code data} part of the event is this buffer. An {@code id} is automatically + * generated. + * If the item is an Object, the {@code data} part of the event is the JSON representation of this object. An + * {@code id} is automatically generated. + * If the item is an {@link ServerSentEvent}, the {@code data} part of the event is the JSON representation of this + * {@link ServerSentEvent#data()}. The {@code id} is computed from {@link ServerSentEvent#id()} (generated if not + * implemented). The {@code event} section (ignored in all the other case) is computed from + * {@link ServerSentEvent#event()}. + *

+ * Example of usage: + * + *

+     * @Route(path = "/people")
+     * Multi<Person> people(RoutingContext context) {
+     *     return ReactiveRoutes.asEventStream(Multi.createFrom().items(
+     *             new Person("superman", 1),
+     *             new Person("batman", 2),
+     *             new Person("spiderman", 3)));
+     * }
+     * 
+ * + * @param multi the multi to be written + * @param the type of item, can be string, buffer, object or io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent + * @return the wrapped multi + */ + public static Multi asEventStream(Multi multi) { + return new SSEMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`")); + } + + /** + * A class allowing to customized how the server sent events are written. + *

+ * The {@code data} section of the resulting event is the JSON representation of the result from {@link #data()}. + * If {@link #event()} does not return {@code null}, the {@code event} section is written with the result as value. + * If {@link #id()} is implemented, the {@code id} section uses this value. + * + * @param the type of payload, use for the {@code data} section of the event. + */ + public interface ServerSentEvent { + + /** + * The {@code event} section. + * + * @return the name of the event. If {@code null}, the written event won't have an {@code event} section + */ + default String event() { + return null; + } + + /** + * The {@code data} section. + * + * @return the object that will be encoded to JSON. Must not be {@code null} + */ + T data(); + + /** + * The {@code id} section. + * If not implemented, an automatic id is inserted. + * + * @return the id + */ + default long id() { + return -1L; + } + + } + +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java new file mode 100644 index 0000000000000..d1f18782bec90 --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java @@ -0,0 +1,173 @@ +package io.quarkus.vertx.web.runtime; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.quarkus.vertx.web.ReactiveRoutes; +import io.smallrye.mutiny.Multi; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; +import io.vertx.ext.web.RoutingContext; + +@SuppressWarnings("ReactiveStreamsSubscriberImplementation") +public class MultiSseSupport { + + private MultiSseSupport() { + // Avoid direct instantiation. + } + + public static void subscribeString(Multi multi, RoutingContext rc) { + subscribeBuffer(multi.map(new Function() { + @Override + public Buffer apply(String s) { + return Buffer.buffer(s); + } + }), rc); + } + + private static void initialize(HttpServerResponse response) { + if (response.bytesWritten() == 0) { + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "text/event-stream"); + } + response.setChunked(true); + } + } + + private static void onWriteDone(Subscription subscription, AsyncResult ar, RoutingContext rc) { + if (ar.failed()) { + rc.fail(ar.cause()); + } else { + subscription.request(1); + } + } + + public static void write(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + this.upstream.request(1); + } + + @Override + public void onNext(Buffer item) { + initialize(response); + response.write(item, new Handler>() { + @Override + public void handle(AsyncResult ar) { + onWriteDone(upstream, ar, rc); + } + }); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + endOfStream(response); + } + }); + } + + public static void subscribeBuffer(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + final AtomicLong count = new AtomicLong(); + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + this.upstream.request(1); + } + + @Override + public void onNext(Buffer item) { + initialize(response); + Buffer buffer = Buffer.buffer("data: ").appendBuffer(item).appendString("\n") + .appendString("id: " + count.getAndIncrement()) + .appendString("\n\n"); + response.write(buffer, new Handler>() { + @Override + public void handle(AsyncResult ar) { + onWriteDone(upstream, ar, rc); + } + }); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + endOfStream(response); + } + }); + } + + public static void subscribeMutinyBuffer(Multi multi, RoutingContext rc) { + subscribeBuffer(multi.map(new Function() { + @Override + public Buffer apply(io.vertx.mutiny.core.buffer.Buffer b) { + return b.getDelegate(); + } + }), rc); + } + + public static void subscribeRxBuffer(Multi multi, RoutingContext rc) { + subscribeBuffer(multi.map(new Function() { + @Override + public Buffer apply(io.vertx.reactivex.core.buffer.Buffer b) { + return b.getDelegate(); + } + }), rc); + } + + public static void subscribeObject(Multi multi, RoutingContext rc) { + AtomicLong count = new AtomicLong(); + write(multi.map(new Function() { + @Override + public Buffer apply(Object o) { + if (o instanceof ReactiveRoutes.ServerSentEvent) { + ReactiveRoutes.ServerSentEvent ev = (ReactiveRoutes.ServerSentEvent) o; + long id = ev.id() != -1 ? ev.id() : count.getAndIncrement(); + String e = ev.event() == null ? "" : "event: " + ev.event() + "\n"; + return Buffer.buffer(e + "data: " + Json.encodeToBuffer(ev.data()) + "\nid: " + id + "\n\n"); + } else { + return Buffer.buffer("data: " + Json.encodeToBuffer(o) + "\nid: " + count.getAndIncrement() + "\n\n"); + } + } + }), rc); + } + + private static void endOfStream(HttpServerResponse response) { + if (response.bytesWritten() == 0) { // No item + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "text/event-stream"); + } + } + response.end(); + } + + public static boolean isSSE(Multi multi) { + return multi instanceof SSEMulti; + } +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/SSEMulti.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/SSEMulti.java new file mode 100644 index 0000000000000..c82a6ab2f9b23 --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/SSEMulti.java @@ -0,0 +1,26 @@ +package io.quarkus.vertx.web.runtime; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * Just a wrapped to capture the fact that the items must be written as SSE. + * + * @param the type of item. + */ +public class SSEMulti extends AbstractMulti { + + private final Multi multi; + + public SSEMulti(Multi multi) { + this.multi = multi; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + } + +} From 00a6480706294e28269bebc93681856e7e7d90ae Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 30 Jun 2020 18:14:59 +0200 Subject: [PATCH 4/5] Extend reactive route documentation with the Mutiny support --- docs/src/main/asciidoc/reactive-routes.adoc | 183 ++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/docs/src/main/asciidoc/reactive-routes.adoc b/docs/src/main/asciidoc/reactive-routes.adoc index fa8784941a75f..f7947822612b6 100644 --- a/docs/src/main/asciidoc/reactive-routes.adoc +++ b/docs/src/main/asciidoc/reactive-routes.adoc @@ -163,6 +163,189 @@ public class SimpleRoutes { ---- <1> The `path` value is used as a prefix for any route method declared on the class where `Route#path()` is used. The `produces` value is used for content-based routing for all routes where `Route#produces()` is empty. +=== Returning Unis + +In a reactive route, you can return a `Uni` directly: + +[source,java] +---- +@Route(path = "/hello") +Uni hello(RoutingContext context) { + return Uni.createFrom().item("Hello world!"); +} + +@Route(path = "/person") +Uni getPerson(RoutingContext context) { + return Uni.createFrom().item(() -> new Person("neo", 12345)); +} +---- + +Returning `Unis` is convenient when using a reactive client: + +[source,java] +---- +@Route(path = "/mail") +Uni sendEmail(RoutingContext context) { + return mailer.send(...); +} +---- + +The item produced by the returned `Uni` can be: + +* a string - written into the HTTP response directly +* a buffer - written into the HTTP response directly +* an object - written into the HTTP response after having been encoded into JSON. +The `content-type` header is set to `application/json` if not already set. + +If the returned `Uni` produces a failure (or is `null`), an HTTP 500 response is written. + +Returning a `Uni` produces a 204 response (no content). + +=== Returning results + +You can also return a result directly: + +[source, java] +---- +@Route(path = "/hello") +String helloSync(RoutingContext context) { + return "Hello world"; +} +---- + +Be aware, the processing must be **non-blocking** as reactive routes are invoked on the IO Thread. +Otherwise, use the `blocking` attribute of the `@Route` annotation. + +The method can return: + +* a string - written into the HTTP response directly +* a buffer - written into the HTTP response directly +* an object - written into the HTTP response after having been encoded into JSON. +The `content-type` header is set to `application/json` if not already set. + +=== Returning Multis + +A reactive route can return a `Multi`. +The items are written one by one, in the response. +The response `Transfer-Encoding` header is set to `chunked`. + +[source, java] +---- +@Route(path = "/hello") +Multi hellos(RoutingContext context) { + return Multi.createFrom().items("hello", "world", "!"); // <1> +} +---- +1. Produces `["hello", "world", "!"]` + +The method can return: + +* a `Multi` - the items are written one by one in a JSON Array. +The `content-type` is set to `application/json` if not already set. +* a `Multi` - the buffers are written one by one (one per _chunk_) without any processing. +* a `Multi` - the items are encoded to JSON written one by one in a JSON Array. +The `content-type` attribute is set to `application/json` if not already set: + +[source, java] +---- +@Route(path = "/people") +Multi people(RoutingContext context) { + return Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3)); +} +---- + +Ths previous snippet produces: + +[source, json] +---- +[ + {"name":"superman", "id": 1} // chunk 1 + ,{"name":"batman", "id": 2} // chunk 2 + ,{"name":"spiderman", "id": 3} // chunk 3 +] +---- + +=== Event Stream and Server-Sent Event support + +You can return a `Multi` to produce an event source (stream of server sent events). +To enable this feature, you need to wrap the returned `Multi` using `io.quarkus.vertx.web.ReactiveRoutes.asEventStream`: + +[source, java] +---- +@Route(path = "/people") +Multi people(RoutingContext context) { + return ReactiveRoutes.asEventStream(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); +} +---- + +This method would produce: + +[source, text] +---- +data: {"name":"superman", "id": 1} +id: 0 + +data: {"name":"batman", "id": 2} +id: 1 + +data: {"name":"spiderman", "id": 3} +id: 2 + +---- + +You can also implement the `io.quarkus.vertx.web.ReactiveRoutes.ServerSentEvent` interface to customize the `event` and `id` section of the server sent event: + +[source, java] +---- +class PersonEvent implements ReactiveRoutes.ServerSentEvent { + public String name; + public int id; + + public PersonEvent(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public Person data() { + return new Person(name, id); // Will be JSON encoded + } + + @Override + public long id() { + return id; + } + + @Override + public String event() { + return "person"; + } +} +---- + +Using a `Multi` (wrapped using `io.quarkus.vertx.web.ReactiveRoutes.asEventStream`) would produce: + +[source, text] +---- +event: person +data: {"name":"superman", "id": 1} +id: 1 + +event: person +data: {"name":"batman", "id": 2} +id: 2 + +event: person +data: {"name":"spiderman", "id": 3} +id: 3 + +---- == Using the Vert.x Web Router From 673c1b5d56bf76491cc35d56af1e88caf93e0321 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 2 Jul 2020 16:34:45 +0200 Subject: [PATCH 5/5] Handle Json Array streaming separately from the unprocessed stream. --- docs/src/main/asciidoc/reactive-routes.adoc | 48 +++++- .../quarkus/vertx/web/deployment/Methods.java | 20 ++- .../web/deployment/VertxWebProcessor.java | 42 ++++- .../vertx/web/mutiny/JsonMultiRouteTest.java | 147 ++++++++++++++++++ .../vertx/web/mutiny/MultiRouteTest.java | 40 ++--- .../io/quarkus/vertx/web/ReactiveRoutes.java | 34 ++++ .../vertx/web/runtime/JsonArrayMulti.java | 26 ++++ .../web/runtime/MultiJsonArraySupport.java | 118 ++++++++++++++ .../vertx/web/runtime/MultiSupport.java | 101 +----------- 9 files changed, 438 insertions(+), 138 deletions(-) create mode 100644 extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/JsonMultiRouteTest.java create mode 100644 extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/JsonArrayMulti.java create mode 100644 extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java diff --git a/docs/src/main/asciidoc/reactive-routes.adoc b/docs/src/main/asciidoc/reactive-routes.adoc index f7947822612b6..ed8e03f4ddf5c 100644 --- a/docs/src/main/asciidoc/reactive-routes.adoc +++ b/docs/src/main/asciidoc/reactive-routes.adoc @@ -236,15 +236,14 @@ Multi hellos(RoutingContext context) { return Multi.createFrom().items("hello", "world", "!"); // <1> } ---- -1. Produces `["hello", "world", "!"]` +1. Produces `helloworld!` The method can return: -* a `Multi` - the items are written one by one in a JSON Array. -The `content-type` is set to `application/json` if not already set. +* a `Multi` - the items are written one by one (one per _chunk_) in the response. * a `Multi` - the buffers are written one by one (one per _chunk_) without any processing. -* a `Multi` - the items are encoded to JSON written one by one in a JSON Array. -The `content-type` attribute is set to `application/json` if not already set: +* a `Multi` - the items are encoded to JSON written one by one in the response. + [source, java] ---- @@ -259,15 +258,48 @@ Multi people(RoutingContext context) { Ths previous snippet produces: +[source, json] +---- +{"name":"superman", "id": 1} // chunk 1 +{"name":"batman", "id": 2} // chunk 2 +{"name":"spiderman", "id": 3} // chunk 3 +---- + +=== Streaming JSON Array items + +You can return a `Multi` to produce a JSON Array, where every item is an item from this array. +The response is written item by item to the client. +The `content-type` is set to `application/json` if not set already. + +To use this feature, you need to wrap the returned `Multi` using `io.quarkus.vertx.web.ReactiveRoutes.asJsonArray`: + +[source, java] +---- +@Route(path = "/people") +Multi people(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); +} +---- + +Ths previous snippet produces: + [source, json] ---- [ - {"name":"superman", "id": 1} // chunk 1 - ,{"name":"batman", "id": 2} // chunk 2 - ,{"name":"spiderman", "id": 3} // chunk 3 + {"name":"superman", "id": 1} // chunk 1 + ,{"name":"batman", "id": 2} // chunk 2 + ,{"name":"spiderman", "id": 3} // chunk 3 ] ---- +Only `Multi`, `Multi` and `Multi` can be written into the JSON Array. +Using a `Multi` produces an empty array. +You cannot use `Multi`. +If you need to use `Buffer`, transform the content into a JSON or String representation first. + === Event Stream and Server-Sent Event support You can return a `Multi` to produce an event source (stream of server sent events). diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java index d248c543a047c..73835d2dac06f 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/Methods.java @@ -7,6 +7,7 @@ import io.quarkus.gizmo.BytecodeCreator; import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; +import io.quarkus.vertx.web.runtime.MultiJsonArraySupport; import io.quarkus.vertx.web.runtime.MultiSseSupport; import io.quarkus.vertx.web.runtime.MultiSupport; import io.smallrye.mutiny.Multi; @@ -52,8 +53,6 @@ public class Methods { Void.TYPE, Multi.class, RoutingContext.class); static final MethodDescriptor IS_SSE = MethodDescriptor.ofMethod(MultiSseSupport.class, "isSSE", Boolean.TYPE, Multi.class); - static final MethodDescriptor MULTI_SSE_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiSseSupport.class, "subscribeVoid", - Void.TYPE, Multi.class, RoutingContext.class); static final MethodDescriptor MULTI_SSE_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiSseSupport.class, "subscribeString", Void.TYPE, Multi.class, RoutingContext.class); @@ -69,6 +68,23 @@ public class Methods { "subscribeObject", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor IS_JSON_ARRAY = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, "isJsonArray", + Boolean.TYPE, Multi.class); + static final MethodDescriptor MULTI_JSON_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "subscribeVoid", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_JSON_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "subscribeString", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_JSON_SUBSCRIBE_BUFFER = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "subscribeBuffer", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_JSON_SUBSCRIBE_RX_BUFFER = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "subscribeRxBuffer", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_JSON_SUBSCRIBE_MUTINY_BUFFER = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "subscribeMutinyBuffer", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_JSON_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "subscribeObject", Void.TYPE, Multi.class, RoutingContext.class); + static final MethodDescriptor MULTI_JSON_FAIL = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, + "fail", Void.TYPE, RoutingContext.class); + static final MethodDescriptor END = MethodDescriptor.ofMethod(HttpServerResponse.class, "end", Void.TYPE); static final MethodDescriptor END_WITH_STRING = MethodDescriptor .ofMethod(HttpServerResponse.class, "end", Void.TYPE, String.class); diff --git a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java index bbed6893afd94..1e4a570d1f945 100644 --- a/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java +++ b/extensions/vertx-web/deployment/src/main/java/io/quarkus/vertx/web/deployment/VertxWebProcessor.java @@ -522,15 +522,21 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met failureCallback.getInstance()); } else if (descriptor.isReturningMulti()) { - // 2 cases - regular multi vs. sse multi, we need to check the type. - BranchResult branches = invoke.ifTrue(invoke.invokeStaticMethod(Methods.IS_SSE, res)); - BytecodeCreator isSSE = branches.trueBranch(); + // 3 cases - regular multi vs. sse multi vs. json array multi, we need to check the type. + BranchResult isItSSE = invoke.ifTrue(invoke.invokeStaticMethod(Methods.IS_SSE, res)); + BytecodeCreator isSSE = isItSSE.trueBranch(); handleSSEMulti(descriptor, isSSE, rc, res); isSSE.close(); - BytecodeCreator isRegular = branches.falseBranch(); + BytecodeCreator isNotSSE = isItSSE.falseBranch(); + BranchResult isItJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_JSON_ARRAY, res)); + BytecodeCreator isJson = isItJson.trueBranch(); + handleJsonArrayMulti(descriptor, isJson, rc, res); + isJson.close(); + BytecodeCreator isRegular = isItJson.falseBranch(); handleRegularMulti(descriptor, isRegular, rc, res); isRegular.close(); + isNotSSE.close(); } else if (descriptor.getContentType() != null) { // The method returns "something" in a synchronous manner, write it into the response @@ -584,8 +590,8 @@ private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer // If the method returned null, we fail // If the provided item is null we fail // If the multi is empty, and the method return a Multi, we reply with a 204 - NO CONTENT (as regular) - // If the produce item is a string or buffer, the response.write method is used to write the events in the response - // If the produce item is an object, the item is mapped to JSON and included in the `data` section of the event. + // If the produced item is a string or buffer, the response.write method is used to write the events in the response + // If the produced item is an object, the item is mapped to JSON and included in the `data` section of the event. if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc); @@ -602,6 +608,30 @@ private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer } } + private void handleJsonArrayMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc, + ResultHandle res) { + // The method returns a Multi that needs to be written as JSON Array. + // We subscribe to this Multi and write the provided items (one by one) in the HTTP response. + // On completion, we "end" the response + // If the method returned null, we fail + // If the provided item is null we fail + // If the multi is empty, we send an empty JSON array + // If the produced item is a string, the response.write method is used to write the events in the response + // If the produced item is an object, the item is mapped to JSON and included in the `data` section of the event. + // If the produced item is a buffer, we fail + + if (Methods.isNoContent(descriptor)) { // Multi - so return a 204. + writer.invokeStaticMethod(Methods.MULTI_JSON_SUBSCRIBE_VOID, res, rc); + } else if (descriptor.isContentTypeString()) { + writer.invokeStaticMethod(Methods.MULTI_JSON_SUBSCRIBE_STRING, res, rc); + } else if (descriptor.isContentTypeBuffer() || descriptor.isContentTypeRxBuffer() + || descriptor.isContentTypeMutinyBuffer()) { + writer.invokeStaticMethod(Methods.MULTI_JSON_FAIL, rc); + } else { // Multi - encode to json. + writer.invokeStaticMethod(Methods.MULTI_JSON_SUBSCRIBE_OBJECT, res, rc); + } + } + /** * Generates the following function depending on the payload type * diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/JsonMultiRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/JsonMultiRouteTest.java new file mode 100644 index 0000000000000..2cbf8a3615d48 --- /dev/null +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/JsonMultiRouteTest.java @@ -0,0 +1,147 @@ +package io.quarkus.vertx.web.mutiny; + +import static io.restassured.RestAssured.when; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +import java.io.IOException; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.web.ReactiveRoutes; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Multi; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; + +public class JsonMultiRouteTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class)); + + @Test + public void testMultiRoute() { + when().get("/hello").then().statusCode(200) + .body(is("[\"Hello world!\"]")) + .header("content-type", "application/json"); + when().get("/hellos").then().statusCode(200) + .body(is("[\"hello\",\"world\",\"!\"]")) + .header("content-type", "application/json"); + when().get("/no-hello").then().statusCode(200).body(is("[]")) + .header("content-type", "application/json"); + // status already sent, but not the end of the array + when().get("/hello-and-fail").then().statusCode(200) + .body(containsString("[\"Hello\"")) + .body(not(containsString("]"))); + + when().get("/buffers").then().statusCode(500); + + when().get("/void").then().statusCode(200).body(is("[]")); + + when().get("/people").then().statusCode(200) + .body("size()", is(3)) + .body("[0].name", is("superman")) + .body("[1].name", is("batman")) + .body("[2].name", is("spiderman")) + .header("content-type", "application/json"); + + when().get("/people-content-type").then().statusCode(200) + .body("size()", is(3)) + .body("[0].name", is("superman")) + .body("[1].name", is("batman")) + .body("[2].name", is("spiderman")) + .header("content-type", "application/json;charset=utf-8"); + + when().get("/failure").then().statusCode(500).body(containsString("boom")); + when().get("/null").then().statusCode(500).body(containsString("null")); + when().get("/sync-failure").then().statusCode(500).body(containsString("null")); + + } + + static class SimpleBean { + + @Route(path = "hello") + Multi hello(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom().item("Hello world!")); + } + + @Route(path = "hellos") + Multi hellos(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom().items("hello", "world", "!")); + } + + @Route(path = "no-hello") + Multi noHello(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom().empty()); + } + + @Route(path = "hello-and-fail") + Multi helloAndFail(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createBy().concatenating().streams( + Multi.createFrom().item("Hello"), + Multi.createFrom().failure(new IOException("boom")))); + } + + @Route(path = "buffers") + Multi buffers(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom() + .items(Buffer.buffer("Buffer"), Buffer.buffer(" Buffer"), Buffer.buffer(" Buffer."))); + } + + @Route(path = "void") + Multi multiVoid(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom().range(0, 200) + .onItem().ignore()); + } + + @Route(path = "/people") + Multi people(RoutingContext context) { + return ReactiveRoutes.asJsonArray(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/people-content-type") + Multi peopleWithContentType(RoutingContext context) { + context.response().putHeader("content-type", "application/json;charset=utf-8"); + return ReactiveRoutes.asJsonArray(Multi.createFrom().items( + new Person("superman", 1), + new Person("batman", 2), + new Person("spiderman", 3))); + } + + @Route(path = "/failure") + Multi fail(RoutingContext context) { + return Multi.createFrom().failure(new IOException("boom")); + } + + @Route(path = "/sync-failure") + Multi failSync(RoutingContext context) { + throw new IllegalStateException("boom"); + } + + @Route(path = "/null") + Multi uniNull(RoutingContext context) { + return null; + } + + } + + static class Person { + public String name; + public int id; + + public Person(String name, int id) { + this.name = name; + this.id = id; + } + } + +} diff --git a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java index 72c07060ce719..0fa54f732fec2 100644 --- a/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java +++ b/extensions/vertx-web/deployment/src/test/java/io/quarkus/vertx/web/mutiny/MultiRouteTest.java @@ -25,17 +25,16 @@ public class MultiRouteTest { @Test public void testMultiRoute() { when().get("/hello").then().statusCode(200) - .body(is("[\"Hello world!\"]")) - .header("content-type", "application/json"); + .body(is("Hello world!")) + .header("content-type", is(nullValue())); when().get("/hellos").then().statusCode(200) - .body(is("[\"hello\",\"world\",\"!\"]")) - .header("content-type", "application/json"); - when().get("/no-hello").then().statusCode(200).body(is("[]")) - .header("content-type", "application/json"); + .body(is("helloworld!")) + .header("content-type", is(nullValue())); + when().get("/no-hello").then().statusCode(204).body(hasLength(0)) + .header("content-type", is(nullValue())); // status already sent, but not the end of the array when().get("/hello-and-fail").then().statusCode(200) - .body(containsString("[\"Hello\"")) - .body(not(containsString("]"))); + .body(containsString("Hello")); when().get("/buffer").then().statusCode(200).body(is("Buffer")) .header("content-type", is(nullValue())); @@ -48,18 +47,10 @@ public void testMultiRoute() { when().get("/void").then().statusCode(204).body(hasLength(0)); when().get("/people").then().statusCode(200) - .body("size()", is(3)) - .body("[0].name", is("superman")) - .body("[1].name", is("batman")) - .body("[2].name", is("spiderman")) - .header("content-type", "application/json"); - - when().get("/people-content-type").then().statusCode(200) - .body("size()", is(3)) - .body("[0].name", is("superman")) - .body("[1].name", is("batman")) - .body("[2].name", is("spiderman")) - .header("content-type", "application/json;charset=utf-8"); + .body(containsString("{\"name\":\"superman\",\"id\":1}")) + .body(containsString("{\"name\":\"batman\",\"id\":2}")) + .body(containsString("{\"name\":\"spiderman\",\"id\":3}")) + .header("content-type", is(nullValue())); when().get("/failure").then().statusCode(500).body(containsString("boom")); when().get("/null").then().statusCode(500).body(containsString("null")); @@ -137,15 +128,6 @@ Multi people(RoutingContext context) { new Person("spiderman", 3)); } - @Route(path = "/people-content-type") - Multi peopleWithContentType(RoutingContext context) { - context.response().putHeader("content-type", "application/json;charset=utf-8"); - return Multi.createFrom().items( - new Person("superman", 1), - new Person("batman", 2), - new Person("spiderman", 3)); - } - @Route(path = "/failure") Multi fail(RoutingContext context) { return Multi.createFrom().failure(new IOException("boom")); diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java index 038d8a104b231..15cce699bd710 100644 --- a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/ReactiveRoutes.java @@ -2,6 +2,7 @@ import java.util.Objects; +import io.quarkus.vertx.web.runtime.JsonArrayMulti; import io.quarkus.vertx.web.runtime.SSEMulti; import io.smallrye.mutiny.Multi; @@ -51,6 +52,39 @@ public static Multi asEventStream(Multi multi) { return new SSEMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`")); } + /** + * Indicates the the given stream should be written as a chunked JSON array in the response. + * Returning a {@code multi} wrapped using this method produces a {@code application/json} response. Each item + * is written as an JSON object in the response. The response automatically enables the chunked encoding and set the + * content type. + *

+ * If the item is a String, the content is written in the array. + * If the item is an Object, the content is transformed to JSON and written in the array. + *

+ * Note that the array is written in the response item by item, without accumulating the data. + * + * Example of usage: + * + *

+     * @Route(path = "/people")
+     * Multi<Person> people(RoutingContext context) {
+     *     return ReactiveRoutes.asJsonArray(Multi.createFrom().items(
+     *             new Person("superman", 1),
+     *             new Person("batman", 2),
+     *             new Person("spiderman", 3)));
+     * }
+     * 
+ * + * This example produces: {@code [{"name":"superman", "id":1}, {...}, {..,}]} + * + * @param multi the multi to be written + * @param the type of item, can be string or object + * @return the wrapped multi + */ + public static Multi asJsonArray(Multi multi) { + return new JsonArrayMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`")); + } + /** * A class allowing to customized how the server sent events are written. *

diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/JsonArrayMulti.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/JsonArrayMulti.java new file mode 100644 index 0000000000000..a4b6af267f5fa --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/JsonArrayMulti.java @@ -0,0 +1,26 @@ +package io.quarkus.vertx.web.runtime; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * Just a wrapped to capture the fact that the items must be written as JSON Array. + * + * @param the type of item. + */ +public class JsonArrayMulti extends AbstractMulti { + + private final Multi multi; + + public JsonArrayMulti(Multi multi) { + this.multi = multi; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber)); + } + +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java new file mode 100644 index 0000000000000..38417f89b7ad0 --- /dev/null +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java @@ -0,0 +1,118 @@ +package io.quarkus.vertx.web.runtime; + +import java.util.function.Function; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.smallrye.mutiny.Multi; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; +import io.vertx.ext.web.RoutingContext; + +@SuppressWarnings("ReactiveStreamsSubscriberImplementation") +public class MultiJsonArraySupport { + + private MultiJsonArraySupport() { + // Avoid direct instantiation. + } + + public static void subscribeVoid(Multi multi, RoutingContext rc) { + subscribeString(multi.onItem().castTo(String.class), rc); + } + + public static void subscribeString(Multi multi, RoutingContext rc) { + write(multi.map(new Function() { + @Override + public Buffer apply(String s) { + return Buffer.buffer("\"" + s + "\""); + } + }), rc); + } + + private static void write(Multi multi, RoutingContext rc) { + HttpServerResponse response = rc.response(); + multi.subscribe().withSubscriber(new Subscriber() { + Subscription upstream; + + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + this.upstream.request(1); + } + + @Override + public void onNext(Buffer item) { + Buffer toBeWritten; + if (response.bytesWritten() == 0) { + response.setChunked(true); + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "application/json"); + } + toBeWritten = Buffer.buffer("[").appendBuffer(item); + } else { + toBeWritten = Buffer.buffer(",").appendBuffer(item); + } + response.write(toBeWritten, new Handler>() { + @Override + public void handle(AsyncResult ar) { + onWriteDone(upstream, ar, rc); + } + }); + } + + @Override + public void onError(Throwable throwable) { + rc.fail(throwable); + } + + @Override + public void onComplete() { + completeJsonArray(response); + } + }); + } + + private static void onWriteDone(Subscription subscription, AsyncResult ar, RoutingContext rc) { + if (ar.failed()) { + rc.fail(ar.cause()); + } else { + subscription.request(1); + } + } + + public static void subscribeObject(Multi multi, RoutingContext rc) { + write(multi.map(new Function() { + @Override + public Buffer apply(Object item) { + return Json.encodeToBuffer(item); + } + }), rc); + } + + public static void fail(RoutingContext rc) { + rc.fail(new Exception("Unsupported type")); + } + + private static void completeJsonArray(HttpServerResponse response) { + if (response.bytesWritten() == 0) { // No item + MultiMap headers = response.headers(); + if (headers.get("content-type") == null) { + headers.set("content-type", "application/json"); + } + response.end("[]"); + } else { + response.end("]"); + } + } + + public static boolean isJsonArray(Multi multi) { + return multi instanceof JsonArrayMulti; + } + +} diff --git a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java index 9f7c5c7107c7b..91ec3db62ed0f 100644 --- a/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java +++ b/extensions/vertx-web/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java @@ -9,7 +9,6 @@ import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.Json; @@ -46,47 +45,7 @@ public void run() { } public static void subscribeString(Multi multi, RoutingContext rc) { - HttpServerResponse response = rc.response(); - multi.subscribe().withSubscriber(new Subscriber() { - Subscription upstream; - - @Override - public void onSubscribe(Subscription subscription) { - this.upstream = subscription; - this.upstream.request(1); - } - - @Override - public void onNext(String item) { - String toBeWritten; - if (response.bytesWritten() == 0) { - response.setChunked(true); - MultiMap headers = response.headers(); - if (headers.get("content-type") == null) { - headers.set("content-type", "application/json"); - } - toBeWritten = "[\"" + item + "\""; - } else { - toBeWritten = ",\"" + item + "\""; - } - response.write(toBeWritten, new Handler>() { - @Override - public void handle(AsyncResult ar) { - onWriteDone(upstream, ar, rc); - } - }); - } - - @Override - public void onError(Throwable throwable) { - rc.fail(throwable); - } - - @Override - public void onComplete() { - completeJsonArray(response); - } - }); + subscribeBuffer(multi.map(s -> Buffer.buffer(s)), rc); } private static void onWriteDone(Subscription subscription, AsyncResult ar, RoutingContext rc) { @@ -126,6 +85,9 @@ public void onError(Throwable throwable) { @Override public void onComplete() { + if (response.bytesWritten() == 0) { + response.setStatusCode(204); + } response.end(); } }); @@ -150,59 +112,12 @@ public Buffer apply(io.vertx.reactivex.core.buffer.Buffer b) { } public static void subscribeObject(Multi multi, RoutingContext rc) { - HttpServerResponse response = rc.response(); - multi.subscribe().withSubscriber(new Subscriber() { - Subscription upstream; - - @Override - public void onSubscribe(Subscription subscription) { - this.upstream = subscription; - this.upstream.request(1); - } - - @Override - public void onNext(Object item) { - String toBeWritten; - if (response.bytesWritten() == 0) { - response.setChunked(true); - MultiMap headers = response.headers(); - if (headers.get("content-type") == null) { - headers.set("content-type", "application/json"); - } - toBeWritten = "[" + Json.encodeToBuffer(item); - } else { - toBeWritten = "," + Json.encodeToBuffer(item); - } - response.write(toBeWritten, new Handler>() { - @Override - public void handle(AsyncResult ar) { - onWriteDone(upstream, ar, rc); - } - }); - } - - @Override - public void onError(Throwable throwable) { - rc.fail(throwable); - } - + subscribeBuffer(multi.map(new Function() { @Override - public void onComplete() { - completeJsonArray(response); - } - }); - } - - private static void completeJsonArray(HttpServerResponse response) { - if (response.bytesWritten() == 0) { // No item - MultiMap headers = response.headers(); - if (headers.get("content-type") == null) { - headers.set("content-type", "application/json"); + public Buffer apply(Object o) { + return Json.encodeToBuffer(o); } - response.end("[]"); - } else { - response.end("]"); - } + }), rc); } }