diff --git a/docs/src/main/asciidoc/grpc-service-implementation.adoc b/docs/src/main/asciidoc/grpc-service-implementation.adoc index 9427774c9ee222..8327a7c12ea0d9 100644 --- a/docs/src/main/asciidoc/grpc-service-implementation.adoc +++ b/docs/src/main/asciidoc/grpc-service-implementation.adoc @@ -37,11 +37,16 @@ The second implementation base is: `MutinyGreeterGrpc.GreeterImplBase`. Note that these classes are not interfaces but regular classes. When extending them, you need to override the service methods defined in the service definition. +gRPC service implementations have to be annotated with `@GrpcService` annotation. + +NOTE: gRPC services have to be CDI beans with `Singleton` scope. The `@GrpcService` annotation makes sure of that +and, additionally, makes sure that `RequestContext`, needed e.g. to make database calls, is enabled in your service. + == Implementing a service with the default gRPC API To implement a gRPC service using the default gRPC API, create a class extending the default implementation base. Then, override the methods defined in the service interface. -Finally, implement the service as a CDI bean using the `@GrpcService` annotation: +Finally, implement the service and add the `@GrpcService` annotation: [source, java] ---- @@ -65,7 +70,7 @@ public class HelloService extends GreeterGrpc.GreeterImplBase { To implement a gRPC service using the Mutiny gRPC API, create a class extending the Mutiny implementation base. Then, override the methods defined in the service interface. These methods are using Mutiny types. -Finally, implement the service as a CDI bean using the `@Singleton` annotation: +Finally, implement the service and add the `@GrpcService` annotation: [source, java] ---- diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java index 158adc5dac6338..adb4052c0288b5 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java @@ -1,6 +1,7 @@ package io.quarkus.grpc.deployment; import static io.quarkus.deployment.Feature.GRPC_SERVER; +import static io.quarkus.grpc.deployment.GrpcDotNames.GRPC_SERVICE; import static java.util.Arrays.asList; import java.lang.reflect.Modifier; @@ -18,8 +19,8 @@ import io.grpc.BindableService; import io.grpc.internal.ServerImpl; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem; import io.quarkus.arc.deployment.ValidationPhaseBuildItem; -import io.quarkus.arc.deployment.ValidationPhaseBuildItem.ValidationErrorBuildItem; import io.quarkus.arc.processor.BeanInfo; import io.quarkus.arc.processor.BuiltinScope; import io.quarkus.deployment.IsDevelopment; @@ -44,6 +45,8 @@ import io.quarkus.grpc.runtime.config.GrpcServerBuildTimeConfig; import io.quarkus.grpc.runtime.health.GrpcHealthEndpoint; import io.quarkus.grpc.runtime.health.GrpcHealthStorage; +import io.quarkus.grpc.runtime.supports.context.GrpcEnableRequestContext; +import io.quarkus.grpc.runtime.supports.context.GrpcRequestContextCdiInterceptor; import io.quarkus.kubernetes.spi.KubernetesPortBuildItem; import io.quarkus.netty.deployment.MinNettyAllocatorMaxOrderBuildItem; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; @@ -66,9 +69,11 @@ MinNettyAllocatorMaxOrderBuildItem setMinimalNettyMaxOrderSize() { @BuildStep void discoverBindableServices(BuildProducer bindables, - CombinedIndexBuildItem combinedIndexBuildItem) { + CombinedIndexBuildItem combinedIndexBuildItem, + BuildProducer annotationTransformers) { Collection bindableServices = combinedIndexBuildItem.getIndex() .getAllKnownImplementors(GrpcDotNames.BINDABLE_SERVICE); + for (ClassInfo service : bindableServices) { if (Modifier.isAbstract(service.flags())) { continue; @@ -85,10 +90,16 @@ void discoverBindableServices(BuildProducer bindables, @BuildStep void validateBindableServices(ValidationPhaseBuildItem validationPhase, - BuildProducer errors) { + BuildProducer errors) { for (BeanInfo bean : validationPhase.getContext().beans().classBeans().withBeanType(BindableService.class)) { + //noinspection OptionalGetWithoutIsPresent + if (bean.getTarget().get().asClass().classAnnotation(GRPC_SERVICE) == null) { + errors.produce(new ValidationPhaseBuildItem.ValidationErrorBuildItem( + new IllegalStateException( + "A gRPC service bean must be annotated with io.quarkus.GrpcService: " + bean))); + } if (!bean.getScope().getDotName().equals(BuiltinScope.SINGLETON.getName())) { - errors.produce(new ValidationErrorBuildItem( + errors.produce(new ValidationPhaseBuildItem.ValidationErrorBuildItem( new IllegalStateException("A gRPC service bean must have the javax.inject.Singleton scope: " + bean))); } } @@ -109,6 +120,8 @@ void registerBeans(BuildProducer beans, List bindables, BuildProducer features) { // @GrpcService is a CDI stereotype beans.produce(new AdditionalBeanBuildItem(GrpcService.class)); + beans.produce(new AdditionalBeanBuildItem(GrpcRequestContextCdiInterceptor.class)); + beans.produce(new AdditionalBeanBuildItem(GrpcEnableRequestContext.class)); if (!bindables.isEmpty()) { beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcContainer.class)); features.produce(new FeatureBuildItem(GRPC_SERVER)); diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/tls/HelloWorldTlsService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/tls/HelloWorldTlsService.java index a912a1dba40c23..ab4a5971704e13 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/tls/HelloWorldTlsService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/tls/HelloWorldTlsService.java @@ -1,13 +1,12 @@ package io.quarkus.grpc.client.tls; -import javax.inject.Singleton; - import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.examples.helloworld.MutinyGreeterGrpc; +import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Uni; -@Singleton +@GrpcService public class HelloWorldTlsService extends MutinyGreeterGrpc.GreeterImplBase { @Override diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java index df644d8375466f..013480feff26de 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java @@ -6,8 +6,6 @@ import java.util.Arrays; import java.util.List; -import javax.inject.Singleton; - import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.AfterEach; @@ -34,6 +32,7 @@ import io.grpc.reflection.v1alpha.ServerReflectionResponse; import io.grpc.reflection.v1alpha.ServiceResponse; import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.GrpcService; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -279,7 +278,7 @@ public void cancel() { } } - @Singleton + @GrpcService public static class MyReflectionService extends MutinyReflectableServiceGrpc.ReflectableServiceImplBase { @Override public Uni method(Request request) { diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcServerTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcServerTest.java index a6fd11bea3196e..1d5a8a5d423e05 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcServerTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcServerTest.java @@ -7,7 +7,6 @@ import javax.enterprise.inject.Any; import javax.enterprise.inject.Instance; import javax.inject.Inject; -import javax.inject.Singleton; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; @@ -16,6 +15,7 @@ import io.grpc.BindableService; import io.grpc.ServerServiceDefinition; +import io.quarkus.grpc.GrpcService; import io.quarkus.grpc.runtime.GrpcServerRecorder; import io.quarkus.test.QuarkusUnitTest; @@ -39,7 +39,7 @@ public void test() { assertThat(GrpcServerRecorder.getVerticleCount()).isGreaterThan(0); } - @Singleton + @GrpcService static class MyFakeService implements BindableService { @Override @@ -48,7 +48,7 @@ public ServerServiceDefinition bindService() { } } - @Singleton + @GrpcService static class MySecondFakeService implements BindableService { @Override diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestService.java index 939b774c07e7ed..9e1c701fd24554 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestService.java @@ -1,14 +1,16 @@ package io.quarkus.grpc.server.devmode; -import javax.inject.Singleton; +import javax.enterprise.context.RequestScoped; import devmodetest.v1.Devmodetest; import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.stub.StreamObserver; +import io.quarkus.arc.Arc; +import io.quarkus.grpc.GrpcService; -@Singleton +@GrpcService public class DevModeTestService extends GreeterGrpc.GreeterImplBase { @Override @@ -20,7 +22,11 @@ public void sayHello(HelloRequest request, StreamObserver responseOb } else { response = greeting + request.getName(); } - responseObserver.onNext(HelloReply.newBuilder().setMessage(response).build()); + if (Arc.container().getActiveContext(RequestScoped.class) != null) { + responseObserver.onNext(HelloReply.newBuilder().setMessage(response).build()); + } else { + throw new IllegalStateException("request context not active, failing"); + } responseObserver.onCompleted(); } } \ No newline at end of file diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestStreamService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestStreamService.java index ff662b8fd513af..694f0f51652332 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestStreamService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestStreamService.java @@ -2,14 +2,13 @@ import java.time.Duration; -import javax.inject.Singleton; - import com.example.test.MutinyStreamsGrpc; import com.example.test.StreamsOuterClass.Item; +import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Multi; -@Singleton +@GrpcService public class DevModeTestStreamService extends MutinyStreamsGrpc.StreamsImplBase { public static final String PREFIX = "echo::"; diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/scaling/ThreadReturningGreeterService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/scaling/ThreadReturningGreeterService.java index bc284e465b1260..ab5a033a4e95b8 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/scaling/ThreadReturningGreeterService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/scaling/ThreadReturningGreeterService.java @@ -1,13 +1,12 @@ package io.quarkus.grpc.server.scaling; -import javax.inject.Singleton; - import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.GrpcService; -@Singleton +@GrpcService public class ThreadReturningGreeterService extends GreeterGrpc.GreeterImplBase { @Override diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/GrpcCallWithinBlockingService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/GrpcCallWithinBlockingService.java index bb5cf50cd52ee9..f1ed827074afda 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/GrpcCallWithinBlockingService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/GrpcCallWithinBlockingService.java @@ -1,7 +1,5 @@ package io.quarkus.grpc.server.services; -import javax.inject.Singleton; - import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; @@ -10,9 +8,10 @@ import io.grpc.examples.helloworld3.HelloRequest3; import io.grpc.stub.StreamObserver; import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.GrpcService; import io.smallrye.common.annotation.Blocking; -@Singleton +@GrpcService public class GrpcCallWithinBlockingService extends Greeter3Grpc.Greeter3ImplBase { @GrpcClient diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyHelloService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyHelloService.java index ff878d0a40145d..01fa1c1de77fc3 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyHelloService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyHelloService.java @@ -1,13 +1,12 @@ package io.quarkus.grpc.server.services; -import javax.inject.Singleton; - import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.examples.helloworld.MutinyGreeterGrpc; +import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Uni; -@Singleton +@GrpcService public class MutinyHelloService extends MutinyGreeterGrpc.GreeterImplBase { @Override diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java index 0d18504181c635..9a9bfc3eff48a3 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java @@ -5,17 +5,16 @@ import java.util.concurrent.atomic.AtomicInteger; -import javax.inject.Singleton; - import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos; import io.grpc.testing.integration.Messages; import io.grpc.testing.integration.MutinyTestServiceGrpc; +import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -@Singleton +@GrpcService public class MutinyTestService extends MutinyTestServiceGrpc.TestServiceImplBase { @Override diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/TestService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/TestService.java index 79c5749c867fe2..a2f5e594fe3088 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/TestService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/TestService.java @@ -7,16 +7,15 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import javax.inject.Singleton; - import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos; import io.grpc.stub.StreamObserver; import io.grpc.testing.integration.Messages; import io.grpc.testing.integration.TestServiceGrpc; +import io.quarkus.grpc.GrpcService; -@Singleton +@GrpcService public class TestService extends TestServiceGrpc.TestServiceImplBase { @Override @@ -82,8 +81,6 @@ public void onCompleted() { }; } - ; - @Override public StreamObserver fullDuplexCall( StreamObserver responseObserver) { @@ -122,6 +119,7 @@ public StreamObserver halfDuplexCall( return new StreamObserver() { @Override public void onNext(Messages.StreamingOutputCallRequest streamingOutputCallRequest) { + assertThatTheRequestScopeIsActive(); String payload = streamingOutputCallRequest.getPayload().getBody().toStringUtf8(); ByteString value = ByteString.copyFromUtf8(payload.toUpperCase()); Messages.Payload response = Messages.Payload.newBuilder().setBody(value).build(); diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GrpcService.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GrpcService.java index e201c5b0260de3..5d8801452317e2 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GrpcService.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GrpcService.java @@ -9,10 +9,13 @@ import javax.enterprise.inject.Stereotype; import javax.inject.Singleton; +import io.quarkus.grpc.runtime.supports.context.GrpcEnableRequestContext; + /** * Stereotype used to mark a gRPC service class. */ @Singleton +@GrpcEnableRequestContext @Stereotype @Target(TYPE) @Retention(RetentionPolicy.RUNTIME) diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 0cbbdca5d85c24..6aa081bc8209ce 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -42,7 +42,7 @@ import io.quarkus.grpc.runtime.reflection.ReflectionService; import io.quarkus.grpc.runtime.supports.BlockingServerInterceptor; import io.quarkus.grpc.runtime.supports.CompressionInterceptor; -import io.quarkus.grpc.runtime.supports.RequestScopeHandlerInterceptor; +import io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; @@ -66,6 +66,8 @@ public class GrpcServerRecorder { private static final AtomicInteger grpcVerticleCount = new AtomicInteger(0); private Map> blockingMethodsPerService = Collections.emptyMap(); + private static volatile DevModeWrapper devModeWrapper; + public void initializeGrpcServer(RuntimeValue vertxSupplier, GrpcConfiguration cfg, ShutdownContext shutdown, @@ -90,7 +92,7 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, if (GrpcServerReloader.getServer() == null) { devModeStart(grpcContainer, vertx, configuration, shutdown, launchMode); } else { - devModeReload(grpcContainer); + devModeReload(grpcContainer, vertx, configuration); } } else { prodStart(grpcContainer, vertx, configuration, launchMode); @@ -159,6 +161,8 @@ private void devModeStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerCo ShutdownContext shutdown, LaunchMode launchMode) { CompletableFuture future = new CompletableFuture<>(); + devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader()); + VertxServer vertxServer = buildServer(vertx, configuration, grpcContainer, launchMode) .start(new Handler>() { // NOSONAR @Override @@ -251,19 +255,18 @@ private static class GrpcServiceDefinition { } public String getImplementationClassName() { - return service.getClass().getName(); + // all grpc services have a io.quarkus.grpc.runtime.supports.context.GrpcRequestContextCdiInterceptor + // this means Arc passes a subclass to grpc internals. That's why we take superclass here + return service.getClass().getSuperclass().getName(); } } - private static void devModeReload(GrpcContainer grpcContainer) { - List svc = collectServiceDefinitions(grpcContainer.getServices()); + private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration) { + List services = collectServiceDefinitions(grpcContainer.getServices()); List definitions = new ArrayList<>(); Map> methods = new HashMap<>(); - for (GrpcServiceDefinition service : svc) { - for (ServerMethodDefinition method : service.definition.getMethods()) { - methods.put(method.getMethodDescriptor().getFullMethodName(), method); - } + for (GrpcServiceDefinition service : services) { definitions.add(service.definition); } @@ -272,8 +275,20 @@ private static void devModeReload(GrpcContainer grpcContainer) { for (ServerMethodDefinition method : reflectionService.getMethods()) { methods.put(method.getMethodDescriptor().getFullMethodName(), method); } + List servicesWithInterceptors = new ArrayList<>(); + CompressionInterceptor compressionInterceptor = prepareCompressionInterceptor(configuration); + for (GrpcServiceDefinition service : services) { + servicesWithInterceptors.add(serviceWithInterceptors(vertx, compressionInterceptor, service, true)); + } + + for (ServerServiceDefinition serviceWithInterceptors : servicesWithInterceptors) { + for (ServerMethodDefinition method : serviceWithInterceptors.getMethods()) { + methods.put(method.getMethodDescriptor().getFullMethodName(), method); + } + } + devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader()); - GrpcServerReloader.reinitialize(definitions, methods, grpcContainer.getSortedInterceptors()); + GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, grpcContainer.getSortedInterceptors()); } public static int getVerticleCount() { @@ -320,26 +335,11 @@ public void handle(HttpServerOptions options) { List toBeRegistered = collectServiceDefinitions(grpcContainer.getServices()); List definitions = new ArrayList<>(); - CompressionInterceptor compressionInterceptor = null; - if (configuration.compression.isPresent()) { - compressionInterceptor = new CompressionInterceptor(configuration.compression.get()); - } + CompressionInterceptor compressionInterceptor = prepareCompressionInterceptor(configuration); for (GrpcServiceDefinition service : toBeRegistered) { - List interceptors = new ArrayList<>(); - if (compressionInterceptor != null) { - interceptors.add(compressionInterceptor); - } - // We only register the blocking interceptor if needed by at least one method of the service. - if (!blockingMethodsPerService.isEmpty()) { - List list = blockingMethodsPerService.get(service.getImplementationClassName()); - if (list != null) { - interceptors.add(new BlockingServerInterceptor(vertx, list)); - } - } - // Order matters! Request scope must be called first (on the event loop) and so should be last in the list... - interceptors.add(new RequestScopeHandlerInterceptor()); - builder.addService(ServerInterceptors.intercept(service.definition, interceptors)); + builder.addService( + serviceWithInterceptors(vertx, compressionInterceptor, service, launchMode == LaunchMode.DEVELOPMENT)); LOGGER.debugf("Registered gRPC service '%s'", service.definition.getServiceDescriptor().getName()); definitions.add(service.definition); } @@ -367,7 +367,7 @@ public void handle(Promise event) { new Handler>() { @Override public void handle(AsyncResult result) { - command.run(); + devModeWrapper.run(command); } }); } @@ -381,6 +381,38 @@ public void handle(AsyncResult result) { return builder.build(); } + /** + * Compression interceptor if needed, null otherwise + * + * @param configuration gRPC server configuration + * @return interceptor or null + */ + private CompressionInterceptor prepareCompressionInterceptor(GrpcServerConfiguration configuration) { + CompressionInterceptor compressionInterceptor = null; + if (configuration.compression.isPresent()) { + compressionInterceptor = new CompressionInterceptor(configuration.compression.get()); + } + return compressionInterceptor; + } + + private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, CompressionInterceptor compressionInterceptor, + GrpcServiceDefinition service, boolean devMode) { + List interceptors = new ArrayList<>(); + if (compressionInterceptor != null) { + interceptors.add(compressionInterceptor); + } + // We only register the blocking interceptor if needed by at least one method of the service. + if (!blockingMethodsPerService.isEmpty()) { + List list = blockingMethodsPerService.get(service.getImplementationClassName()); + if (list != null) { + interceptors.add(new BlockingServerInterceptor(vertx, list, devMode)); + } + } + // Order matters! Request scope must be called first (on the event loop) and so should be last in the list... + interceptors.add(new GrpcRequestContextGrpcInterceptor()); + return ServerInterceptors.intercept(service.definition, interceptors); + } + private class GrpcServerVerticle extends AbstractVerticle { private final GrpcServerConfiguration configuration; private final GrpcContainer grpcContainer; @@ -432,4 +464,22 @@ public void handle(AsyncResult ar) { }); } } + + private class DevModeWrapper { + private final ClassLoader classLoader; + + public DevModeWrapper(ClassLoader contextClassLoader) { + classLoader = contextClassLoader; + } + + public void run(Runnable command) { + ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(classLoader); + try { + command.run(); + } finally { + Thread.currentThread().setContextClassLoader(originalTccl); + } + } + } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java index 59827c4c4c403b..35d1b96154a87f 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java @@ -4,16 +4,16 @@ import java.util.function.Supplier; import javax.inject.Inject; -import javax.inject.Singleton; import grpc.health.v1.HealthOuterClass; import grpc.health.v1.HealthOuterClass.HealthCheckResponse.ServingStatus; import grpc.health.v1.MutinyHealthGrpc; +import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; -@Singleton +@GrpcService public class GrpcHealthEndpoint extends MutinyHealthGrpc.HealthImplBase { @Inject GrpcHealthStorage healthStorage; diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java index ef5720b49f018b..4986bab8440b81 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java @@ -28,10 +28,12 @@ public class BlockingServerInterceptor implements ServerInterceptor { private final Vertx vertx; private final List blockingMethods; private final Map cache = new HashMap<>(); + private final boolean devMode; - public BlockingServerInterceptor(Vertx vertx, List blockingMethods) { + public BlockingServerInterceptor(Vertx vertx, List blockingMethods, boolean devMode) { this.vertx = vertx; this.blockingMethods = new ArrayList<>(); + this.devMode = devMode; for (String method : blockingMethods) { this.blockingMethods.add(method.toLowerCase()); } @@ -98,19 +100,12 @@ synchronized void setDelegate(ServerCall.Listener delegate) { private synchronized void executeOnContextOrEnqueue(Consumer> consumer) { if (this.delegate != null) { final Context grpcContext = Context.current(); - vertx.executeBlocking(new Handler>() { - @Override - public void handle(Promise f) { - final Context previous = Context.current(); - grpcContext.attach(); - try { - consumer.accept(delegate); - f.complete(); - } finally { - grpcContext.detach(previous); - } - } - }, true, null); + Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate); + if (devMode) { + blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), + blockingHandler); + } + vertx.executeBlocking(blockingHandler, true, null); } else { incomingEvents.add(consumer); } @@ -147,4 +142,50 @@ public void onReady() { } } + private static class DevModeBlockingExecutionHandler implements Handler> { + + final ClassLoader tccl; + final Handler> delegate; + + public DevModeBlockingExecutionHandler(ClassLoader tccl, Handler> delegate) { + this.tccl = tccl; + this.delegate = delegate; + } + + @Override + public void handle(Promise event) { + ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(tccl); + try { + delegate.handle(event); + } finally { + Thread.currentThread().setContextClassLoader(originalTccl); + } + } + } + + private static class BlockingExecutionHandler implements Handler> { + private final ServerCall.Listener delegate; + private final Context grpcContext; + private final Consumer> consumer; + + public BlockingExecutionHandler(Consumer> consumer, Context grpcContext, + ServerCall.Listener delegate) { + this.consumer = consumer; + this.grpcContext = grpcContext; + this.delegate = delegate; + } + + @Override + public void handle(Promise event) { + final Context previous = Context.current(); + grpcContext.attach(); + try { + consumer.accept(delegate); + event.complete(); + } finally { + grpcContext.detach(previous); + } + } + } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/RequestScopeHandlerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/RequestScopeHandlerInterceptor.java deleted file mode 100644 index 40ed99d29f5e53..00000000000000 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/RequestScopeHandlerInterceptor.java +++ /dev/null @@ -1,58 +0,0 @@ -package io.quarkus.grpc.runtime.supports; - -import org.jboss.logmanager.Logger; - -import io.grpc.ForwardingServerCall; -import io.grpc.Metadata; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; -import io.quarkus.arc.Arc; -import io.quarkus.arc.ManagedContext; -import io.vertx.core.Context; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; - -public class RequestScopeHandlerInterceptor implements ServerInterceptor { - - private final ManagedContext reqContext; - private static final Logger LOGGER = Logger.getLogger(RequestScopeHandlerInterceptor.class.getName()); - - public RequestScopeHandlerInterceptor() { - reqContext = Arc.container().requestContext(); - } - - @Override - public ServerCall.Listener interceptCall(ServerCall call, - Metadata headers, - ServerCallHandler next) { - - // This interceptor is called first, so, we should be on the event loop. - Context capturedVertxContext = Vertx.currentContext(); - if (capturedVertxContext != null) { - boolean activateAndDeactivateContext = !reqContext.isActive(); - if (activateAndDeactivateContext) { - reqContext.activate(); - } - return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { - @Override - public void close(Status status, Metadata trailers) { - super.close(status, trailers); - if (activateAndDeactivateContext) { - capturedVertxContext.runOnContext(new Handler() { - @Override - public void handle(Void ignored) { - reqContext.deactivate(); - } - }); - } - } - }, headers); - } else { - LOGGER.warning("Unable to activate the request scope - interceptor not called on the Vert.x event loop"); - return next.startCall(call, headers); - } - } - -} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java new file mode 100644 index 00000000000000..b304025ae982b6 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java @@ -0,0 +1,16 @@ +package io.quarkus.grpc.runtime.supports.context; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import javax.interceptor.InterceptorBinding; + +@Inherited +@InterceptorBinding +@Target({ ElementType.TYPE }) +@Retention(RetentionPolicy.RUNTIME) +public @interface GrpcEnableRequestContext { +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java new file mode 100644 index 00000000000000..66ff4f43afb780 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java @@ -0,0 +1,40 @@ +package io.quarkus.grpc.runtime.supports.context; + +import javax.interceptor.AroundInvoke; +import javax.interceptor.Interceptor; +import javax.interceptor.InvocationContext; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +@Interceptor +@GrpcEnableRequestContext +public class GrpcRequestContextCdiInterceptor { + + @AroundInvoke + public Object cleanUpContext(InvocationContext invocationContext) throws Exception { + boolean cleanUp = false; + ManagedContext requestContext = Arc.container().requestContext(); + if (!requestContext.isActive()) { + Context context = Vertx.currentContext(); + + if (context != null) { + cleanUp = true; + requestContext.activate(); + GrpcRequestContextHolder contextHolder = GrpcRequestContextHolder.get(context); + if (contextHolder != null) { + contextHolder.state = requestContext.getState(); + } + } + } + try { + return invocationContext.proceed(); + } finally { + if (cleanUp) { + requestContext.deactivate(); + } + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java new file mode 100644 index 00000000000000..31a408e11327d8 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java @@ -0,0 +1,81 @@ +package io.quarkus.grpc.runtime.supports.context; + +import org.jboss.logmanager.Logger; + +import io.grpc.ForwardingServerCall; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class GrpcRequestContextGrpcInterceptor implements ServerInterceptor { + + private final ManagedContext reqContext; + private static final Logger LOGGER = Logger.getLogger(GrpcRequestContextGrpcInterceptor.class.getName()); + + public GrpcRequestContextGrpcInterceptor() { + reqContext = Arc.container().requestContext(); + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, + Metadata headers, + ServerCallHandler next) { + + // This interceptor is called first, so, we should be on the event loop. + Context capturedVertxContext = Vertx.currentContext(); + if (capturedVertxContext != null) { + GrpcRequestContextHolder contextHolder = GrpcRequestContextHolder.initialize(capturedVertxContext); + ServerCall.Listener delegate = next + .startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { + + @Override + public void close(Status status, Metadata trailers) { + super.close(status, trailers); + if (contextHolder.state != null) { + reqContext.destroy(contextHolder.state); + } + } + }, headers); + + // a gRPC service can return a StreamObserver and instead of doing the work + // directly in the method body, do stuff that requires a request context in StreamObserver's methods + // let's propagate the request context to these methods: + return new ForwardingServerCallListener.SimpleForwardingServerCallListener(delegate) { + + @Override + public void onMessage(ReqT message) { + activateContext(); + super.onMessage(message); + } + + @Override + public void onReady() { + activateContext(); + super.onReady(); + } + + @Override + public void onComplete() { + activateContext(); + super.onComplete(); + } + + private void activateContext() { + if (contextHolder.state != null && !reqContext.isActive()) { + reqContext.activate(contextHolder.state); + } + } + }; + } else { + LOGGER.warning("Unable to activate the request scope - interceptor not called on the Vert.x event loop"); + return next.startCall(call, headers); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java new file mode 100644 index 00000000000000..bfd5e6753b8179 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java @@ -0,0 +1,21 @@ +package io.quarkus.grpc.runtime.supports.context; + +import io.quarkus.arc.InjectableContext; +import io.vertx.core.Context; + +public class GrpcRequestContextHolder { + + private static final String GRPC_REQUEST_CONTEXT_STATE = "GRPC_REQUEST_CONTEXT_STATE"; + + volatile InjectableContext.ContextState state; + + public static GrpcRequestContextHolder initialize(Context vertxContext) { + GrpcRequestContextHolder contextHolder = new GrpcRequestContextHolder(); + vertxContext.put(GRPC_REQUEST_CONTEXT_STATE, contextHolder); + return contextHolder; + } + + public static GrpcRequestContextHolder get(Context vertxContext) { + return vertxContext.get(GRPC_REQUEST_CONTEXT_STATE); + } +} diff --git a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java index 09ecb78a071583..6c759e8b102871 100644 --- a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java +++ b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java @@ -27,7 +27,7 @@ class BlockingServerInterceptorTest { @BeforeEach void setup() { vertx = Vertx.vertx(); - blockingServerInterceptor = new BlockingServerInterceptor(vertx, Arrays.asList("blocking")); + blockingServerInterceptor = new BlockingServerInterceptor(vertx, Arrays.asList("blocking"), false); } @Test diff --git a/integration-tests/grpc-health/src/test/java/io/quarkus/grpc/health/MicroProfileHealthEnabledTest.java b/integration-tests/grpc-health/src/test/java/io/quarkus/grpc/health/MicroProfileHealthEnabledTest.java index ad548c9ac43548..48306dcd83714a 100644 --- a/integration-tests/grpc-health/src/test/java/io/quarkus/grpc/health/MicroProfileHealthEnabledTest.java +++ b/integration-tests/grpc-health/src/test/java/io/quarkus/grpc/health/MicroProfileHealthEnabledTest.java @@ -13,7 +13,6 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import javax.inject.Singleton; import org.hamcrest.Matchers; import org.jboss.shrinkwrap.api.ShrinkWrap; @@ -28,6 +27,7 @@ import io.grpc.BindableService; import io.grpc.ServerServiceDefinition; import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.GrpcService; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; @@ -111,7 +111,7 @@ public Multi getStatusStream( } } - @Singleton + @GrpcService public static class FakeService implements BindableService { @Override