diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java new file mode 100644 index 0000000000000..3455abda0e0fa --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java @@ -0,0 +1,46 @@ +package io.quarkus.grpc.client.bd; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.grpc.Deadline; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloRequest; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.test.QuarkusUnitTest; + +public class ClientBlockingDeadlineTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addPackage(GreeterGrpc.class.getPackage()).addClasses(HelloService.class)) + .withConfigurationResource("hello-config-deadline.properties"); + + @GrpcClient("hello-service") + GreeterGrpc.GreeterBlockingStub stub; + + @Test + public void testCallOptions() { + Deadline deadline = stub.getCallOptions().getDeadline(); + assertNotNull(deadline); + try { + //noinspection ResultOfMethodCallIgnored + stub.sayHello(HelloRequest.newBuilder().setName("Scaladar").build()); + } catch (Exception e) { + Assertions.assertTrue(e instanceof StatusRuntimeException); + StatusRuntimeException sre = (StatusRuntimeException) e; + Status status = sre.getStatus(); + Assertions.assertNotNull(status); + Assertions.assertEquals(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); + } + } +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java new file mode 100644 index 0000000000000..c3cda52bb6df1 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java @@ -0,0 +1,35 @@ +package io.quarkus.grpc.client.bd; + +import java.time.Duration; + +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class HelloService extends GreeterGrpc.GreeterImplBase { + + @Override + @Blocking + public void sayHello(HelloRequest request, StreamObserver observer) { + Deadline deadline = Context.current().getDeadline(); + if (deadline == null) { + throw new IllegalStateException("Null deadline"); + } + Uni.createFrom() + .item(HelloReply.newBuilder().setMessage("OK").build()) + .onItem() + .delayIt() + .by(Duration.ofMillis(400)).invoke(observer::onNext) + .invoke(observer::onCompleted) + .await() + .indefinitely(); + } + +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java index 2d2f018cbcf18..c44b698ba5df6 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java @@ -2,6 +2,8 @@ import java.time.Duration; +import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.examples.helloworld.Greeter; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; @@ -13,6 +15,10 @@ public class HelloService implements Greeter { @Override public Uni sayHello(HelloRequest request) { + Deadline deadline = Context.current().getDeadline(); + if (deadline == null) { + throw new IllegalStateException("Null deadline"); + } return Uni.createFrom().item(HelloReply.newBuilder().setMessage("OK").build()).onItem().delayIt() .by(Duration.ofMillis(400)); } diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java index f9c00822d94a1..e02431d194e59 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java @@ -30,6 +30,7 @@ public ServerCall.Listener interceptCall(ServerCall serverCallHandler) { Context ctx = Context.current().withValue(KEY_1, "k1").withValue(KEY_2, counter.incrementAndGet()); + ctx.attach(); // Make sure the context is attached to the current duplicated context. return Contexts.interceptCall(ctx, new ForwardingServerCall.SimpleForwardingServerCall<>(serverCall) { @Override @@ -38,7 +39,6 @@ public void close(Status status, Metadata trailers) { super.close(status, trailers); } }, metadata, serverCallHandler); - } public long getLastCall() { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java index 2aa30e397e0ca..358cf9931e0c1 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java @@ -1,7 +1,6 @@ package io.quarkus.grpc.runtime.supports.blocking; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; @@ -13,6 +12,8 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.jboss.logging.Logger; + import io.grpc.Context; import io.grpc.Metadata; import io.grpc.ServerCall; @@ -31,6 +32,7 @@ * For non-annotated methods, the interceptor acts as a pass-through. */ public class BlockingServerInterceptor implements ServerInterceptor, Function { + private static final Logger log = Logger.getLogger(BlockingServerInterceptor.class); private final Vertx vertx; private final Set blockingMethods; @@ -140,14 +142,16 @@ public ServerCall.Listener interceptCall(ServerCall extends ServerCall.Listener { private final InjectableContext.ContextState requestContextState; + private final Context grpcContext; // exclusive to event loop context - private ServerCall.Listener delegate; - private final Queue>> incomingEvents = new LinkedList<>(); - private boolean isConsumingFromIncomingEvents = false; + private volatile ServerCall.Listener delegate; + private final Queue>> incomingEvents = new ConcurrentLinkedQueue<>(); + private volatile boolean isConsumingFromIncomingEvents; private ReplayListener(InjectableContext.ContextState requestContextState) { this.requestContextState = requestContextState; + this.grpcContext = Context.current(); } /** @@ -185,6 +189,12 @@ private void executeBlockingWithRequestContext(Consumer blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, requestContextState, getRequestContext(), this); + + if (!isExecutable()) { + log.warn("Not executable, already shutdown? Ignoring execution ..."); + return; + } + if (devMode) { blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), blockingHandler); @@ -323,6 +333,11 @@ public void onReady() { } // protected for tests + + protected boolean isExecutable() { + return Arc.container() != null; + } + protected ManagedContext getRequestContext() { return Arc.container().requestContext(); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java index e02cf3001f172..675f6338aaa11 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java @@ -47,13 +47,28 @@ public ServerCall.Listener interceptCall(ServerCall(ehp, call, () -> next.startCall(call, headers), local); + return new ListenedOnDuplicatedContext<>(ehp, call, nextCall(call, headers, next), local); } else { log.warn("Unable to run on a duplicated context - interceptor not called on the Vert.x event loop"); return next.startCall(call, headers); } } + private Supplier> nextCall(ServerCall call, + Metadata headers, + ServerCallHandler next) { + // Must be sure to call next.startCall on the right context + io.grpc.Context current = io.grpc.Context.current(); + return () -> { + io.grpc.Context previous = current.attach(); + try { + return next.startCall(call, headers); + } finally { + current.detach(previous); + } + }; + } + @Override public int getPriority() { return Interceptors.DUPLICATE_CONTEXT; diff --git a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java index 2ad16022d73bb..5e773bc4029f6 100644 --- a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java +++ b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java @@ -36,6 +36,11 @@ void setup() { when(requestContext.getState()).thenReturn(contextState); blockingServerInterceptor = new BlockingServerInterceptor(vertx, Collections.singletonList("blocking"), Collections.emptyList(), null, false) { + @Override + protected boolean isExecutable() { + return true; + } + @Override protected ManagedContext getRequestContext() { return requestContext; @@ -54,21 +59,25 @@ void testContextPropagation() throws Exception { // setting grpc context final Context context = Context.current().withValue(USERNAME, "my-user"); + Context previous = context.attach(); + try { + final ServerCall.Listener listener = blockingServerInterceptor.interceptCall(serverCall, null, serverCallHandler); + serverCallHandler.awaitSetup(); - final ServerCall.Listener listener = blockingServerInterceptor.interceptCall(serverCall, null, serverCallHandler); - serverCallHandler.awaitSetup(); + // simulate GRPC call + context.wrap(() -> listener.onMessage("hello")).run(); - // simulate GRPC call - context.wrap(() -> listener.onMessage("hello")).run(); + // await for the message to be received + serverCallHandler.await(); - // await for the message to be received - serverCallHandler.await(); + // check that the thread is a worker thread + assertThat(serverCallHandler.threadName).contains("vert.x").contains("worker"); - // check that the thread is a worker thread - assertThat(serverCallHandler.threadName).contains("vert.x").contains("worker"); - - // check that the context was propagated correctly - assertThat(serverCallHandler.contextUserName).isEqualTo("my-user"); + // check that the context was propagated correctly + assertThat(serverCallHandler.contextUserName).isEqualTo("my-user"); + } finally { + context.detach(previous); + } } static class BlockingServerCallHandler implements ServerCallHandler {