Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev mode - gRPC: close streams on reload for non-mutiny services #20579

Merged
merged 1 commit into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private Set<MethodInfo> gatherBlockingMethods(ClassInfo service) {
AnnotationsTransformerBuildItem transformUserDefinedServices(CombinedIndexBuildItem combinedIndexBuildItem,
CustomScopeAnnotationsBuildItem customScopes) {
// User-defined services usually only declare the @GrpcService qualifier
// We need to add @GrpcEnableRequestContext and @Singleton if needed
// We need to add @Singleton if needed
Set<DotName> userDefinedServices = new HashSet<>();
for (AnnotationInstance annotation : combinedIndexBuildItem.getIndex().getAnnotations(GrpcDotNames.GRPC_SERVICE)) {
if (annotation.target().kind() == Kind.CLASS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import javax.inject.Singleton;

import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget.Kind;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
Expand All @@ -28,9 +30,12 @@
import io.grpc.MethodDescriptor.PrototypeMarshaller;
import io.grpc.ServiceDescriptor;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanGizmoAdaptor;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.arc.processor.AnnotationsTransformer;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.arc.runtime.BeanLookupSupplier;
import io.quarkus.deployment.IsDevelopment;
import io.quarkus.deployment.annotations.BuildProducer;
Expand All @@ -50,21 +55,29 @@
import io.quarkus.grpc.deployment.DelegatingGrpcBeanBuildItem;
import io.quarkus.grpc.deployment.GrpcDotNames;
import io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator;
import io.quarkus.grpc.runtime.devmode.CollectStreams;
import io.quarkus.grpc.runtime.devmode.DelegatingGrpcBeansStorage;
import io.quarkus.grpc.runtime.devmode.GrpcDevConsoleRecorder;
import io.quarkus.grpc.runtime.devmode.GrpcServices;
import io.quarkus.grpc.runtime.devmode.StreamCollectorInterceptor;

public class GrpcDevConsoleProcessor {

@BuildStep(onlyIf = IsDevelopment.class)
public void devConsoleInfo(BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<DevConsoleRuntimeTemplateInfoBuildItem> infos) {
beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcServices.class));
public void devConsoleInfo(BuildProducer<DevConsoleRuntimeTemplateInfoBuildItem> infos) {
infos.produce(
new DevConsoleRuntimeTemplateInfoBuildItem("grpcServices",
new BeanLookupSupplier(GrpcServices.class)));
}

@BuildStep(onlyIf = IsDevelopment.class)
public AdditionalBeanBuildItem beans() {
return AdditionalBeanBuildItem.builder()
.addBeanClass(GrpcServices.class)
.addBeanClasses(StreamCollectorInterceptor.class, CollectStreams.class)
.build();
}

@BuildStep(onlyIf = IsDevelopment.class)
void prepareDelegatingBeanStorage(
List<DelegatingGrpcBeanBuildItem> delegatingBeans,
Expand Down Expand Up @@ -135,6 +148,54 @@ public DevConsoleRouteBuildItem createWebSocketEndpoint(GrpcDevConsoleRecorder r
return new DevConsoleRouteBuildItem("grpc-test", "GET", recorder.handler());
}

@BuildStep(onlyIf = IsDevelopment.class)
AnnotationsTransformerBuildItem transformUserDefinedServices(CombinedIndexBuildItem combinedIndexBuildItem) {
Set<DotName> servicesToTransform = new HashSet<>();
IndexView index = combinedIndexBuildItem.getIndex();
for (AnnotationInstance annotation : index.getAnnotations(GrpcDotNames.GRPC_SERVICE)) {
if (annotation.target().kind() == Kind.CLASS) {
ClassInfo serviceClass = annotation.target().asClass();
// Transform a service if it's using the grpc-java API directly:
// 1. Must not implement MutinyService
if (getRawTypesInHierarchy(serviceClass, index).contains(GrpcDotNames.MUTINY_SERVICE)) {
continue;
}
// 2. The enclosing class of an extended class that implements BindableService must not implement MutinyGrpc
ClassInfo abstractBindableService = findAbstractBindableService(serviceClass, index);
michalszynkiewicz marked this conversation as resolved.
Show resolved Hide resolved
if (abstractBindableService != null) {
ClassInfo enclosingClass = serviceClass.enclosingClass() != null
? index.getClassByName(serviceClass.enclosingClass())
: null;
if (enclosingClass != null
&& getRawTypesInHierarchy(enclosingClass, index).contains(GrpcDotNames.MUTINY_GRPC)) {
continue;
}
}
servicesToTransform.add(annotation.target().asClass().name());
}
}
if (servicesToTransform.isEmpty()) {
return null;
}
return new AnnotationsTransformerBuildItem(
new AnnotationsTransformer() {
@Override
public boolean appliesTo(Kind kind) {
return kind == Kind.CLASS;
}

@Override
public void transform(TransformationContext context) {
ClassInfo clazz = context.getTarget().asClass();
if (servicesToTransform.contains(clazz.name())) {
context.transform()
.add(CollectStreams.class)
.done();
}
}
});
}

Collection<Class<?>> getGrpcServices(IndexView index) throws ClassNotFoundException {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
Set<String> serviceClassNames = new HashSet<>();
Expand Down Expand Up @@ -163,4 +224,40 @@ Collection<Class<?>> getGrpcServices(IndexView index) throws ClassNotFoundExcept
return serviceClasses;
}

private Set<DotName> getRawTypesInHierarchy(ClassInfo clazz, IndexView index) {
Set<DotName> rawTypes = new HashSet<>();
addRawTypes(clazz, index, rawTypes);
return rawTypes;
}

private void addRawTypes(ClassInfo clazz, IndexView index, Set<DotName> rawTypes) {
rawTypes.add(clazz.name());
for (DotName interfaceName : clazz.interfaceNames()) {
rawTypes.add(interfaceName);
ClassInfo interfaceClazz = index.getClassByName(interfaceName);
if (interfaceClazz != null) {
addRawTypes(interfaceClazz, index, rawTypes);
}
}
if (clazz.superName() != null && !clazz.superName().equals(DotNames.OBJECT)) {
ClassInfo superClazz = index.getClassByName(clazz.superName());
if (superClazz != null) {
addRawTypes(superClazz, index, rawTypes);
}
}
}

private ClassInfo findAbstractBindableService(ClassInfo clazz, IndexView index) {
if (clazz.interfaceNames().contains(GrpcDotNames.BINDABLE_SERVICE)) {
return clazz;
}
if (clazz.superName() != null && !clazz.superName().equals(DotNames.OBJECT)) {
ClassInfo superClazz = index.getClassByName(clazz.superName());
if (superClazz != null) {
return findAbstractBindableService(superClazz, index);
}
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@
span.connected-status {
font-size: 0.5em;
margin-left: 1em;
}
.CodeMirror {
height: auto;
border: 1px solid #ddd;
}
{/style}
{#styleref}
<link rel="stylesheet" href="{devRootAppend}/resources/codemirror/lib/codemirror.css">
<link rel="stylesheet" href="{devRootAppend}/resources/codemirror/addon/hint/show-hint.css">
{/styleref}
{#script}
var grpcWS;
var requestId = 0;
Expand Down Expand Up @@ -143,8 +151,20 @@
openInIDE($(this).text());
});

// Use codemirror editors
document.querySelectorAll('.grpc-input')
.forEach(textArea => textArea.style.height = (textArea.scrollHeight + 5) + "px");
.forEach(function(textArea) {
const editor = CodeMirror.fromTextArea(textArea, {
mode: { name: "javascript", json: true },
styleActiveLine: true,
lineNumbers: true,
lineWrapping: true,
extraKeys: {"Ctrl-Space": "autocomplete"}
michalszynkiewicz marked this conversation as resolved.
Show resolved Hide resolved
});
editor.on("blur", function(codeMirror) { codeMirror.save(); });
editor.refresh();
});

connect();
});

Expand Down Expand Up @@ -189,8 +209,16 @@
showConnected(connection);
}
}

{/script}

{#scriptref}
<script src="{devRootAppend}/resources/codemirror/lib/codemirror.js"></script>
<script src="{devRootAppend}/resources/codemirror/mode/javascript/javascript.js"></script>
<script src="{devRootAppend}/resources/codemirror/addon/hint/show-hint.js"></script>
<script src="{devRootAppend}/resources/codemirror/addon/selection/active-line.js"></script>
{/scriptref}

{#breadcrumbs}<i class="fas fa-chevron-right fa-sm breadcrumb-separator"></i> <a href="/q/dev/io.quarkus.quarkus-grpc/services">Services</a>{/breadcrumbs}
{#title}{info:grpcServices.get(currentRequest.params.get('name')).name}{/title}
{#body}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.grpc.runtime.devmode;

import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import javax.interceptor.InterceptorBinding;

/**
*
* @see StreamCollectorInterceptor
*/
@InterceptorBinding
@Target({ TYPE })
@Retention(RUNTIME)
public @interface CollectStreams {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerMethodDefinition;
import io.quarkus.arc.Subclass;
import io.quarkus.arc.Unremovable;
import io.quarkus.dev.console.DevConsoleManager;
import io.quarkus.grpc.runtime.GrpcServerRecorder;
import io.quarkus.grpc.runtime.GrpcServerRecorder.GrpcServiceDefinition;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.devmode.GrpcServices.ServiceDefinitionAndStatus;
import io.quarkus.grpc.runtime.health.GrpcHealthStorage;

@Unremovable
@Singleton
public class GrpcServices extends AbstractMap<String, ServiceDefinitionAndStatus> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.quarkus.grpc.runtime.devmode;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

import io.grpc.stub.StreamObserver;
import io.quarkus.grpc.runtime.ServerCalls;
import io.quarkus.grpc.runtime.StreamCollector;

@CollectStreams
@Priority(1)
@Interceptor
public class StreamCollectorInterceptor {

private final StreamCollector streamCollector;

public StreamCollectorInterceptor() {
this.streamCollector = ServerCalls.getStreamCollector();
}

@SuppressWarnings("unchecked")
@AroundInvoke
Object collect(InvocationContext context) throws Exception {
// Wraps the first StreamObserver parameter if available
Object[] params = context.getParameters();
int streamIndex = 0;
StreamObserver<Object> stream = null;
for (int i = 0; i < params.length; i++) {
Object param = params[i];
if (param == null) {
continue;
}
if (StreamObserver.class.isAssignableFrom(param.getClass())) {
stream = (StreamObserver<Object>) param;
streamIndex = i;
break;
}
}
if (stream == null) {
return context.proceed();
}
streamCollector.add(stream);
Object[] newParams = new Object[params.length];
for (int i = 0; i < params.length; i++) {
if (i == streamIndex) {
newParams[i] = new StreamObserverWrapper<>(stream);
} else {
newParams[i] = params[i];
}
}
context.setParameters(newParams);
return context.proceed();
}

private final class StreamObserverWrapper<T> implements StreamObserver<T> {

private final StreamObserver<T> delegate;

public StreamObserverWrapper(StreamObserver<T> delegate) {
this.delegate = delegate;
}

@Override
public void onNext(T value) {
delegate.onNext(value);
}

@Override
public void onError(Throwable t) {
delegate.onError(t);
streamCollector.remove(delegate);
}

@Override
public void onCompleted() {
delegate.onCompleted();
streamCollector.remove(delegate);
}

}

}