diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/auth/GrpcAuthTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/auth/GrpcAuthTest.java index c995eb501b7c0..b73d71e2c7604 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/auth/GrpcAuthTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/auth/GrpcAuthTest.java @@ -68,7 +68,7 @@ void shouldSecureUniEndpoint() { client.unaryCall(Security.Container.newBuilder().setText("woo-hoo").build()) .subscribe().with(e -> resultCount.incrementAndGet()); - await().atMost(5, TimeUnit.SECONDS) + await().atMost(10, TimeUnit.SECONDS) .until(() -> resultCount.get() == 1); } @@ -82,7 +82,7 @@ void shouldSecureMultiEndpoint() { .supplier(() -> (Security.Container.newBuilder().setText("woo-hoo").build())).atMost(4)) .subscribe().with(e -> results.add(e.getIsOnEventLoop())); - await().atMost(5, TimeUnit.SECONDS) + await().atMost(10, TimeUnit.SECONDS) .until(() -> results.size() == 5); assertThat(results.stream().filter(e -> !e)).isEmpty(); @@ -101,7 +101,7 @@ void shouldFailWithInvalidCredentials() { .onFailure().invoke(error::set) .subscribe().with(e -> resultCount.incrementAndGet()); - await().atMost(5, TimeUnit.SECONDS) + await().atMost(10, TimeUnit.SECONDS) .until(() -> error.get() != null); } @@ -118,7 +118,7 @@ void shouldFailWithInvalidInsufficientRole() { .onFailure().invoke(error::set) .subscribe().with(e -> resultCount.incrementAndGet()); - await().atMost(5, TimeUnit.SECONDS) + await().atMost(10, TimeUnit.SECONDS) .until(() -> error.get() != null); } diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GrpcContextPropagationTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GrpcContextPropagationTest.java new file mode 100644 index 0000000000000..83278b2073d38 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GrpcContextPropagationTest.java @@ -0,0 +1,39 @@ +package io.quarkus.grpc.server.interceptors; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.grpc.examples.helloworld.Greeter; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.test.QuarkusUnitTest; + +/** + * Test reproducing #26830. + */ +public class GrpcContextPropagationTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addPackage(GreeterGrpc.class.getPackage()) + .addClasses(MyFirstInterceptor.class, MyInterceptedGreeting.class)); + + @GrpcClient + Greeter greeter; + + @Test + void test() { + HelloReply foo = greeter.sayHello(HelloRequest.newBuilder().setName("foo").build()).await().indefinitely(); + assertThat(foo.getMessage()).isEqualTo("hello k1 - 1"); + foo = greeter.sayHello(HelloRequest.newBuilder().setName("foo").build()).await().indefinitely(); + assertThat(foo.getMessage()).isEqualTo("hello k1 - 2"); + } + +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java index ea796231910ca..bee9dba9d37bc 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java @@ -1,8 +1,12 @@ package io.quarkus.grpc.server.interceptors; +import java.util.concurrent.atomic.AtomicInteger; + import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.spi.Prioritized; +import io.grpc.Context; +import io.grpc.Contexts; import io.grpc.ForwardingServerCall; import io.grpc.Metadata; import io.grpc.ServerCall; @@ -15,19 +19,26 @@ @GlobalInterceptor public class MyFirstInterceptor implements ServerInterceptor, Prioritized { + public static Context.Key KEY_1 = Context.key("X-TEST_1"); + public static Context.Key KEY_2 = Context.keyWithDefault("X-TEST_2", -1); private volatile long callTime; + private AtomicInteger counter = new AtomicInteger(); + @Override public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, ServerCallHandler serverCallHandler) { - return serverCallHandler - .startCall(new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { - @Override - public void close(Status status, Metadata trailers) { - callTime = System.nanoTime(); - super.close(status, trailers); - } - }, metadata); + + Context ctx = Context.current().withValue(KEY_1, "k1").withValue(KEY_2, counter.incrementAndGet()); + return Contexts.interceptCall(ctx, new ForwardingServerCall.SimpleForwardingServerCall<>(serverCall) { + + @Override + public void close(Status status, Metadata trailers) { + callTime = System.nanoTime(); + super.close(status, trailers); + } + }, metadata, serverCallHandler); + } public long getLastCall() { diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyInterceptedGreeting.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyInterceptedGreeting.java new file mode 100644 index 0000000000000..1929da8abd640 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyInterceptedGreeting.java @@ -0,0 +1,18 @@ +package io.quarkus.grpc.server.interceptors; + +import io.grpc.examples.helloworld.Greeter; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class MyInterceptedGreeting implements Greeter { + @Override + @Blocking + public Uni sayHello(HelloRequest request) { + return Uni.createFrom().item(() -> HelloReply.newBuilder() + .setMessage("hello " + MyFirstInterceptor.KEY_1.get() + " - " + MyFirstInterceptor.KEY_2.get()).build()); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/grpc/override/ContextStorageOverride.java b/extensions/grpc/runtime/src/main/java/io/grpc/override/ContextStorageOverride.java new file mode 100644 index 0000000000000..84de72b532fdd --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/grpc/override/ContextStorageOverride.java @@ -0,0 +1,67 @@ +package io.grpc.override; + +import io.grpc.Context; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +/** + * Override gRPC context storage to rely on duplicated context when available. + */ +public class ContextStorageOverride extends Context.Storage { + + private static final ThreadLocal fallback = new ThreadLocal<>(); + + private static final String GRPC_CONTEXT = "GRPC_CONTEXT"; + + @Override + public Context doAttach(Context toAttach) { + Context current = current(); + io.vertx.core.Context dc = Vertx.currentContext(); + if (dc != null && VertxContext.isDuplicatedContext(dc)) { + dc.putLocal(GRPC_CONTEXT, toAttach); + } else { + fallback.set(toAttach); + } + return current; + } + + @Override + public void detach(Context context, Context toRestore) { + io.vertx.core.Context dc = Vertx.currentContext(); + if (toRestore != Context.ROOT) { + if (dc != null && VertxContext.isDuplicatedContext(dc)) { + dc.putLocal(GRPC_CONTEXT, toRestore); + } else { + fallback.set(toRestore); + } + } else { + if (dc != null && VertxContext.isDuplicatedContext(dc)) { + // Do nothing duplicated context are not shared. + } else { + fallback.set(null); + } + } + } + + @Override + public Context current() { + if (VertxContext.isOnDuplicatedContext()) { + Context current = Vertx.currentContext().getLocal(GRPC_CONTEXT); + if (current == null) { + return Context.ROOT; + } + return current; + } else { + Context current = fallback.get(); + if (current == null) { + return Context.ROOT; + } + return current; + } + } + + @Override + public void attach(Context toAttach) { + // do nothing, should not be called. + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java index d4ca59da2e55c..b876ab00e3001 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java @@ -82,7 +82,7 @@ private synchronized ServerCall.Listener getDelegate() { delegate = supplier.get(); } catch (Throwable t) { // If the interceptor supplier throws an exception, catch it, and close the call. - log.warnf("Unable to retrieve gRPC Server call listener", t); + log.warn("Unable to retrieve gRPC Server call listener", t); close(t); return null; }