From 9c6d9c9dd3089162363c2ea76d007872b5bc9fe9 Mon Sep 17 00:00:00 2001 From: Phillip Kruger Date: Fri, 26 Jul 2024 13:34:32 +1000 Subject: [PATCH] Allow streaming data in deployment classpath methods in Dev UI Signed-off-by: Phillip Kruger --- docs/src/main/asciidoc/dev-ui.adoc | 8 +- .../deployment/BuildTimeContentProcessor.java | 8 +- .../deployment/DeploymentMethodBuildItem.java | 12 +- .../devui/deployment/DevUIProcessor.java | 7 +- .../buildtime/BuildTimeActionBuildItem.java | 14 + .../quarkus/devui/runtime/DevUIRecorder.java | 5 +- .../devui/runtime/comms/JsonRpcRouter.java | 248 +++++++++++------- 7 files changed, 193 insertions(+), 109 deletions(-) diff --git a/docs/src/main/asciidoc/dev-ui.adoc b/docs/src/main/asciidoc/dev-ui.adoc index a2bcb930fc7f5..ec327daaa2c15 100644 --- a/docs/src/main/asciidoc/dev-ui.adoc +++ b/docs/src/main/asciidoc/dev-ui.adoc @@ -1035,7 +1035,7 @@ this._observer.cancel(); //<2> https://github.com/phillip-kruger/quarkus-jokes/blob/main/deployment/src/main/resources/dev-ui/qwc-jokes-web-components.js[Example code] -==== JsonRpc against the deployment classpath +=== JsonRpc against the deployment classpath In certain cases you might need to execute methods and/or get data against the deployment classpath. This also happens over JsonRPC communication, but in this case you do not create a JsonRPC Service in the runtime module, you can just supply the code to be run in a supplier in the deployment module. To do this you will produce a `BuildTimeActionBuildItem`, example: @@ -1064,9 +1064,11 @@ you can just supply the code to be run in a supplier in the deployment module. T ---- <1> Return or use a BuildProducer to create a `BuildTimeActionBuildItem` <2> `BuildTimeActionBuildItem` is automatically scoped with your extension namespace -<3> The method name (that can be called from js in the same way as any json-rpc service) is `generateManifests` +<3> Here we add an action, that is the same as a request-response method. The method name (that can be called from js in the same way as any json-rpc service) is `generateManifests`. -===== Dev UI Log +You can also return a `CompletableFuture`/`CompletionStage` as an action, and if you want to stream data you need to use `addSubscription` (rather than `addAction`) and return a `Flow.Publisher`. + +=== Dev UI Log When running a local application using the `999-SNAPSHOT` version, the Dev UI will show a `Dev UI` Log in the footer. This is useful for debugging all JSON RPC messages flowing between the browser and the Quarkus app. diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/BuildTimeContentProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/BuildTimeContentProcessor.java index b87599fd6656b..ff3954246bce9 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/BuildTimeContentProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/BuildTimeContentProcessor.java @@ -207,6 +207,7 @@ DeploymentMethodBuildItem mapDeploymentMethods( CurateOutcomeBuildItem curateOutcomeBuildItem) { List methodNames = new ArrayList<>(); + List subscriptionNames = new ArrayList<>(); for (BuildTimeActionBuildItem actions : buildTimeActions) { String extensionPathName = actions.getExtensionPathName(curateOutcomeBuildItem); for (BuildTimeAction bta : actions.getActions()) { @@ -214,9 +215,14 @@ DeploymentMethodBuildItem mapDeploymentMethods( DevConsoleManager.register(fullName, bta.getAction()); methodNames.add(fullName); } + for (BuildTimeAction bts : actions.getSubscriptions()) { + String fullName = extensionPathName + "." + bts.getMethodName(); + DevConsoleManager.register(fullName, bts.getAction()); + subscriptionNames.add(fullName); + } } - return new DeploymentMethodBuildItem(methodNames); + return new DeploymentMethodBuildItem(methodNames, subscriptionNames); } private Map getBuildTimeDataForPage(AbstractPageBuildItem pageBuildItem) { diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DeploymentMethodBuildItem.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DeploymentMethodBuildItem.java index be146ba582f53..e1822398b18ce 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DeploymentMethodBuildItem.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DeploymentMethodBuildItem.java @@ -10,9 +10,11 @@ public final class DeploymentMethodBuildItem extends SimpleBuildItem { private final List methods; + private final List subscriptions; - public DeploymentMethodBuildItem(List methods) { + public DeploymentMethodBuildItem(List methods, List subscriptions) { this.methods = methods; + this.subscriptions = subscriptions; } public List getMethods() { @@ -22,4 +24,12 @@ public List getMethods() { public boolean hasMethods() { return this.methods != null && !this.methods.isEmpty(); } + + public List getSubscriptions() { + return this.subscriptions; + } + + public boolean hasSubscriptions() { + return this.subscriptions != null && !this.subscriptions.isEmpty(); + } } diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DevUIProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DevUIProcessor.java index 279e2762fa14b..d94b45e1ae03a 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DevUIProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/devui/deployment/DevUIProcessor.java @@ -379,6 +379,10 @@ void findAllJsonRPCMethods(BuildProducer jsonRPC requestResponseMethods.addAll(deploymentMethodBuildItem.getMethods()); } + if (deploymentMethodBuildItem.hasSubscriptions()) { + subscriptionMethods.addAll(deploymentMethodBuildItem.getSubscriptions()); + } + if (!extensionMethodsMap.isEmpty()) { jsonRPCMethodsProvider.produce(new JsonRPCRuntimeMethodsBuildItem(extensionMethodsMap)); } @@ -409,7 +413,8 @@ void createJsonRpcRouter(DevUIRecorder recorder, DevConsoleManager.setGlobal(DevUIRecorder.DEV_MANAGER_GLOBALS_JSON_MAPPER_FACTORY, JsonMapper.Factory.deploymentLinker().createLinkData(new DevUIDatabindCodec.Factory())); - recorder.createJsonRpcRouter(beanContainer.getValue(), extensionMethodsMap, deploymentMethodBuildItem.getMethods()); + recorder.createJsonRpcRouter(beanContainer.getValue(), extensionMethodsMap, deploymentMethodBuildItem.getMethods(), + deploymentMethodBuildItem.getSubscriptions()); } } diff --git a/extensions/vertx-http/dev-ui-spi/src/main/java/io/quarkus/devui/spi/buildtime/BuildTimeActionBuildItem.java b/extensions/vertx-http/dev-ui-spi/src/main/java/io/quarkus/devui/spi/buildtime/BuildTimeActionBuildItem.java index b505543895ff0..cc01649e2cf44 100644 --- a/extensions/vertx-http/dev-ui-spi/src/main/java/io/quarkus/devui/spi/buildtime/BuildTimeActionBuildItem.java +++ b/extensions/vertx-http/dev-ui-spi/src/main/java/io/quarkus/devui/spi/buildtime/BuildTimeActionBuildItem.java @@ -13,6 +13,7 @@ public final class BuildTimeActionBuildItem extends AbstractDevUIBuildItem { private final List actions = new ArrayList<>(); + private final List subscriptions = new ArrayList<>(); public BuildTimeActionBuildItem() { super(); @@ -34,4 +35,17 @@ public void addAction(String methodName, public List getActions() { return actions; } + + public void addSubscription(BuildTimeAction buildTimeAction) { + this.subscriptions.add(buildTimeAction); + } + + public void addSubscription(String methodName, + Function, T> action) { + this.addSubscription(new BuildTimeAction(methodName, action)); + } + + public List getSubscriptions() { + return subscriptions; + } } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/DevUIRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/DevUIRecorder.java index ff76ff70811cb..98780949af807 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/DevUIRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/DevUIRecorder.java @@ -46,10 +46,11 @@ public void shutdownTask(ShutdownContext shutdownContext, String devUIBasePath) public void createJsonRpcRouter(BeanContainer beanContainer, Map> extensionMethodsMap, - List deploymentMethods) { + List deploymentMethods, + List deploymentSubscriptions) { JsonRpcRouter jsonRpcRouter = beanContainer.beanInstance(JsonRpcRouter.class); jsonRpcRouter.populateJsonRPCRuntimeMethods(extensionMethodsMap); - jsonRpcRouter.setJsonRPCDeploymentMethods(deploymentMethods); + jsonRpcRouter.setJsonRPCDeploymentActions(deploymentMethods, deploymentSubscriptions); jsonRpcRouter.initializeCodec(createJsonMapper()); } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/comms/JsonRpcRouter.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/comms/JsonRpcRouter.java index e437eb8ccc71c..0767be67c9b99 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/comms/JsonRpcRouter.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/devui/runtime/comms/JsonRpcRouter.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Flow; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; @@ -42,7 +43,9 @@ public class JsonRpcRouter { private final Map jsonRpcToRuntimeClassPathJava = new HashMap<>(); // Map json-rpc method to java in deployment classpath - private final List jsonRpcToDeploymentClassPathJava = new ArrayList<>(); + private final List jsonRpcMethodToDeploymentClassPathJava = new ArrayList<>(); + // Map json-rpc subscriptions to java in deployment classpath + private final List jsonRpcSubscriptionToDeploymentClassPathJava = new ArrayList<>(); private static final List SESSIONS = Collections.synchronizedList(new ArrayList<>()); private JsonRpcCodec codec; @@ -84,37 +87,17 @@ public void populateJsonRPCRuntimeMethods(Map methodNames) { - this.jsonRpcToDeploymentClassPathJava.clear(); - this.jsonRpcToDeploymentClassPathJava.addAll(methodNames); + public void setJsonRPCDeploymentActions(List methods, List subscriptions) { + this.jsonRpcMethodToDeploymentClassPathJava.clear(); + this.jsonRpcMethodToDeploymentClassPathJava.addAll(methods); + this.jsonRpcSubscriptionToDeploymentClassPathJava.clear(); + this.jsonRpcSubscriptionToDeploymentClassPathJava.addAll(subscriptions); } public void initializeCodec(JsonMapper jsonMapper) { this.codec = new JsonRpcCodec(jsonMapper); } - private Uni invoke(ReflectionInfo info, Object target, Object[] args) { - if (info.isReturningUni()) { - try { - Uni uni = ((Uni) info.method.invoke(target, args)); - if (info.isExplicitlyBlocking()) { - return uni.runSubscriptionOn(Infrastructure.getDefaultExecutor()); - } else { - return uni; - } - } catch (Exception e) { - return Uni.createFrom().failure(e); - } - } else { - Uni uni = Uni.createFrom().item(Unchecked.supplier(() -> info.method.invoke(target, args))); - if (!info.isExplicitlyNonBlocking()) { - return uni.runSubscriptionOn(Infrastructure.getDefaultExecutor()); - } else { - return uni; - } - } - } - public void addSocket(ServerWebSocket socket) { SESSIONS.add(socket); socket.textMessageHandler((e) -> { @@ -150,34 +133,125 @@ private void purge() { private void route(JsonRpcRequest jsonRpcRequest, ServerWebSocket s) { String jsonRpcMethodName = jsonRpcRequest.getMethod(); - // First check some internal methods - if (jsonRpcMethodName.equalsIgnoreCase(UNSUBSCRIBE)) { - if (this.subscriptions.containsKey(jsonRpcRequest.getId())) { - Cancellable cancellable = this.subscriptions.remove(jsonRpcRequest.getId()); - cancellable.cancel(); - } - codec.writeResponse(s, jsonRpcRequest.getId(), null, MessageType.Void); + if (jsonRpcMethodName.equalsIgnoreCase(UNSUBSCRIBE)) {// First check some internal methods + this.routeUnsubscribe(jsonRpcRequest, s); } else if (this.jsonRpcToRuntimeClassPathJava.containsKey(jsonRpcMethodName)) { // Route to extension (runtime) - ReflectionInfo reflectionInfo = this.jsonRpcToRuntimeClassPathJava.get(jsonRpcMethodName); - Object target = Arc.container().select(reflectionInfo.bean).get(); + this.routeToRuntime(jsonRpcRequest, s); + } else if (this.jsonRpcMethodToDeploymentClassPathJava.contains(jsonRpcMethodName) + || this.jsonRpcSubscriptionToDeploymentClassPathJava.contains(jsonRpcMethodName)) { // Route to extension (deployment) + this.routeToDeployment(jsonRpcRequest, s); + } else { + // Method not found + codec.writeMethodNotFoundResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName); + } + } - if (reflectionInfo.isReturningMulti()) { - Multi multi; - try { - if (jsonRpcRequest.hasParams()) { - Object[] args = getArgsAsObjects(reflectionInfo.params, jsonRpcRequest); - multi = (Multi) reflectionInfo.method.invoke(target, args); + private void routeUnsubscribe(JsonRpcRequest jsonRpcRequest, ServerWebSocket s) { + if (this.subscriptions.containsKey(jsonRpcRequest.getId())) { + Cancellable cancellable = this.subscriptions.remove(jsonRpcRequest.getId()); + cancellable.cancel(); + } + codec.writeResponse(s, jsonRpcRequest.getId(), null, MessageType.Void); + } + + private void routeToRuntime(JsonRpcRequest jsonRpcRequest, ServerWebSocket s) { + String jsonRpcMethodName = jsonRpcRequest.getMethod(); + ReflectionInfo reflectionInfo = this.jsonRpcToRuntimeClassPathJava.get(jsonRpcMethodName); + Object target = Arc.container().select(reflectionInfo.bean).get(); + + if (reflectionInfo.isReturningMulti()) { + this.routeToRuntimeSubscription(jsonRpcRequest, s, jsonRpcMethodName, reflectionInfo, target); + } else { + // The invocation will return a Uni + this.routeToRuntimeMethod(jsonRpcRequest, s, jsonRpcMethodName, reflectionInfo, target); + } + } + + private void routeToRuntimeSubscription(JsonRpcRequest jsonRpcRequest, ServerWebSocket s, String jsonRpcMethodName, + ReflectionInfo reflectionInfo, Object target) { + Multi multi; + try { + if (jsonRpcRequest.hasParams()) { + Object[] args = getArgsAsObjects(reflectionInfo.params, jsonRpcRequest); + multi = (Multi) reflectionInfo.method.invoke(target, args); + } else { + multi = (Multi) reflectionInfo.method.invoke(target); + } + } catch (Exception e) { + logger.errorf(e, "Unable to invoke method %s using JSON-RPC, request was: %s", jsonRpcMethodName, + jsonRpcRequest); + codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, e); + return; + } + + Cancellable cancellable = multi.subscribe() + .with( + item -> { + codec.writeResponse(s, jsonRpcRequest.getId(), item, MessageType.SubscriptionMessage); + }, + failure -> { + codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, failure); + this.subscriptions.remove(jsonRpcRequest.getId()); + }, + () -> this.subscriptions.remove(jsonRpcRequest.getId())); + + this.subscriptions.put(jsonRpcRequest.getId(), cancellable); + codec.writeResponse(s, jsonRpcRequest.getId(), null, MessageType.Void); + } + + private void routeToRuntimeMethod(JsonRpcRequest jsonRpcRequest, ServerWebSocket s, String jsonRpcMethodName, + ReflectionInfo reflectionInfo, Object target) { + Uni uni; + try { + if (jsonRpcRequest.hasParams()) { + Object[] args = getArgsAsObjects(reflectionInfo.params, jsonRpcRequest); + uni = invoke(reflectionInfo, target, args); + } else { + uni = invoke(reflectionInfo, target, new Object[0]); + } + } catch (Exception e) { + logger.errorf(e, "Unable to invoke method %s using JSON-RPC, request was: %s", jsonRpcMethodName, + jsonRpcRequest); + codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, e); + return; + } + uni.subscribe() + .with(item -> { + if (item != null && JsonRpcMessage.class.isAssignableFrom(item.getClass())) { + JsonRpcMessage jsonRpcMessage = (JsonRpcMessage) item; + codec.writeResponse(s, jsonRpcRequest.getId(), jsonRpcMessage.getResponse(), + jsonRpcMessage.getMessageType()); } else { - multi = (Multi) reflectionInfo.method.invoke(target); + codec.writeResponse(s, jsonRpcRequest.getId(), item, + MessageType.Response); } - } catch (Exception e) { - logger.errorf(e, "Unable to invoke method %s using JSON-RPC, request was: %s", jsonRpcMethodName, - jsonRpcRequest); - codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, e); - return; - } + }, failure -> { + Throwable actualFailure; + // If the jsonrpc method is actually + // synchronous, the failure is wrapped in an + // InvocationTargetException, so unwrap it here + if (failure instanceof InvocationTargetException f) { + actualFailure = f.getTargetException(); + } else if (failure.getCause() != null + && failure.getCause() instanceof InvocationTargetException f) { + actualFailure = f.getTargetException(); + } else { + actualFailure = failure; + } + codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, actualFailure); + }); + } + + private void routeToDeployment(JsonRpcRequest jsonRpcRequest, ServerWebSocket s) { + String jsonRpcMethodName = jsonRpcRequest.getMethod(); + Object returnedObject = DevConsoleManager.invoke(jsonRpcMethodName, getArgsAsMap(jsonRpcRequest)); + if (returnedObject != null) { + // Support for Mutiny is diffcult because we are between the runtime and deployment classpath. + // Supporting something like CompletableFuture and Flow.Publisher that is in the JDK works fine + if (returnedObject instanceof Flow.Publisher) { + Flow.Publisher publisher = (Flow.Publisher) returnedObject; - Cancellable cancellable = multi.subscribe() + Cancellable cancellable = Multi.createFrom().publisher(publisher).subscribe() .with( item -> { codec.writeResponse(s, jsonRpcRequest.getId(), item, MessageType.SubscriptionMessage); @@ -190,55 +264,8 @@ private void route(JsonRpcRequest jsonRpcRequest, ServerWebSocket s) { this.subscriptions.put(jsonRpcRequest.getId(), cancellable); codec.writeResponse(s, jsonRpcRequest.getId(), null, MessageType.Void); - } else { - // The invocation will return a Uni - Uni uni; - try { - if (jsonRpcRequest.hasParams()) { - Object[] args = getArgsAsObjects(reflectionInfo.params, jsonRpcRequest); - uni = invoke(reflectionInfo, target, args); - } else { - uni = invoke(reflectionInfo, target, new Object[0]); - } - } catch (Exception e) { - logger.errorf(e, "Unable to invoke method %s using JSON-RPC, request was: %s", jsonRpcMethodName, - jsonRpcRequest); - codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, e); - return; - } - uni.subscribe() - .with(item -> { - if (item != null && JsonRpcMessage.class.isAssignableFrom(item.getClass())) { - JsonRpcMessage jsonRpcMessage = (JsonRpcMessage) item; - codec.writeResponse(s, jsonRpcRequest.getId(), jsonRpcMessage.getResponse(), - jsonRpcMessage.getMessageType()); - } else { - codec.writeResponse(s, jsonRpcRequest.getId(), item, - MessageType.Response); - } - }, failure -> { - Throwable actualFailure; - // If the jsonrpc method is actually - // synchronous, the failure is wrapped in an - // InvocationTargetException, so unwrap it here - if (failure instanceof InvocationTargetException f) { - actualFailure = f.getTargetException(); - } else if (failure.getCause() != null - && failure.getCause() instanceof InvocationTargetException f) { - actualFailure = f.getTargetException(); - } else { - actualFailure = failure; - } - codec.writeErrorResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName, actualFailure); - }); - } - } else if (this.jsonRpcToDeploymentClassPathJava.contains(jsonRpcMethodName)) { // Route to extension (deployment) - Object item = DevConsoleManager.invoke(jsonRpcMethodName, getArgsAsMap(jsonRpcRequest)); - - // Support for Mutiny is diffcult because we are between the runtime and deployment classpath. - // Supporting something like CompletableFuture that is in the JDK works fine - if (item instanceof CompletionStage) { - CompletionStage future = (CompletionStage) item; + } else if (returnedObject instanceof CompletionStage) { + CompletionStage future = (CompletionStage) returnedObject; future.thenAccept(r -> { codec.writeResponse(s, jsonRpcRequest.getId(), r, MessageType.Response); @@ -247,12 +274,31 @@ private void route(JsonRpcRequest jsonRpcRequest, ServerWebSocket s) { return null; }); } else { - codec.writeResponse(s, jsonRpcRequest.getId(), item, + codec.writeResponse(s, jsonRpcRequest.getId(), returnedObject, MessageType.Response); } + } + } + + private Uni invoke(ReflectionInfo info, Object target, Object[] args) { + if (info.isReturningUni()) { + try { + Uni uni = ((Uni) info.method.invoke(target, args)); + if (info.isExplicitlyBlocking()) { + return uni.runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } else { + return uni; + } + } catch (Exception e) { + return Uni.createFrom().failure(e); + } } else { - // Method not found - codec.writeMethodNotFoundResponse(s, jsonRpcRequest.getId(), jsonRpcMethodName); + Uni uni = Uni.createFrom().item(Unchecked.supplier(() -> info.method.invoke(target, args))); + if (!info.isExplicitlyNonBlocking()) { + return uni.runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } else { + return uni; + } } }