From 377dc28565eecaf45fb98f0937a59c09196de1cc Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 7 Oct 2021 08:15:29 +0200 Subject: [PATCH] Dev mode - gRPC: close streams on reload for non-mutiny services - resolves #17428 - also use codemirror editor in Dev UI --- .../grpc/deployment/GrpcServerProcessor.java | 2 +- .../devmode/GrpcDevConsoleProcessor.java | 103 +++++++++++++++++- .../main/resources/dev-templates/service.html | 32 +++++- .../grpc/runtime/devmode/CollectStreams.java | 20 ++++ .../grpc/runtime/devmode/GrpcServices.java | 2 + .../devmode/StreamCollectorInterceptor.java | 84 ++++++++++++++ 6 files changed, 237 insertions(+), 6 deletions(-) create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/CollectStreams.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/StreamCollectorInterceptor.java diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java index 80d734ff7aa81..527b74171fdb1 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java @@ -261,7 +261,7 @@ private Set gatherBlockingMethods(ClassInfo service) { AnnotationsTransformerBuildItem transformUserDefinedServices(CombinedIndexBuildItem combinedIndexBuildItem, CustomScopeAnnotationsBuildItem customScopes) { // User-defined services usually only declare the @GrpcService qualifier - // We need to add @GrpcEnableRequestContext and @Singleton if needed + // We need to add @Singleton if needed Set userDefinedServices = new HashSet<>(); for (AnnotationInstance annotation : combinedIndexBuildItem.getIndex().getAnnotations(GrpcDotNames.GRPC_SERVICE)) { if (annotation.target().kind() == Kind.CLASS) { diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java index 343f45d0d68ef..e0fe378ab8f5c 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java @@ -13,6 +13,8 @@ import javax.inject.Singleton; +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.AnnotationTarget.Kind; import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; import org.jboss.jandex.IndexView; @@ -28,9 +30,12 @@ import io.grpc.MethodDescriptor.PrototypeMarshaller; import io.grpc.ServiceDescriptor; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem; import io.quarkus.arc.deployment.GeneratedBeanBuildItem; import io.quarkus.arc.deployment.GeneratedBeanGizmoAdaptor; import io.quarkus.arc.deployment.UnremovableBeanBuildItem; +import io.quarkus.arc.processor.AnnotationsTransformer; +import io.quarkus.arc.processor.DotNames; import io.quarkus.arc.runtime.BeanLookupSupplier; import io.quarkus.deployment.IsDevelopment; import io.quarkus.deployment.annotations.BuildProducer; @@ -50,21 +55,29 @@ import io.quarkus.grpc.deployment.DelegatingGrpcBeanBuildItem; import io.quarkus.grpc.deployment.GrpcDotNames; import io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator; +import io.quarkus.grpc.runtime.devmode.CollectStreams; import io.quarkus.grpc.runtime.devmode.DelegatingGrpcBeansStorage; import io.quarkus.grpc.runtime.devmode.GrpcDevConsoleRecorder; import io.quarkus.grpc.runtime.devmode.GrpcServices; +import io.quarkus.grpc.runtime.devmode.StreamCollectorInterceptor; public class GrpcDevConsoleProcessor { @BuildStep(onlyIf = IsDevelopment.class) - public void devConsoleInfo(BuildProducer beans, - BuildProducer infos) { - beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcServices.class)); + public void devConsoleInfo(BuildProducer infos) { infos.produce( new DevConsoleRuntimeTemplateInfoBuildItem("grpcServices", new BeanLookupSupplier(GrpcServices.class))); } + @BuildStep(onlyIf = IsDevelopment.class) + public AdditionalBeanBuildItem beans() { + return AdditionalBeanBuildItem.builder() + .addBeanClass(GrpcServices.class) + .addBeanClasses(StreamCollectorInterceptor.class, CollectStreams.class) + .build(); + } + @BuildStep(onlyIf = IsDevelopment.class) void prepareDelegatingBeanStorage( List delegatingBeans, @@ -135,6 +148,54 @@ public DevConsoleRouteBuildItem createWebSocketEndpoint(GrpcDevConsoleRecorder r return new DevConsoleRouteBuildItem("grpc-test", "GET", recorder.handler()); } + @BuildStep(onlyIf = IsDevelopment.class) + AnnotationsTransformerBuildItem transformUserDefinedServices(CombinedIndexBuildItem combinedIndexBuildItem) { + Set servicesToTransform = new HashSet<>(); + IndexView index = combinedIndexBuildItem.getIndex(); + for (AnnotationInstance annotation : index.getAnnotations(GrpcDotNames.GRPC_SERVICE)) { + if (annotation.target().kind() == Kind.CLASS) { + ClassInfo serviceClass = annotation.target().asClass(); + // Transform a service if it's using the grpc-java API directly: + // 1. Must not implement MutinyService + if (getRawTypesInHierarchy(serviceClass, index).contains(GrpcDotNames.MUTINY_SERVICE)) { + continue; + } + // 2. The enclosing class of an extended class that implements BindableService must not implement MutinyGrpc + ClassInfo abstractBindableService = findAbstractBindableService(serviceClass, index); + if (abstractBindableService != null) { + ClassInfo enclosingClass = serviceClass.enclosingClass() != null + ? index.getClassByName(serviceClass.enclosingClass()) + : null; + if (enclosingClass != null + && getRawTypesInHierarchy(enclosingClass, index).contains(GrpcDotNames.MUTINY_GRPC)) { + continue; + } + } + servicesToTransform.add(annotation.target().asClass().name()); + } + } + if (servicesToTransform.isEmpty()) { + return null; + } + return new AnnotationsTransformerBuildItem( + new AnnotationsTransformer() { + @Override + public boolean appliesTo(Kind kind) { + return kind == Kind.CLASS; + } + + @Override + public void transform(TransformationContext context) { + ClassInfo clazz = context.getTarget().asClass(); + if (servicesToTransform.contains(clazz.name())) { + context.transform() + .add(CollectStreams.class) + .done(); + } + } + }); + } + Collection> getGrpcServices(IndexView index) throws ClassNotFoundException { ClassLoader tccl = Thread.currentThread().getContextClassLoader(); Set serviceClassNames = new HashSet<>(); @@ -163,4 +224,40 @@ Collection> getGrpcServices(IndexView index) throws ClassNotFoundExcept return serviceClasses; } + private Set getRawTypesInHierarchy(ClassInfo clazz, IndexView index) { + Set rawTypes = new HashSet<>(); + addRawTypes(clazz, index, rawTypes); + return rawTypes; + } + + private void addRawTypes(ClassInfo clazz, IndexView index, Set rawTypes) { + rawTypes.add(clazz.name()); + for (DotName interfaceName : clazz.interfaceNames()) { + rawTypes.add(interfaceName); + ClassInfo interfaceClazz = index.getClassByName(interfaceName); + if (interfaceClazz != null) { + addRawTypes(interfaceClazz, index, rawTypes); + } + } + if (clazz.superName() != null && !clazz.superName().equals(DotNames.OBJECT)) { + ClassInfo superClazz = index.getClassByName(clazz.superName()); + if (superClazz != null) { + addRawTypes(superClazz, index, rawTypes); + } + } + } + + private ClassInfo findAbstractBindableService(ClassInfo clazz, IndexView index) { + if (clazz.interfaceNames().contains(GrpcDotNames.BINDABLE_SERVICE)) { + return clazz; + } + if (clazz.superName() != null && !clazz.superName().equals(DotNames.OBJECT)) { + ClassInfo superClazz = index.getClassByName(clazz.superName()); + if (superClazz != null) { + return findAbstractBindableService(superClazz, index); + } + } + return null; + } + } diff --git a/extensions/grpc/deployment/src/main/resources/dev-templates/service.html b/extensions/grpc/deployment/src/main/resources/dev-templates/service.html index f339400225c43..18e3aaf5155e4 100644 --- a/extensions/grpc/deployment/src/main/resources/dev-templates/service.html +++ b/extensions/grpc/deployment/src/main/resources/dev-templates/service.html @@ -33,8 +33,16 @@ span.connected-status { font-size: 0.5em; margin-left: 1em; +} +.CodeMirror { + height: auto; + border: 1px solid #ddd; } {/style} + {#styleref} + + + {/styleref} {#script} var grpcWS; var requestId = 0; @@ -143,8 +151,20 @@ openInIDE($(this).text()); }); + // Use codemirror editors document.querySelectorAll('.grpc-input') - .forEach(textArea => textArea.style.height = (textArea.scrollHeight + 5) + "px"); + .forEach(function(textArea) { + const editor = CodeMirror.fromTextArea(textArea, { + mode: { name: "javascript", json: true }, + styleActiveLine: true, + lineNumbers: true, + lineWrapping: true, + extraKeys: {"Ctrl-Space": "autocomplete"} + }); + editor.on("blur", function(codeMirror) { codeMirror.save(); }); + editor.refresh(); + }); + connect(); }); @@ -189,8 +209,16 @@ showConnected(connection); } } - + {/script} + + {#scriptref} + + + + + {/scriptref} + {#breadcrumbs} Services{/breadcrumbs} {#title}{info:grpcServices.get(currentRequest.params.get('name')).name}{/title} {#body} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/CollectStreams.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/CollectStreams.java new file mode 100644 index 0000000000000..f4aebfe2910ae --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/CollectStreams.java @@ -0,0 +1,20 @@ +package io.quarkus.grpc.runtime.devmode; + +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import javax.interceptor.InterceptorBinding; + +/** + * + * @see StreamCollectorInterceptor + */ +@InterceptorBinding +@Target({ TYPE }) +@Retention(RUNTIME) +public @interface CollectStreams { + +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java index dd075a6283408..a389c25d53061 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java @@ -15,6 +15,7 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.ServerMethodDefinition; import io.quarkus.arc.Subclass; +import io.quarkus.arc.Unremovable; import io.quarkus.dev.console.DevConsoleManager; import io.quarkus.grpc.runtime.GrpcServerRecorder; import io.quarkus.grpc.runtime.GrpcServerRecorder.GrpcServiceDefinition; @@ -22,6 +23,7 @@ import io.quarkus.grpc.runtime.devmode.GrpcServices.ServiceDefinitionAndStatus; import io.quarkus.grpc.runtime.health.GrpcHealthStorage; +@Unremovable @Singleton public class GrpcServices extends AbstractMap { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/StreamCollectorInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/StreamCollectorInterceptor.java new file mode 100644 index 0000000000000..95281821719c1 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/StreamCollectorInterceptor.java @@ -0,0 +1,84 @@ +package io.quarkus.grpc.runtime.devmode; + +import javax.annotation.Priority; +import javax.interceptor.AroundInvoke; +import javax.interceptor.Interceptor; +import javax.interceptor.InvocationContext; + +import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.runtime.ServerCalls; +import io.quarkus.grpc.runtime.StreamCollector; + +@CollectStreams +@Priority(1) +@Interceptor +public class StreamCollectorInterceptor { + + private final StreamCollector streamCollector; + + public StreamCollectorInterceptor() { + this.streamCollector = ServerCalls.getStreamCollector(); + } + + @SuppressWarnings("unchecked") + @AroundInvoke + Object collect(InvocationContext context) throws Exception { + // Wraps the first StreamObserver parameter if available + Object[] params = context.getParameters(); + int streamIndex = 0; + StreamObserver stream = null; + for (int i = 0; i < params.length; i++) { + Object param = params[i]; + if (param == null) { + continue; + } + if (StreamObserver.class.isAssignableFrom(param.getClass())) { + stream = (StreamObserver) param; + streamIndex = i; + break; + } + } + if (stream == null) { + return context.proceed(); + } + streamCollector.add(stream); + Object[] newParams = new Object[params.length]; + for (int i = 0; i < params.length; i++) { + if (i == streamIndex) { + newParams[i] = new StreamObserverWrapper<>(stream); + } else { + newParams[i] = params[i]; + } + } + context.setParameters(newParams); + return context.proceed(); + } + + private final class StreamObserverWrapper implements StreamObserver { + + private final StreamObserver delegate; + + public StreamObserverWrapper(StreamObserver delegate) { + this.delegate = delegate; + } + + @Override + public void onNext(T value) { + delegate.onNext(value); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + streamCollector.remove(delegate); + } + + @Override + public void onCompleted() { + delegate.onCompleted(); + streamCollector.remove(delegate); + } + + } + +}