From 8b9c8b2107a2aa3c2394779d940f7d9e5175ef58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Wed, 16 Jun 2021 11:51:44 +0200 Subject: [PATCH] gRPC: fix request context propagation --- .../quarkus/grpc/deployment/GrpcDotNames.java | 2 - .../grpc/deployment/GrpcServerProcessor.java | 18 +-- .../grpc/runtime/GrpcServerRecorder.java | 2 +- .../runtime/health/GrpcHealthEndpoint.java | 4 +- .../grpc/runtime/supports/Channels.java | 4 +- .../blocking/BlockingExecutionHandler.java | 48 +++++++ .../BlockingServerInterceptor.java | 112 +++++++-------- .../DevModeBlockingExecutionHandler.java | 26 ++++ .../context/GrpcEnableRequestContext.java | 16 --- .../GrpcRequestContextCdiInterceptor.java | 42 ------ .../GrpcRequestContextGrpcInterceptor.java | 130 ++++++++++++----- .../context/GrpcRequestContextHolder.java | 21 --- .../BlockingServerInterceptorTest.java | 20 ++- .../grpc-hibernate-reactive/pom.xml | 135 ++++++++++++++++++ .../com/example/reactive/ContextChecker.java | 29 ++++ .../main/java/com/example/reactive/Item.java | 10 ++ .../com/example/reactive/ReactiveService.java | 62 ++++++++ .../example/reactive/RequestScopeBean.java | 21 +++ .../src/main/proto/test.proto | 18 +++ .../src/main/resources/application.properties | 1 + .../example/reactive/ReactiveServiceTest.java | 49 +++++++ integration-tests/grpc-hibernate/pom.xml | 113 +++++++++++++++ .../grpc/hibernate/ContextChecker.java | 29 ++++ .../java/com/example/grpc/hibernate/Item.java | 14 ++ .../com/example/grpc/hibernate/ItemDao.java | 18 +++ .../grpc/hibernate/RawTestService.java | 112 +++++++++++++++ .../grpc/hibernate/RequestScopeBean.java | 21 +++ .../example/grpc/hibernate/TestService.java | 78 ++++++++++ .../grpc-hibernate/src/main/proto/test.proto | 25 ++++ .../src/main/resources/application.properties | 5 + .../grpc/hibernate/BlockingMutinyTest.java | 95 ++++++++++++ .../grpc/hibernate/BlockingRawTest.java | 99 +++++++++++++ .../example/grpc/hibernate/TestResources.java | 8 ++ integration-tests/pom.xml | 2 + 34 files changed, 1185 insertions(+), 204 deletions(-) create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java rename extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/{ => blocking}/BlockingServerInterceptor.java (63%) create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java delete mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java delete mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java delete mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java create mode 100644 integration-tests/grpc-hibernate-reactive/pom.xml create mode 100644 integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ContextChecker.java create mode 100644 integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/Item.java create mode 100644 integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ReactiveService.java create mode 100644 integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/RequestScopeBean.java create mode 100644 integration-tests/grpc-hibernate-reactive/src/main/proto/test.proto create mode 100644 integration-tests/grpc-hibernate-reactive/src/main/resources/application.properties create mode 100644 integration-tests/grpc-hibernate-reactive/src/test/java/com/example/reactive/ReactiveServiceTest.java create mode 100644 integration-tests/grpc-hibernate/pom.xml create mode 100644 integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ContextChecker.java create mode 100644 integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/Item.java create mode 100644 integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ItemDao.java create mode 100644 integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RawTestService.java create mode 100644 integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RequestScopeBean.java create mode 100644 integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/TestService.java create mode 100644 integration-tests/grpc-hibernate/src/main/proto/test.proto create mode 100644 integration-tests/grpc-hibernate/src/main/resources/application.properties create mode 100644 integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingMutinyTest.java create mode 100644 integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingRawTest.java create mode 100644 integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/TestResources.java diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java index fcb50ee773935..a9c69da3eac72 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java @@ -18,7 +18,6 @@ import io.quarkus.grpc.runtime.MutinyStub; import io.quarkus.grpc.runtime.supports.Channels; import io.quarkus.grpc.runtime.supports.GrpcClientConfigProvider; -import io.quarkus.grpc.runtime.supports.context.GrpcEnableRequestContext; import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; @@ -28,7 +27,6 @@ public class GrpcDotNames { public static final DotName CHANNEL = DotName.createSimple(Channel.class.getName()); public static final DotName GRPC_CLIENT = DotName.createSimple(GrpcClient.class.getName()); public static final DotName GRPC_SERVICE = DotName.createSimple(GrpcService.class.getName()); - public static final DotName GRPC_ENABLE_REQUEST_CONTEXT = DotName.createSimple(GrpcEnableRequestContext.class.getName()); public static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName()); public static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName()); diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java index bd2b6a848dcc3..0b9ed47add0fc 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java @@ -34,7 +34,6 @@ import io.quarkus.arc.processor.AnnotationsTransformer; import io.quarkus.arc.processor.BeanInfo; import io.quarkus.arc.processor.BuiltinScope; -import io.quarkus.arc.processor.Transformation; import io.quarkus.deployment.IsDevelopment; import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; @@ -60,8 +59,6 @@ import io.quarkus.grpc.runtime.config.GrpcServerBuildTimeConfig; import io.quarkus.grpc.runtime.health.GrpcHealthEndpoint; import io.quarkus.grpc.runtime.health.GrpcHealthStorage; -import io.quarkus.grpc.runtime.supports.context.GrpcEnableRequestContext; -import io.quarkus.grpc.runtime.supports.context.GrpcRequestContextCdiInterceptor; import io.quarkus.kubernetes.spi.KubernetesPortBuildItem; import io.quarkus.netty.deployment.MinNettyAllocatorMaxOrderBuildItem; import io.quarkus.runtime.LaunchMode; @@ -240,14 +237,11 @@ public boolean appliesTo(Kind kind) { @Override public void transform(TransformationContext context) { ClassInfo clazz = context.getTarget().asClass(); - if (userDefinedServices.contains(clazz.name())) { - // Add @GrpcEnableRequestContext to activate the request context during each call - Transformation transform = context.transform().add(GrpcDotNames.GRPC_ENABLE_REQUEST_CONTEXT); - if (!customScopes.isScopeDeclaredOn(clazz)) { - // Add @Singleton to make it a bean - transform.add(BuiltinScope.SINGLETON.getName()); - } - transform.done(); + if (userDefinedServices.contains(clazz.name()) && !customScopes.isScopeDeclaredOn(clazz)) { + // Add @Singleton to make it a bean + context.transform() + .add(BuiltinScope.SINGLETON.getName()) + .done(); } } }); @@ -303,8 +297,6 @@ void registerBeans(BuildProducer beans, List bindables, BuildProducer features) { // @GrpcService is a CDI qualifier beans.produce(new AdditionalBeanBuildItem(GrpcService.class)); - beans.produce(new AdditionalBeanBuildItem(GrpcRequestContextCdiInterceptor.class)); - beans.produce(new AdditionalBeanBuildItem(GrpcEnableRequestContext.class)); if (!bindables.isEmpty() || LaunchMode.current() == LaunchMode.DEVELOPMENT) { beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcContainer.class)); diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index d9fa84bcc36de..d7ed2c0dd396f 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -41,8 +41,8 @@ import io.quarkus.grpc.runtime.devmode.GrpcServerReloader; import io.quarkus.grpc.runtime.health.GrpcHealthStorage; import io.quarkus.grpc.runtime.reflection.ReflectionService; -import io.quarkus.grpc.runtime.supports.BlockingServerInterceptor; import io.quarkus.grpc.runtime.supports.CompressionInterceptor; +import io.quarkus.grpc.runtime.supports.blocking.BlockingServerInterceptor; import io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java index 3a22108856d73..f76df3363ebb7 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/health/GrpcHealthEndpoint.java @@ -10,14 +10,12 @@ import grpc.health.v1.HealthOuterClass.HealthCheckResponse.ServingStatus; import grpc.health.v1.MutinyHealthGrpc; import io.quarkus.grpc.GrpcService; -import io.quarkus.grpc.runtime.supports.context.GrpcEnableRequestContext; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; -// Note that we need to add the scope and interceptor binding explicitly because this class is not part of the index +// Note that we need to add the scope explicitly because this class is not part of the index @Singleton -@GrpcEnableRequestContext @GrpcService public class GrpcHealthEndpoint extends MutinyHealthGrpc.HealthImplBase { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java index 853da64e73cb0..c8e3eaa77346c 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java @@ -59,6 +59,9 @@ public static Channel createChannel(String name) throws SSLException { GrpcClientConfiguration config = configProvider.getConfiguration(name); if (config == null && LaunchMode.current() == LaunchMode.TEST) { + LOGGER.infof( + "gRPC client %s created without configuration. We are assuming that it's created to test your gRPC services.", + name); config = testConfig(configProvider.getServerConfiguration()); } @@ -164,7 +167,6 @@ public static Channel createChannel(String name) throws SSLException { } private static GrpcClientConfiguration testConfig(GrpcServerConfiguration serverConfiguration) { - LOGGER.info("gRPC client created without configuration. We are assuming that it's created to test your gRPC services."); GrpcClientConfiguration config = new GrpcClientConfiguration(); config.port = serverConfiguration.testPort; config.host = serverConfiguration.host; diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java new file mode 100644 index 0000000000000..7b92ac0f6d5c6 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java @@ -0,0 +1,48 @@ +package io.quarkus.grpc.runtime.supports.blocking; + +import java.util.function.Consumer; + +import io.grpc.Context; +import io.grpc.ServerCall; +import io.quarkus.arc.InjectableContext; +import io.quarkus.arc.ManagedContext; +import io.vertx.core.Handler; +import io.vertx.core.Promise; + +class BlockingExecutionHandler implements Handler> { + private final ServerCall.Listener delegate; + private final Context grpcContext; + private final Consumer> consumer; + private final InjectableContext.ContextState state; + private final ManagedContext requestContext; + + public BlockingExecutionHandler(Consumer> consumer, Context grpcContext, + ServerCall.Listener delegate, InjectableContext.ContextState state, + ManagedContext requestContext) { + this.consumer = consumer; + this.grpcContext = grpcContext; + this.delegate = delegate; + this.state = state; + this.requestContext = requestContext; + } + + @Override + public void handle(Promise event) { + final Context previous = Context.current(); + grpcContext.attach(); + try { + requestContext.activate(state); + try { + consumer.accept(delegate); + } catch (Throwable any) { + event.fail(any); + return; + } finally { + requestContext.deactivate(); + } + event.complete(); + } finally { + grpcContext.detach(previous); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java similarity index 63% rename from extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java rename to extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java index 47c68974b8a04..01d6784800e23 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java @@ -1,8 +1,7 @@ -package io.quarkus.grpc.runtime.supports; +package io.quarkus.grpc.runtime.supports.blocking; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -13,12 +12,15 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.quarkus.arc.Arc; +import io.quarkus.arc.InjectableContext.ContextState; +import io.quarkus.arc.ManagedContext; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; /** - * gRPC Server interceptor offloading the execution of the gRPC method on a wroker thread if the method is annotated + * gRPC Server interceptor offloading the execution of the gRPC method on a worker thread if the method is annotated * with {@link io.smallrye.common.annotation.Blocking}. * * For non-annotated methods, the interceptor acts as a pass-through. @@ -62,13 +64,23 @@ public ServerCall.Listener interceptCall(ServerCall replay = new ReplayListener<>(); - + final ManagedContext requestContext = getRequestContext(); + // context should always be active here + // it is initialized by io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor + // that should always be called before this interceptor + ContextState state = requestContext.getState(); + ReplayListener replay = new ReplayListener<>(state); vertx.executeBlocking(new Handler>() { @Override public void handle(Promise f) { - ServerCall.Listener listener = next.startCall(call, headers); - replay.setDelegate(listener); + ServerCall.Listener listener; + try { + requestContext.activate(state); + listener = next.startCall(call, headers); + } finally { + requestContext.deactivate(); + } + replay.setDelegate(listener, requestContext); f.complete(null); } }, null); @@ -87,30 +99,46 @@ public void handle(Promise f) { */ private class ReplayListener extends ServerCall.Listener { private ServerCall.Listener delegate; - private final List>> incomingEvents = new LinkedList<>(); + private final List>> incomingEvents = new ArrayList<>(); + private final ContextState requestContextState; + + private ReplayListener(ContextState requestContextState) { + this.requestContextState = requestContextState; + } - synchronized void setDelegate(ServerCall.Listener delegate) { + synchronized void setDelegate(ServerCall.Listener delegate, + ManagedContext requestContext) { this.delegate = delegate; - for (Consumer> event : incomingEvents) { - event.accept(delegate); + requestContext.activate(requestContextState); + try { + for (Consumer> event : incomingEvents) { + event.accept(delegate); + } + } finally { + requestContext.deactivate(); } incomingEvents.clear(); } private synchronized void executeOnContextOrEnqueue(Consumer> consumer) { if (this.delegate != null) { - final Context grpcContext = Context.current(); - Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate); - if (devMode) { - blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), - blockingHandler); - } - vertx.executeBlocking(blockingHandler, true, null); + executeBlockingWithRequestContext(consumer); } else { incomingEvents.add(consumer); } } + private void executeBlockingWithRequestContext(Consumer> consumer) { + final Context grpcContext = Context.current(); + Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, + requestContextState, getRequestContext()); + if (devMode) { + blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), + blockingHandler); + } + vertx.executeBlocking(blockingHandler, true, null); + } + @Override public void onMessage(ReqT message) { executeOnContextOrEnqueue(new Consumer>() { @@ -142,50 +170,8 @@ public void onReady() { } } - private static class DevModeBlockingExecutionHandler implements Handler> { - - final ClassLoader tccl; - final Handler> delegate; - - public DevModeBlockingExecutionHandler(ClassLoader tccl, Handler> delegate) { - this.tccl = tccl; - this.delegate = delegate; - } - - @Override - public void handle(Promise event) { - ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(tccl); - try { - delegate.handle(event); - } finally { - Thread.currentThread().setContextClassLoader(originalTccl); - } - } - } - - private static class BlockingExecutionHandler implements Handler> { - private final ServerCall.Listener delegate; - private final Context grpcContext; - private final Consumer> consumer; - - public BlockingExecutionHandler(Consumer> consumer, Context grpcContext, - ServerCall.Listener delegate) { - this.consumer = consumer; - this.grpcContext = grpcContext; - this.delegate = delegate; - } - - @Override - public void handle(Promise event) { - final Context previous = Context.current(); - grpcContext.attach(); - try { - consumer.accept(delegate); - event.complete(); - } finally { - grpcContext.detach(previous); - } - } + // protected for tests + protected ManagedContext getRequestContext() { + return Arc.container().requestContext(); } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java new file mode 100644 index 0000000000000..b6b08ab3100d4 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java @@ -0,0 +1,26 @@ +package io.quarkus.grpc.runtime.supports.blocking; + +import io.vertx.core.Handler; +import io.vertx.core.Promise; + +class DevModeBlockingExecutionHandler implements Handler> { + + final ClassLoader tccl; + final Handler> delegate; + + public DevModeBlockingExecutionHandler(ClassLoader tccl, Handler> delegate) { + this.tccl = tccl; + this.delegate = delegate; + } + + @Override + public void handle(Promise event) { + ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(tccl); + try { + delegate.handle(event); + } finally { + Thread.currentThread().setContextClassLoader(originalTccl); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java deleted file mode 100644 index b304025ae982b..0000000000000 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcEnableRequestContext.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.quarkus.grpc.runtime.supports.context; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import javax.interceptor.InterceptorBinding; - -@Inherited -@InterceptorBinding -@Target({ ElementType.TYPE }) -@Retention(RetentionPolicy.RUNTIME) -public @interface GrpcEnableRequestContext { -} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java deleted file mode 100644 index 52915fd4d7ef9..0000000000000 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextCdiInterceptor.java +++ /dev/null @@ -1,42 +0,0 @@ -package io.quarkus.grpc.runtime.supports.context; - -import javax.annotation.Priority; -import javax.interceptor.AroundInvoke; -import javax.interceptor.Interceptor; -import javax.interceptor.InvocationContext; - -import io.quarkus.arc.Arc; -import io.quarkus.arc.ManagedContext; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -@Interceptor -@GrpcEnableRequestContext -@Priority(Interceptor.Priority.PLATFORM_BEFORE) -public class GrpcRequestContextCdiInterceptor { - - @AroundInvoke - public Object cleanUpContext(InvocationContext invocationContext) throws Exception { - boolean cleanUp = false; - ManagedContext requestContext = Arc.container().requestContext(); - if (!requestContext.isActive()) { - Context context = Vertx.currentContext(); - - if (context != null) { - cleanUp = true; - requestContext.activate(); - GrpcRequestContextHolder contextHolder = GrpcRequestContextHolder.get(context); - if (contextHolder != null) { - contextHolder.state = requestContext.getState(); - } - } - } - try { - return invocationContext.proceed(); - } finally { - if (cleanUp) { - requestContext.deactivate(); - } - } - } -} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java index 31a408e11327d..7841ae58a2899 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java @@ -1,23 +1,22 @@ package io.quarkus.grpc.runtime.supports.context; -import org.jboss.logmanager.Logger; +import org.jboss.logging.Logger; -import io.grpc.ForwardingServerCall; import io.grpc.ForwardingServerCallListener; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.Status; import io.quarkus.arc.Arc; +import io.quarkus.arc.InjectableContext; import io.quarkus.arc.ManagedContext; import io.vertx.core.Context; import io.vertx.core.Vertx; public class GrpcRequestContextGrpcInterceptor implements ServerInterceptor { + private static final Logger log = Logger.getLogger(GrpcRequestContextGrpcInterceptor.class.getName()); private final ManagedContext reqContext; - private static final Logger LOGGER = Logger.getLogger(GrpcRequestContextGrpcInterceptor.class.getName()); public GrpcRequestContextGrpcInterceptor() { reqContext = Arc.container().requestContext(); @@ -31,50 +30,105 @@ public ServerCall.Listener interceptCall(ServerCall delegate = next - .startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { - - @Override - public void close(Status status, Metadata trailers) { - super.close(status, trailers); - if (contextHolder.state != null) { - reqContext.destroy(contextHolder.state); - } - } - }, headers); + InjectableContext.ContextState state; + if (!reqContext.isActive()) { + reqContext.activate(); + state = reqContext.getState(); + } else { + state = null; + log.warn("Request context already active when gRPC request started"); + } // a gRPC service can return a StreamObserver and instead of doing the work // directly in the method body, do stuff that requires a request context in StreamObserver's methods // let's propagate the request context to these methods: - return new ForwardingServerCallListener.SimpleForwardingServerCallListener(delegate) { + try { + return new ForwardingServerCallListener.SimpleForwardingServerCallListener( + next.startCall(call, headers)) { - @Override - public void onMessage(ReqT message) { - activateContext(); - super.onMessage(message); - } + @Override + public void onMessage(ReqT message) { + boolean activated = activateContext(); + try { + super.onMessage(message); + } finally { + if (activated) { + deactivateContext(); + } + } + } - @Override - public void onReady() { - activateContext(); - super.onReady(); - } + @Override + public void onReady() { + boolean activated = activateContext(); + try { + super.onReady(); + } finally { + if (activated) { + deactivateContext(); + } + } + } - @Override - public void onComplete() { - activateContext(); - super.onComplete(); - } + @Override + public void onHalfClose() { + boolean activated = activateContext(); + try { + super.onHalfClose(); + } finally { + if (activated) { + deactivateContext(); + } + } + } - private void activateContext() { - if (contextHolder.state != null && !reqContext.isActive()) { - reqContext.activate(contextHolder.state); + @Override + public void onCancel() { + boolean activated = activateContext(); + try { + super.onCancel(); + } finally { + if (activated) { + deactivateContext(); + } + if (state != null) { + reqContext.destroy(state); + } + } + } + + @Override + public void onComplete() { + boolean activated = activateContext(); + try { + super.onComplete(); + } finally { + if (activated) { + deactivateContext(); + } + if (state != null) { + reqContext.destroy(state); + } + } + } + + private void deactivateContext() { + reqContext.deactivate(); + } + + private boolean activateContext() { + if (state != null && !reqContext.isActive()) { + reqContext.activate(state); + return true; + } + return false; } - } - }; + }; + } finally { + reqContext.deactivate(); + } } else { - LOGGER.warning("Unable to activate the request scope - interceptor not called on the Vert.x event loop"); + log.warn("Unable to activate the request scope - interceptor not called on the Vert.x event loop"); return next.startCall(call, headers); } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java deleted file mode 100644 index bfd5e6753b817..0000000000000 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextHolder.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.quarkus.grpc.runtime.supports.context; - -import io.quarkus.arc.InjectableContext; -import io.vertx.core.Context; - -public class GrpcRequestContextHolder { - - private static final String GRPC_REQUEST_CONTEXT_STATE = "GRPC_REQUEST_CONTEXT_STATE"; - - volatile InjectableContext.ContextState state; - - public static GrpcRequestContextHolder initialize(Context vertxContext) { - GrpcRequestContextHolder contextHolder = new GrpcRequestContextHolder(); - vertxContext.put(GRPC_REQUEST_CONTEXT_STATE, contextHolder); - return contextHolder; - } - - public static GrpcRequestContextHolder get(Context vertxContext) { - return vertxContext.get(GRPC_REQUEST_CONTEXT_STATE); - } -} diff --git a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java index 6c759e8b10287..96a27e392e6eb 100644 --- a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java +++ b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java @@ -4,7 +4,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.BeforeEach; @@ -16,8 +16,12 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; +import io.quarkus.arc.InjectableContext; +import io.quarkus.arc.ManagedContext; +import io.quarkus.grpc.runtime.supports.blocking.BlockingServerInterceptor; import io.vertx.core.Vertx; +@SuppressWarnings({ "rawtypes", "unchecked" }) class BlockingServerInterceptorTest { public static final Context.Key USERNAME = Context.key("username"); @@ -27,7 +31,15 @@ class BlockingServerInterceptorTest { @BeforeEach void setup() { vertx = Vertx.vertx(); - blockingServerInterceptor = new BlockingServerInterceptor(vertx, Arrays.asList("blocking"), false); + InjectableContext.ContextState contextState = mock(InjectableContext.ContextState.class); + ManagedContext requestContext = mock(ManagedContext.class); + when(requestContext.getState()).thenReturn(contextState); + blockingServerInterceptor = new BlockingServerInterceptor(vertx, Collections.singletonList("blocking"), false) { + @Override + protected ManagedContext getRequestContext() { + return requestContext; + } + }; } @Test @@ -61,8 +73,8 @@ void testContextPropagation() throws Exception { static class BlockingServerCallHandler implements ServerCallHandler { String threadName; String contextUserName; - private CountDownLatch latch = new CountDownLatch(1); - private CountDownLatch setupLatch = new CountDownLatch(1); + private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch setupLatch = new CountDownLatch(1); @Override public ServerCall.Listener startCall(ServerCall serverCall, Metadata metadata) { diff --git a/integration-tests/grpc-hibernate-reactive/pom.xml b/integration-tests/grpc-hibernate-reactive/pom.xml new file mode 100644 index 0000000000000..5e7ebb74449a7 --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/pom.xml @@ -0,0 +1,135 @@ + + + 4.0.0 + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + quarkus-integration-test-grpc-hibernate-reactive + Quarkus - Integration Tests - gRPC - Hibernate Reactive + + + + io.quarkus + quarkus-grpc + + + io.quarkus + quarkus-hibernate-reactive-panache + + + io.quarkus + quarkus-reactive-pg-client + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + io.quarkus + quarkus-grpc-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-hibernate-reactive-panache-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-reactive-pg-client-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + maven-surefire-plugin + + true + + + + + + + + devservices-postgresql + + + start-containers + + + + + + maven-surefire-plugin + + false + + + + + + + + diff --git a/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ContextChecker.java b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ContextChecker.java new file mode 100644 index 0000000000000..3ab8366bd47b0 --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ContextChecker.java @@ -0,0 +1,29 @@ +package com.example.reactive; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +@ApplicationScoped +public class ContextChecker { + private final Map requestContexts = new ConcurrentHashMap<>(); + + @Inject + RequestScopeBean requestScopeBean; + + int newContextId(String caller) { + String original; + int contextId = requestScopeBean.getId(); + if ((original = requestContexts.put(contextId, caller)) != null) { + throw new RuntimeException( + "request context reused from a different call, original usage: " + original + ", duplicate: " + caller); + } + return contextId; + } + + public int requestContextId() { + return requestScopeBean.getId(); + } +} diff --git a/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/Item.java b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/Item.java new file mode 100644 index 0000000000000..667250ca300cc --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/Item.java @@ -0,0 +1,10 @@ +package com.example.reactive; + +import javax.persistence.Entity; + +import io.quarkus.hibernate.reactive.panache.PanacheEntity; + +@Entity +public class Item extends PanacheEntity { + public String text; +} diff --git a/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ReactiveService.java b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ReactiveService.java new file mode 100644 index 0000000000000..d76b9a1aef9e5 --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/ReactiveService.java @@ -0,0 +1,62 @@ +package com.example.reactive; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.quarkus.grpc.GrpcService; +import io.quarkus.hibernate.reactive.panache.Panache; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; + +@GrpcService +public class ReactiveService implements ReactiveTest { + + BroadcastProcessor broadcast = BroadcastProcessor.create(); + + @Inject + ContextChecker contextChecker; + + ManagedContext requestContext; + + @PostConstruct + public void setUp() { + requestContext = Arc.container().requestContext(); + } + + @Override + public Uni add(Test.Item request) { + contextChecker.newContextId("ReactiveService#add"); + return Panache.withTransaction(() -> { + Item newItem = new Item(); + newItem.text = request.getText(); + return Item.persist(newItem) + .replaceWith(newItem); + }).onItem().invoke(newItem -> broadcast.onNext(newItem)) + .replaceWith(Test.Empty.getDefaultInstance()); + } + + @Override + public Multi watch(Test.Empty request) { + int contextId = contextChecker.newContextId("ReactiveService#watch"); + Multi existing = Item. streamAll() + .map(item -> Test.Item.newBuilder().setText(item.text).build()); + return Multi.createBy().concatenating() + .streams(existing, broadcast.map(i -> i.text) + .map(Test.Item.newBuilder()::setText) + .map(Test.Item.Builder::build)) + .onItem().invoke( + () -> { + if (contextChecker.requestContextId() != contextId) { + throw new RuntimeException("Different context for `onItem` and `ReactiveService#watch` method"); + } + if (!requestContext.isActive()) { + throw new RuntimeException( + "Request context not active for `onItem` in `ReactiveService#watch`"); + } + }) + .onCancellation().invoke(() -> System.out.println("canceled")); + } +} diff --git a/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/RequestScopeBean.java b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/RequestScopeBean.java new file mode 100644 index 0000000000000..fd4a742c47617 --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/main/java/com/example/reactive/RequestScopeBean.java @@ -0,0 +1,21 @@ +package com.example.reactive; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.RequestScoped; + +@RequestScoped +public class RequestScopeBean { + private static final AtomicInteger idSequence = new AtomicInteger(); + private int id; + + @PostConstruct + public void setUp() { + id = idSequence.getAndIncrement(); + } + + public int getId() { + return id; + } +} diff --git a/integration-tests/grpc-hibernate-reactive/src/main/proto/test.proto b/integration-tests/grpc-hibernate-reactive/src/main/proto/test.proto new file mode 100644 index 0000000000000..6b1159a5de25b --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/main/proto/test.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package example; + +option java_package = "com.example.reactive"; + +service ReactiveTest { + rpc watch(Empty) returns (stream Item); + rpc add(Item) returns (Empty); +} + +message Empty { + +} + +message Item { + string text = 1; +} diff --git a/integration-tests/grpc-hibernate-reactive/src/main/resources/application.properties b/integration-tests/grpc-hibernate-reactive/src/main/resources/application.properties new file mode 100644 index 0000000000000..f42b38adfeac7 --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/main/resources/application.properties @@ -0,0 +1 @@ +quarkus.hibernate-orm.database.generation=drop-and-create \ No newline at end of file diff --git a/integration-tests/grpc-hibernate-reactive/src/test/java/com/example/reactive/ReactiveServiceTest.java b/integration-tests/grpc-hibernate-reactive/src/test/java/com/example/reactive/ReactiveServiceTest.java new file mode 100644 index 0000000000000..e57748f9bed87 --- /dev/null +++ b/integration-tests/grpc-hibernate-reactive/src/test/java/com/example/reactive/ReactiveServiceTest.java @@ -0,0 +1,49 @@ +package com.example.reactive; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import io.quarkus.grpc.GrpcClient; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.mutiny.subscription.Cancellable; + +@QuarkusTest +public class ReactiveServiceTest { + + public static final int TIMEOUT = 60; + + @GrpcClient + ReactiveTest client; + + @Test + @Timeout(TIMEOUT) + void shouldWatchAndAddMultipleTimes() { + List collected = new CopyOnWriteArrayList<>(); + + Cancellable watch = client.watch(com.example.reactive.Test.Empty.getDefaultInstance()) + .onFailure().invoke(Throwable::printStackTrace) + .subscribe().with(item -> collected.add(item.getText())); + List expected = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < 10; i++) { + String text = "hello world" + i; + expected.add(text); + client.add(com.example.reactive.Test.Item.newBuilder().setText(text).build()) + .onFailure().invoke(Throwable::printStackTrace) + .await().atMost(Duration.ofSeconds(TIMEOUT / 6)); + } + + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> collected.containsAll(expected)); + + watch.cancel(); + } +} diff --git a/integration-tests/grpc-hibernate/pom.xml b/integration-tests/grpc-hibernate/pom.xml new file mode 100644 index 0000000000000..44ea85097e4fb --- /dev/null +++ b/integration-tests/grpc-hibernate/pom.xml @@ -0,0 +1,113 @@ + + + 4.0.0 + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + quarkus-integration-test-grpc-hibernate + Quarkus - Integration Tests - gRPC - Hibernate + + + + io.quarkus + quarkus-grpc + + + io.quarkus + quarkus-hibernate-orm + + + io.quarkus + quarkus-jdbc-h2 + + + io.quarkus + quarkus-test-h2 + test + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + + + io.quarkus + quarkus-grpc-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-hibernate-orm-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-jdbc-h2-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + + diff --git a/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ContextChecker.java b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ContextChecker.java new file mode 100644 index 0000000000000..44f4e121e7124 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ContextChecker.java @@ -0,0 +1,29 @@ +package com.example.grpc.hibernate; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +@ApplicationScoped +public class ContextChecker { + private final Map requestContexts = new ConcurrentHashMap<>(); + + @Inject + RequestScopeBean requestScopeBean; + + int newContextId(String caller) { + String original; + int contextId = requestScopeBean.getId(); + if ((original = requestContexts.put(contextId, caller)) != null) { + throw new RuntimeException( + "request context reused from a different call, original usage: " + original + ", duplicate: " + caller); + } + return contextId; + } + + public int requestContextId() { + return requestScopeBean.getId(); + } +} diff --git a/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/Item.java b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/Item.java new file mode 100644 index 0000000000000..b59f0303f3f41 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/Item.java @@ -0,0 +1,14 @@ +package com.example.grpc.hibernate; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; + +@Entity(name = "Item") +public class Item { + @Id + @GeneratedValue + public Long id; + + public String text; +} \ No newline at end of file diff --git a/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ItemDao.java b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ItemDao.java new file mode 100644 index 0000000000000..c3b63700cd91d --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/ItemDao.java @@ -0,0 +1,18 @@ +package com.example.grpc.hibernate; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.persistence.EntityManager; +import javax.transaction.Transactional; + +@ApplicationScoped +public class ItemDao { + + @Inject + EntityManager entityManager; + + @Transactional + public void add(Item newItem) { + entityManager.persist(newItem); + } +} diff --git a/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RawTestService.java b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RawTestService.java new file mode 100644 index 0000000000000..406675b93728d --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RawTestService.java @@ -0,0 +1,112 @@ +package com.example.grpc.hibernate; + +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.persistence.EntityManager; +import javax.transaction.Transactional; + +import org.jboss.logging.Logger; + +import com.example.test.TestOuterClass; +import com.example.test.TestRawGrpc; + +import io.grpc.stub.StreamObserver; +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; + +@GrpcService +@Blocking +public class RawTestService extends TestRawGrpc.TestRawImplBase { + + private static final Logger log = Logger.getLogger(RawTestService.class); + private static final TestOuterClass.Empty EMPTY = TestOuterClass.Empty.getDefaultInstance(); + + @Inject + EntityManager entityManager; + + @Inject + ItemDao dao; + + @Inject + ContextChecker contextChecker; + + ManagedContext requestContext; + + @PostConstruct + public void setUp() { + requestContext = Arc.container().requestContext(); + } + + @Override + @Transactional + public void add(TestOuterClass.Item request, StreamObserver responseObserver) { + contextChecker.newContextId("RawTestService#add"); + Item item = new Item(); + item.text = request.getText(); + entityManager.persist(item); + responseObserver.onNext(EMPTY); + responseObserver.onCompleted(); + } + + @Override + @Blocking + @Transactional + public void clear(TestOuterClass.Empty request, StreamObserver responseObserver) { + contextChecker.newContextId("RawTestService#clear"); + entityManager.createQuery("DELETE from Item") + .executeUpdate(); + responseObserver.onNext(EMPTY); + responseObserver.onCompleted(); + } + + @Override + public void getAll(TestOuterClass.Empty request, StreamObserver responseObserver) { + contextChecker.newContextId("RawTestService#getAll"); + List items = entityManager.createQuery("from Item", Item.class) + .getResultList(); + for (Item item : items) { + responseObserver.onNext(TestOuterClass.Item.newBuilder().setText(item.text).build()); + } + responseObserver.onCompleted(); + } + + @Override + public StreamObserver bidi(StreamObserver responseObserver) { + int contextId = contextChecker.newContextId("RawTestService#bidi"); + + return new StreamObserver() { + @Override + public void onNext(TestOuterClass.Item value) { + if (contextChecker.requestContextId() != contextId) { + throw new RuntimeException("Different context for onNext and RawTestService#bidi method"); + } + Item newItem = new Item(); + newItem.text = value.getText(); + dao.add(newItem); + + responseObserver.onNext(value); + } + + @Override + public void onError(Throwable t) { + log.error("bidi onError", t); + } + + @Override + public void onCompleted() { + if (contextChecker.requestContextId() != contextId) { + throw new RuntimeException("Different context for onCompleted and RawTestService#bidi method"); + } + if (!requestContext.isActive()) { + throw new RuntimeException("Request context not active for `onCompleted`"); + } + responseObserver.onCompleted(); + } + }; + } + +} diff --git a/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RequestScopeBean.java b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RequestScopeBean.java new file mode 100644 index 0000000000000..7666a7868d7d6 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/RequestScopeBean.java @@ -0,0 +1,21 @@ +package com.example.grpc.hibernate; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.RequestScoped; + +@RequestScoped +public class RequestScopeBean { + private static final AtomicInteger idSequence = new AtomicInteger(); + private int id; + + @PostConstruct + public void setUp() { + id = idSequence.getAndIncrement(); + } + + public int getId() { + return id; + } +} diff --git a/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/TestService.java b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/TestService.java new file mode 100644 index 0000000000000..dfc465f903534 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/java/com/example/grpc/hibernate/TestService.java @@ -0,0 +1,78 @@ +package com.example.grpc.hibernate; + +import java.util.List; + +import javax.inject.Inject; +import javax.persistence.EntityManager; +import javax.transaction.Transactional; + +import com.example.test.Test; +import com.example.test.TestOuterClass; + +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class TestService implements Test { + private static final TestOuterClass.Empty EMPTY = TestOuterClass.Empty.getDefaultInstance(); + + @Inject + EntityManager entityManager; + + @Inject + ItemDao dao; + + @Inject + ContextChecker contextChecker; + + @Override + @Blocking + @Transactional + public Uni add(TestOuterClass.Item request) { + contextChecker.newContextId("TestService#add"); + Item item = new Item(); + item.text = request.getText(); + entityManager.persist(item); + return Uni.createFrom().item(EMPTY); + } + + @Override + @Blocking + @Transactional + public Uni clear(TestOuterClass.Empty request) { + contextChecker.newContextId("TestService#clear"); + entityManager.createQuery("DELETE from Item") + .executeUpdate(); + return Uni.createFrom().item(EMPTY); + } + + @Override + @Blocking + public Multi getAll(TestOuterClass.Empty request) { + contextChecker.newContextId("TestService#getAll"); + List items = entityManager.createQuery("from Item", Item.class) + .getResultList(); + return Multi.createFrom().iterable(items) + .map(i -> TestOuterClass.Item.newBuilder().setText(i.text).build()); + } + + @Override + @Blocking + public Multi bidi(Multi request) { + int contextId = contextChecker.newContextId("TestService#bidi"); + return Multi.createFrom().emitter( + emitter -> request.subscribe().with( + item -> { + if (contextChecker.requestContextId() != contextId) { + throw new RuntimeException("Different context for subscriber and TestService#bidi method"); + } + Item newItem = new Item(); + newItem.text = item.getText(); + dao.add(newItem); + emitter.emit(item); + })); + } + +} diff --git a/integration-tests/grpc-hibernate/src/main/proto/test.proto b/integration-tests/grpc-hibernate/src/main/proto/test.proto new file mode 100644 index 0000000000000..db7b07b26e7e0 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/proto/test.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package test; + +option java_package = "com.example.test"; + +service Test { + rpc Add(Item) returns (Empty); + rpc getAll(Empty) returns (stream Item); + rpc clear(Empty) returns (Empty); + rpc bidi(stream Item) returns (stream Item); +} + +service TestRaw { + rpc Add(Item) returns (Empty); + rpc getAll(Empty) returns (stream Item); + rpc clear(Empty) returns (Empty); + rpc bidi(stream Item) returns (stream Item); +} + +message Empty { +} +message Item { + string text = 1; +} diff --git a/integration-tests/grpc-hibernate/src/main/resources/application.properties b/integration-tests/grpc-hibernate/src/main/resources/application.properties new file mode 100644 index 0000000000000..a78788facf306 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/main/resources/application.properties @@ -0,0 +1,5 @@ +quarkus.datasource.db-kind=h2 +quarkus.datasource.jdbc.url=jdbc:h2:tcp://localhost/mem:test + +quarkus.hibernate-orm.dialect=org.hibernate.dialect.H2Dialect +quarkus.hibernate-orm.database.generation=drop-and-create diff --git a/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingMutinyTest.java b/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingMutinyTest.java new file mode 100644 index 0000000000000..d4549ed068330 --- /dev/null +++ b/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingMutinyTest.java @@ -0,0 +1,95 @@ +package com.example.grpc.hibernate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Timeout; + +import com.example.test.Test; +import com.example.test.TestOuterClass; + +import io.quarkus.grpc.GrpcClient; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.mutiny.Multi; + +@QuarkusTest +public class BlockingMutinyTest { + + public static final int NO_OF_ELTS = 100; + public static final int TIMEOUT = 60; + public static final TestOuterClass.Empty EMPTY = TestOuterClass.Empty.getDefaultInstance(); + @GrpcClient + Test client; + + @BeforeEach + void clear() { + client.clear(EMPTY).onFailure().invoke(e -> { + throw new RuntimeException("Failed to clear items", e); + }).await().atMost(Duration.ofSeconds(20)); + } + + @org.junit.jupiter.api.Test + @Timeout(TIMEOUT) + void shouldAddItems() { + List expected = new ArrayList<>(); + for (int i = 0; i < NO_OF_ELTS; i++) { + String text = "text " + i; + expected.add(text); + final int attempt = i; + client.add(TestOuterClass.Item.newBuilder().setText(text).build()) + .onFailure().invoke(e -> { + throw new RuntimeException("Failed to add on attempt " + attempt, e); + }) + .await().atMost(Duration.ofSeconds(5)); + } + + List actual = new ArrayList<>(); + Multi all = client.getAll(EMPTY) + .onFailure().invoke(th -> { + System.out.println("Failed to read"); + th.printStackTrace(); + }); + all.subscribe().with(item -> actual.add(item.getText())); + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> actual.size() == NO_OF_ELTS); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + + @org.junit.jupiter.api.Test + @Timeout(TIMEOUT) + void shouldAddViaBidi() { + List expected = new ArrayList<>(); + List echoed = new ArrayList<>(); + List actual = new ArrayList<>(); + + Multi request = Multi.createFrom().emitter( + m -> { + for (int i = 0; i < NO_OF_ELTS; i++) { + String text = "text " + i; + expected.add(text); + m.emit(TestOuterClass.Item.newBuilder().setText(text).build()); + } + m.complete(); + }); + client.bidi(request).subscribe().with(item -> echoed.add(item.getText())); + + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> echoed.size() == NO_OF_ELTS); + assertThat(echoed).containsExactlyInAnyOrderElementsOf(expected); + + Multi all = client.getAll(EMPTY) + .onFailure().invoke(th -> { + System.out.println("Failed to read"); + th.printStackTrace(); + }); + all.subscribe().with(item -> actual.add(item.getText())); + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> actual.size() == NO_OF_ELTS); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } +} diff --git a/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingRawTest.java b/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingRawTest.java new file mode 100644 index 0000000000000..1d732a476598a --- /dev/null +++ b/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingRawTest.java @@ -0,0 +1,99 @@ +package com.example.grpc.hibernate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Timeout; + +import com.example.test.Test; +import com.example.test.TestOuterClass; +import com.example.test.TestRaw; + +import io.quarkus.grpc.GrpcClient; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.mutiny.Multi; + +@QuarkusTest +public class BlockingRawTest { + + public static final int NO_OF_ELTS = 100; + public static final int TIMEOUT = 60; + public static final TestOuterClass.Empty EMPTY = TestOuterClass.Empty.getDefaultInstance(); + @GrpcClient + TestRaw client; + + @BeforeEach + void clear() { + client.clear(EMPTY).onFailure().invoke(e -> { + throw new RuntimeException("Failed to clear items", e); + }).await().atMost(Duration.ofSeconds(20)); + } + + @org.junit.jupiter.api.Test + @Timeout(TIMEOUT) + void shouldAdd() { + List expected = new ArrayList<>(); + for (int i = 0; i < NO_OF_ELTS; i++) { + String text = "text " + i; + expected.add(text); + final int attempt = i; + client.add(TestOuterClass.Item.newBuilder().setText(text).build()) + .onFailure().invoke(e -> { + throw new RuntimeException("Failed to add on attempt " + attempt, e); + }) + .await().atMost(Duration.ofSeconds(5)); + } + + List actual = new ArrayList<>(); + Multi all = client.getAll(EMPTY) + .onFailure().invoke(th -> { + System.out.println("Failed to read"); + th.printStackTrace(); + }); + all.subscribe().with(item -> actual.add(item.getText())); + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> actual.size() == NO_OF_ELTS); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + + @org.junit.jupiter.api.Test + @Timeout(TIMEOUT) + void shouldAddViaBidi() { + List expected = new ArrayList<>(); + List echoed = new ArrayList<>(); + List actual = new ArrayList<>(); + + Multi request = Multi.createFrom().emitter( + m -> { + for (int i = 0; i < NO_OF_ELTS; i++) { + String text = "text " + i; + expected.add(text); + m.emit(TestOuterClass.Item.newBuilder().setText(text).build()); + } + m.complete(); + }); + client.bidi(request).subscribe().with(item -> echoed.add(item.getText())); + + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> echoed.size() == NO_OF_ELTS); + assertThat(echoed).containsExactlyInAnyOrderElementsOf(expected); + + Multi all = client.getAll(EMPTY) + .onFailure().invoke(th -> { + System.out.println("Failed to read"); + th.printStackTrace(); + }); + all.subscribe().with(item -> actual.add(item.getText())); + await().atMost(Duration.ofSeconds(TIMEOUT / 2)) + .until(() -> { + System.out.println("no of elements: " + actual.size()); + return actual.size() == NO_OF_ELTS; + }); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } +} diff --git a/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/TestResources.java b/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/TestResources.java new file mode 100644 index 0000000000000..37b0f78e27deb --- /dev/null +++ b/integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/TestResources.java @@ -0,0 +1,8 @@ +package com.example.grpc.hibernate; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.h2.H2DatabaseTestResource; + +@QuarkusTestResource(H2DatabaseTestResource.class) +public class TestResources { +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 84e805b9928df..25f3ec437fe8a 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -186,6 +186,8 @@ grpc-interceptors grpc-proto-v2 grpc-health + grpc-hibernate + grpc-hibernate-reactive google-cloud-functions-http google-cloud-functions