Skip to content

Commit

Permalink
Merge pull request #20579 from mkouba/issue-17428
Browse files Browse the repository at this point in the history
Dev mode - gRPC: close streams on reload for non-mutiny services
  • Loading branch information
michalszynkiewicz authored Oct 7, 2021
2 parents 8f312f5 + 377dc28 commit b131a12
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private Set<MethodInfo> gatherBlockingMethods(ClassInfo service) {
AnnotationsTransformerBuildItem transformUserDefinedServices(CombinedIndexBuildItem combinedIndexBuildItem,
CustomScopeAnnotationsBuildItem customScopes) {
// User-defined services usually only declare the @GrpcService qualifier
// We need to add @GrpcEnableRequestContext and @Singleton if needed
// We need to add @Singleton if needed
Set<DotName> userDefinedServices = new HashSet<>();
for (AnnotationInstance annotation : combinedIndexBuildItem.getIndex().getAnnotations(GrpcDotNames.GRPC_SERVICE)) {
if (annotation.target().kind() == Kind.CLASS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import javax.inject.Singleton;

import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget.Kind;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
Expand All @@ -28,9 +30,12 @@
import io.grpc.MethodDescriptor.PrototypeMarshaller;
import io.grpc.ServiceDescriptor;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanGizmoAdaptor;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.arc.processor.AnnotationsTransformer;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.arc.runtime.BeanLookupSupplier;
import io.quarkus.deployment.IsDevelopment;
import io.quarkus.deployment.annotations.BuildProducer;
Expand All @@ -50,21 +55,29 @@
import io.quarkus.grpc.deployment.DelegatingGrpcBeanBuildItem;
import io.quarkus.grpc.deployment.GrpcDotNames;
import io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator;
import io.quarkus.grpc.runtime.devmode.CollectStreams;
import io.quarkus.grpc.runtime.devmode.DelegatingGrpcBeansStorage;
import io.quarkus.grpc.runtime.devmode.GrpcDevConsoleRecorder;
import io.quarkus.grpc.runtime.devmode.GrpcServices;
import io.quarkus.grpc.runtime.devmode.StreamCollectorInterceptor;

public class GrpcDevConsoleProcessor {

@BuildStep(onlyIf = IsDevelopment.class)
public void devConsoleInfo(BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<DevConsoleRuntimeTemplateInfoBuildItem> infos) {
beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcServices.class));
public void devConsoleInfo(BuildProducer<DevConsoleRuntimeTemplateInfoBuildItem> infos) {
infos.produce(
new DevConsoleRuntimeTemplateInfoBuildItem("grpcServices",
new BeanLookupSupplier(GrpcServices.class)));
}

@BuildStep(onlyIf = IsDevelopment.class)
public AdditionalBeanBuildItem beans() {
return AdditionalBeanBuildItem.builder()
.addBeanClass(GrpcServices.class)
.addBeanClasses(StreamCollectorInterceptor.class, CollectStreams.class)
.build();
}

@BuildStep(onlyIf = IsDevelopment.class)
void prepareDelegatingBeanStorage(
List<DelegatingGrpcBeanBuildItem> delegatingBeans,
Expand Down Expand Up @@ -135,6 +148,54 @@ public DevConsoleRouteBuildItem createWebSocketEndpoint(GrpcDevConsoleRecorder r
return new DevConsoleRouteBuildItem("grpc-test", "GET", recorder.handler());
}

@BuildStep(onlyIf = IsDevelopment.class)
AnnotationsTransformerBuildItem transformUserDefinedServices(CombinedIndexBuildItem combinedIndexBuildItem) {
Set<DotName> servicesToTransform = new HashSet<>();
IndexView index = combinedIndexBuildItem.getIndex();
for (AnnotationInstance annotation : index.getAnnotations(GrpcDotNames.GRPC_SERVICE)) {
if (annotation.target().kind() == Kind.CLASS) {
ClassInfo serviceClass = annotation.target().asClass();
// Transform a service if it's using the grpc-java API directly:
// 1. Must not implement MutinyService
if (getRawTypesInHierarchy(serviceClass, index).contains(GrpcDotNames.MUTINY_SERVICE)) {
continue;
}
// 2. The enclosing class of an extended class that implements BindableService must not implement MutinyGrpc
ClassInfo abstractBindableService = findAbstractBindableService(serviceClass, index);
if (abstractBindableService != null) {
ClassInfo enclosingClass = serviceClass.enclosingClass() != null
? index.getClassByName(serviceClass.enclosingClass())
: null;
if (enclosingClass != null
&& getRawTypesInHierarchy(enclosingClass, index).contains(GrpcDotNames.MUTINY_GRPC)) {
continue;
}
}
servicesToTransform.add(annotation.target().asClass().name());
}
}
if (servicesToTransform.isEmpty()) {
return null;
}
return new AnnotationsTransformerBuildItem(
new AnnotationsTransformer() {
@Override
public boolean appliesTo(Kind kind) {
return kind == Kind.CLASS;
}

@Override
public void transform(TransformationContext context) {
ClassInfo clazz = context.getTarget().asClass();
if (servicesToTransform.contains(clazz.name())) {
context.transform()
.add(CollectStreams.class)
.done();
}
}
});
}

Collection<Class<?>> getGrpcServices(IndexView index) throws ClassNotFoundException {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
Set<String> serviceClassNames = new HashSet<>();
Expand Down Expand Up @@ -163,4 +224,40 @@ Collection<Class<?>> getGrpcServices(IndexView index) throws ClassNotFoundExcept
return serviceClasses;
}

private Set<DotName> getRawTypesInHierarchy(ClassInfo clazz, IndexView index) {
Set<DotName> rawTypes = new HashSet<>();
addRawTypes(clazz, index, rawTypes);
return rawTypes;
}

private void addRawTypes(ClassInfo clazz, IndexView index, Set<DotName> rawTypes) {
rawTypes.add(clazz.name());
for (DotName interfaceName : clazz.interfaceNames()) {
rawTypes.add(interfaceName);
ClassInfo interfaceClazz = index.getClassByName(interfaceName);
if (interfaceClazz != null) {
addRawTypes(interfaceClazz, index, rawTypes);
}
}
if (clazz.superName() != null && !clazz.superName().equals(DotNames.OBJECT)) {
ClassInfo superClazz = index.getClassByName(clazz.superName());
if (superClazz != null) {
addRawTypes(superClazz, index, rawTypes);
}
}
}

private ClassInfo findAbstractBindableService(ClassInfo clazz, IndexView index) {
if (clazz.interfaceNames().contains(GrpcDotNames.BINDABLE_SERVICE)) {
return clazz;
}
if (clazz.superName() != null && !clazz.superName().equals(DotNames.OBJECT)) {
ClassInfo superClazz = index.getClassByName(clazz.superName());
if (superClazz != null) {
return findAbstractBindableService(superClazz, index);
}
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@
span.connected-status {
font-size: 0.5em;
margin-left: 1em;
}
.CodeMirror {
height: auto;
border: 1px solid #ddd;
}
{/style}
{#styleref}
<link rel="stylesheet" href="{devRootAppend}/resources/codemirror/lib/codemirror.css">
<link rel="stylesheet" href="{devRootAppend}/resources/codemirror/addon/hint/show-hint.css">
{/styleref}
{#script}
var grpcWS;
var requestId = 0;
Expand Down Expand Up @@ -143,8 +151,20 @@
openInIDE($(this).text());
});

// Use codemirror editors
document.querySelectorAll('.grpc-input')
.forEach(textArea => textArea.style.height = (textArea.scrollHeight + 5) + "px");
.forEach(function(textArea) {
const editor = CodeMirror.fromTextArea(textArea, {
mode: { name: "javascript", json: true },
styleActiveLine: true,
lineNumbers: true,
lineWrapping: true,
extraKeys: {"Ctrl-Space": "autocomplete"}
});
editor.on("blur", function(codeMirror) { codeMirror.save(); });
editor.refresh();
});

connect();
});

Expand Down Expand Up @@ -189,8 +209,16 @@
showConnected(connection);
}
}

{/script}

{#scriptref}
<script src="{devRootAppend}/resources/codemirror/lib/codemirror.js"></script>
<script src="{devRootAppend}/resources/codemirror/mode/javascript/javascript.js"></script>
<script src="{devRootAppend}/resources/codemirror/addon/hint/show-hint.js"></script>
<script src="{devRootAppend}/resources/codemirror/addon/selection/active-line.js"></script>
{/scriptref}

{#breadcrumbs}<i class="fas fa-chevron-right fa-sm breadcrumb-separator"></i> <a href="/q/dev/io.quarkus.quarkus-grpc/services">Services</a>{/breadcrumbs}
{#title}{info:grpcServices.get(currentRequest.params.get('name')).name}{/title}
{#body}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.grpc.runtime.devmode;

import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import javax.interceptor.InterceptorBinding;

/**
*
* @see StreamCollectorInterceptor
*/
@InterceptorBinding
@Target({ TYPE })
@Retention(RUNTIME)
public @interface CollectStreams {

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerMethodDefinition;
import io.quarkus.arc.Subclass;
import io.quarkus.arc.Unremovable;
import io.quarkus.dev.console.DevConsoleManager;
import io.quarkus.grpc.runtime.GrpcServerRecorder;
import io.quarkus.grpc.runtime.GrpcServerRecorder.GrpcServiceDefinition;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.devmode.GrpcServices.ServiceDefinitionAndStatus;
import io.quarkus.grpc.runtime.health.GrpcHealthStorage;

@Unremovable
@Singleton
public class GrpcServices extends AbstractMap<String, ServiceDefinitionAndStatus> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.quarkus.grpc.runtime.devmode;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

import io.grpc.stub.StreamObserver;
import io.quarkus.grpc.runtime.ServerCalls;
import io.quarkus.grpc.runtime.StreamCollector;

@CollectStreams
@Priority(1)
@Interceptor
public class StreamCollectorInterceptor {

private final StreamCollector streamCollector;

public StreamCollectorInterceptor() {
this.streamCollector = ServerCalls.getStreamCollector();
}

@SuppressWarnings("unchecked")
@AroundInvoke
Object collect(InvocationContext context) throws Exception {
// Wraps the first StreamObserver parameter if available
Object[] params = context.getParameters();
int streamIndex = 0;
StreamObserver<Object> stream = null;
for (int i = 0; i < params.length; i++) {
Object param = params[i];
if (param == null) {
continue;
}
if (StreamObserver.class.isAssignableFrom(param.getClass())) {
stream = (StreamObserver<Object>) param;
streamIndex = i;
break;
}
}
if (stream == null) {
return context.proceed();
}
streamCollector.add(stream);
Object[] newParams = new Object[params.length];
for (int i = 0; i < params.length; i++) {
if (i == streamIndex) {
newParams[i] = new StreamObserverWrapper<>(stream);
} else {
newParams[i] = params[i];
}
}
context.setParameters(newParams);
return context.proceed();
}

private final class StreamObserverWrapper<T> implements StreamObserver<T> {

private final StreamObserver<T> delegate;

public StreamObserverWrapper(StreamObserver<T> delegate) {
this.delegate = delegate;
}

@Override
public void onNext(T value) {
delegate.onNext(value);
}

@Override
public void onError(Throwable t) {
delegate.onError(t);
streamCollector.remove(delegate);
}

@Override
public void onCompleted() {
delegate.onCompleted();
streamCollector.remove(delegate);
}

}

}

0 comments on commit b131a12

Please sign in to comment.