From ea985a92019feace81ad83001418c3fa968914d0 Mon Sep 17 00:00:00 2001 From: Clement Escoffier <clement@apache.org> Date: Wed, 9 Feb 2022 16:18:46 +0100 Subject: [PATCH 1/4] Restrict access to some method from the Vert.x Context which are considered risky: - get/put/remove - which would leak data between the different tasks scheduled on a context - getLocal/putLocal/removeLocal when called from a "root" context (for the same reason) --- .../core/deployment/VertxCoreProcessor.java | 81 ++++++++ .../vertx/locals/LocalContextAccessTest.java | 187 ++++++++++++++++++ .../vertx/core/runtime/VertxLocalsHelper.java | 49 +++++ 3 files changed, 317 insertions(+) create mode 100644 extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java create mode 100644 extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java index 2d716efbe07c7..97f5961474fb1 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java @@ -21,6 +21,10 @@ import org.jboss.logging.Logger; import org.jboss.logmanager.Level; import org.jboss.logmanager.LogManager; +import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.MethodVisitor; +import org.objectweb.asm.Opcodes; +import org.objectweb.asm.Type; import io.quarkus.arc.deployment.SyntheticBeanBuildItem; import io.quarkus.deployment.annotations.BuildProducer; @@ -28,6 +32,7 @@ import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Produce; import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.BytecodeTransformerBuildItem; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.ContextHandlerBuildItem; import io.quarkus.deployment.builditem.ExecutorBuildItem; @@ -40,8 +45,10 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; +import io.quarkus.gizmo.Gizmo; import io.quarkus.netty.deployment.EventLoopSupplierBuildItem; import io.quarkus.vertx.core.runtime.VertxCoreRecorder; +import io.quarkus.vertx.core.runtime.VertxLocalsHelper; import io.quarkus.vertx.core.runtime.VertxLogDelegateFactory; import io.quarkus.vertx.core.runtime.config.VertxConfiguration; import io.vertx.core.AbstractVerticle; @@ -80,6 +87,80 @@ LogCleanupFilterBuildItem cleanupVertxWarnings() { return new LogCleanupFilterBuildItem("io.vertx.core.impl.ContextImpl", "You have disabled TCCL checks"); } + @BuildStep + BytecodeTransformerBuildItem overrideContextToAddSafeGuards() { + return new BytecodeTransformerBuildItem("io.vertx.core.impl.AbstractContext", + (className, classVisitor) -> new ClassVisitor(Gizmo.ASM_API_VERSION, classVisitor) { + @Override + public MethodVisitor visitMethod(int access, String name, String descriptor, String signature, + String[] exceptions) { + MethodVisitor visitor = super.visitMethod(access, name, descriptor, signature, exceptions); + + if (name.equals("get") || name.equals("put") || name.equals("remove")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + super.visitCode(); + visitMethodInsn(Opcodes.INVOKESTATIC, + VertxLocalsHelper.class.getName().replace(".", "/"), "throwOnRootContextAccess", + "()V", false); + } + }; + } + + if (name.equals("getLocal")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + super.visitCode(); + visitVarInsn(Opcodes.ALOAD, 0); // this + visitVarInsn(Opcodes.ALOAD, 1); // first param (object) + visitMethodInsn(Opcodes.INVOKESTATIC, + VertxLocalsHelper.class.getName().replace(".", "/"), "getLocal", + "(Lio/vertx/core/impl/ContextInternal;Ljava/lang/Object;)Ljava/lang/Object;", + false); + visitInsn(Opcodes.ARETURN); + } + }; + } + + if (name.equals("putLocal")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + super.visitCode(); + visitVarInsn(Opcodes.ALOAD, 0); // this + visitVarInsn(Opcodes.ALOAD, 1); // first param (object) + visitVarInsn(Opcodes.ALOAD, 2); // second param (object) + visitMethodInsn(Opcodes.INVOKESTATIC, + VertxLocalsHelper.class.getName().replace(".", "/"), "putLocal", + "(Lio/vertx/core/impl/ContextInternal;Ljava/lang/Object;Ljava/lang/Object;)V", + false); + visitInsn(Opcodes.RETURN); + } + }; + } + + if (name.equals("removeLocal")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + super.visitCode(); + visitVarInsn(Opcodes.ALOAD, 0); // this + visitVarInsn(Opcodes.ALOAD, 1); // first param (object) + visitMethodInsn(Opcodes.INVOKESTATIC, + VertxLocalsHelper.class.getName().replace(".", "/"), "removeLocal", + "(Lio/vertx/core/impl/ContextInternal;Ljava/lang/Object;)Z", false); + visitInsn(Type.getType(Boolean.TYPE).getOpcode(Opcodes.IRETURN)); + } + }; + } + + return visitor; + } + }); + } + @BuildStep LogCategoryBuildItem preventLoggerContention() { //Prevent the Logging warning about the TCCL checks being disabled to be logged; diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java new file mode 100644 index 0000000000000..fe3dcd9814e6f --- /dev/null +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java @@ -0,0 +1,187 @@ +package io.quarkus.vertx.locals; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.assertj.core.api.Assertions; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +public class LocalContextAccessTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap + .create(JavaArchive.class).addClasses(BeanAccessingContext.class)); + + @Inject + Vertx vertx; + + @Inject + BeanAccessingContext bean; + + @Test + public void testGlobalAccessFromEventLoop() throws ExecutionException, InterruptedException, TimeoutException { + Context context = vertx.getOrCreateContext(); + CompletableFuture<Throwable> get = new CompletableFuture<>(); + CompletableFuture<Throwable> put = new CompletableFuture<>(); + CompletableFuture<Throwable> remove = new CompletableFuture<>(); + + context.runOnContext(x -> { + try { + bean.getGlobal(); + get.completeExceptionally(new Exception("Exception expected as using get is forbidden")); + } catch (Exception e) { + get.complete(e); + } + }); + Throwable t = get.toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(t).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Context.get()"); + + context.runOnContext(x -> { + try { + bean.putGlobal(); + put.completeExceptionally(new Exception("Exception expected as using put is forbidden")); + } catch (Exception e) { + put.complete(e); + } + }); + t = put.toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(t).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Context.put()"); + + context.runOnContext(x -> { + try { + bean.removeGlobal(); + remove.completeExceptionally(new Exception("Exception expected as using remove is forbidden")); + } catch (Exception e) { + remove.complete(e); + } + }); + t = remove.toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(t).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Context.remove()"); + } + + @Test + public void testLocalAccessFromEventLoop() throws ExecutionException, InterruptedException, TimeoutException { + Context context = vertx.getOrCreateContext(); + CompletableFuture<Throwable> get = new CompletableFuture<>(); + CompletableFuture<Throwable> put = new CompletableFuture<>(); + CompletableFuture<Throwable> remove = new CompletableFuture<>(); + + context.runOnContext(x -> { + try { + bean.getLocal(); + get.completeExceptionally(new Exception("Exception expected as using get is forbidden")); + } catch (Exception e) { + get.complete(e); + } + }); + Throwable t = get.toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(t).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Context.getLocal()"); + + context.runOnContext(x -> { + try { + bean.putLocal(); + put.completeExceptionally(new Exception("Exception expected as using put is forbidden")); + } catch (Exception e) { + put.complete(e); + } + }); + t = put.toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(t).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("Context.putLocal()"); + + context.runOnContext(x -> { + try { + bean.removeLocal(); + remove.completeExceptionally(new Exception("Exception expected as using remove is forbidden")); + } catch (Exception e) { + remove.complete(e); + } + }); + t = remove.toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(t).isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Context.removeLocal()"); + } + + @Test + public void testLocalAccessFromDuplicatedContext() throws ExecutionException, InterruptedException, TimeoutException { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + CompletableFuture<Void> get = new CompletableFuture<>(); + CompletableFuture<Void> put = new CompletableFuture<>(); + CompletableFuture<Void> remove = new CompletableFuture<>(); + + ContextInternal local = context.duplicate(); + + local.runOnContext(x -> { + try { + bean.putLocal(); + put.complete(null); + } catch (Exception e) { + get.completeExceptionally(e); + } + }); + put.toCompletableFuture().get(5, TimeUnit.SECONDS); + + local.runOnContext(x -> { + try { + Assertions.assertThat(bean.getLocal()).isEqualTo("bar"); + get.complete(null); + } catch (Exception e) { + get.completeExceptionally(e); + } + }); + get.toCompletableFuture().get(5, TimeUnit.SECONDS); + + local.runOnContext(x -> { + try { + Assertions.assertThat(bean.removeLocal()).isTrue(); + remove.complete(null); + } catch (Exception e) { + remove.completeExceptionally(e); + } + }); + remove.toCompletableFuture().get(5, TimeUnit.SECONDS); + } + + @ApplicationScoped + public static class BeanAccessingContext { + + public String getGlobal() { + return Vertx.currentContext().get("foo"); + } + + public void putGlobal() { + Vertx.currentContext().put("foo", "bar"); + } + + public void removeGlobal() { + Vertx.currentContext().remove("foo"); + } + + public String getLocal() { + return Vertx.currentContext().getLocal("foo"); + } + + public void putLocal() { + Vertx.currentContext().putLocal("foo", "bar"); + } + + public boolean removeLocal() { + return Vertx.currentContext().removeLocal("foo"); + } + + } + +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java new file mode 100644 index 0000000000000..db3b187d44919 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java @@ -0,0 +1,49 @@ +package io.quarkus.vertx.core.runtime; + +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.WorkerContext; + +public class VertxLocalsHelper { + + private static final String ILLEGAL_ACCESS_TO_CONTEXT = "Access to Context.put(), Context.get() and Context.remove() " + + "are forbidden as it can leak data between unrelated processing. Use Context.putLocal(), Context.getLocal() " + + "and Context.removeLocal() instead. Note that these methods can only be used from a 'duplicated' Context, " + + "and so may not be available everywhere."; + + private static final String ILLEGAL_ACCESS_TO_LOCAL_CONTEXT = "Access to Context.putLocal(), Context.getLocal() and" + + " Context.removeLocal() are forbidden from a 'root' context as it can leak data between unrelated processing." + + " Make sure the method runs on a 'duplicated' (local) Context"; + + public static void throwOnRootContextAccess() { + throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_CONTEXT); + } + + @SuppressWarnings("unchecked") + public static <T> T getLocal(ContextInternal context, Object key) { + if (!(context instanceof EventLoopContext) && !(context instanceof WorkerContext)) { + // We are on a duplicated context, allow accessing the locals + return (T) context.localContextData().get(key); + } else { + throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT); + } + } + + public static void putLocal(ContextInternal context, Object key, Object value) { + if (!(context instanceof EventLoopContext) && !(context instanceof WorkerContext)) { + // We are on a duplicated context, allow accessing the locals + context.localContextData().put(key, value); + } else { + throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT); + } + } + + public static boolean removeLocal(ContextInternal context, Object key) { + if (!(context instanceof EventLoopContext) && !(context instanceof WorkerContext)) { + // We are on a duplicated context, allow accessing the locals + return context.localContextData().remove(key) != null; + } else { + throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT); + } + } +} From 7e3b6eb62f06615628bcb72d9555b58221ccc99b Mon Sep 17 00:00:00 2001 From: Roberto Cortez <radcortez@yahoo.com> Date: Tue, 15 Feb 2022 22:20:06 +0000 Subject: [PATCH 2/4] Use SR Commons VertxContext --- bom/application/pom.xml | 2 +- .../deployment/OpenTelemetryProcessor.java | 6 -- .../deployment/WithSpanInterceptorTest.java | 50 ++++++++++ .../opentelemetry/runtime/pom.xml | 4 + .../runtime/QuarkusContextStorage.java | 99 ++++++++++++++----- .../restclient/OpenTelemetryClientFilter.java | 20 ++++ .../vertx/OpenTelemetryVertxTracer.java | 20 ++-- .../OpenTelemetryReactiveClientTest.java | 10 +- .../io/quarkus/it/rest/client/BasicTest.java | 8 +- 9 files changed, 175 insertions(+), 44 deletions(-) diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 4f4fce55d905f..38ee37e58d2b3 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -38,7 +38,7 @@ <microprofile-rest-client.version>2.0</microprofile-rest-client.version> <microprofile-jwt.version>1.2</microprofile-jwt.version> <microprofile-lra.version>1.0</microprofile-lra.version> - <smallrye-common.version>1.9.0</smallrye-common.version> + <smallrye-common.version>1.10.0</smallrye-common.version> <smallrye-config.version>2.9.0</smallrye-config.version> <smallrye-health.version>3.2.0</smallrye-health.version> <smallrye-metrics.version>3.0.4</smallrye-metrics.version> diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java index 8dacc413feefd..f9f45c0fa7cd2 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java +++ b/extensions/opentelemetry/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java @@ -34,7 +34,6 @@ import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.vertx.core.deployment.CoreVertxBuildItem; -import io.quarkus.vertx.deployment.CopyVertxContextDataBuildItem; public class OpenTelemetryProcessor { static class OpenTelemetryEnabled implements BooleanSupplier { @@ -138,11 +137,6 @@ void storeVertxOnContextStorage(OpenTelemetryRecorder recorder, CoreVertxBuildIt recorder.storeVertxOnContextStorage(vertx.getVertx()); } - @BuildStep - CopyVertxContextDataBuildItem copyVertxContextData() { - return new CopyVertxContextDataBuildItem(QuarkusContextStorage.ACTIVE_CONTEXT); - } - public static boolean isClassPresent(String classname) { try { Class.forName(classname, false, Thread.currentThread().getContextClassLoader()); diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/WithSpanInterceptorTest.java b/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/WithSpanInterceptorTest.java index 19575b397e21b..324c16bba89d6 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/WithSpanInterceptorTest.java +++ b/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/WithSpanInterceptorTest.java @@ -2,12 +2,18 @@ import static io.opentelemetry.api.trace.SpanKind.INTERNAL; import static io.opentelemetry.api.trace.SpanKind.SERVER; +import static java.net.HttpURLConnection.HTTP_OK; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.List; import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; import javax.inject.Inject; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; @@ -19,7 +25,10 @@ import io.opentelemetry.extension.annotations.SpanAttribute; import io.opentelemetry.extension.annotations.WithSpan; import io.opentelemetry.sdk.trace.data.SpanData; +import io.quarkus.runtime.StartupEvent; import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.config.SmallRyeConfig; +import io.vertx.ext.web.Router; public class WithSpanInterceptorTest { @RegisterExtension @@ -79,6 +88,15 @@ void spanChild() { assertEquals(spanItems.get(0).getParentSpanId(), spanItems.get(1).getSpanId()); } + @Test + void spanCdiRest() { + spanBean.spanRestClient(); + List<SpanData> spanItems = spanExporter.getFinishedSpanItems(4); + assertEquals(spanItems.get(0).getTraceId(), spanItems.get(1).getTraceId()); + assertEquals(spanItems.get(0).getTraceId(), spanItems.get(2).getTraceId()); + assertEquals(spanItems.get(0).getTraceId(), spanItems.get(3).getTraceId()); + } + @ApplicationScoped public static class SpanBean { @WithSpan @@ -108,6 +126,14 @@ public void spanArgs(@SpanAttribute(value = "arg") String arg) { public void spanChild() { spanChildBean.spanChild(); } + + @Inject + SpanRestClient spanRestClient; + + @WithSpan + public void spanRestClient() { + spanRestClient.spanRestClient(); + } } @ApplicationScoped @@ -117,4 +143,28 @@ public void spanChild() { } } + + @ApplicationScoped + public static class SpanRestClient { + @Inject + SmallRyeConfig config; + + @WithSpan + public void spanRestClient() { + WebTarget target = ClientBuilder.newClient() + .target(UriBuilder.fromUri(config.getRawValue("test.url")).path("hello")); + Response response = target.request().get(); + assertEquals(HTTP_OK, response.getStatus()); + } + } + + @ApplicationScoped + public static class HelloRouter { + @Inject + Router router; + + public void register(@Observes StartupEvent ev) { + router.get("/hello").handler(rc -> rc.response().end("hello")); + } + } } diff --git a/extensions/opentelemetry/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/opentelemetry/runtime/pom.xml index 3669b4f1cdab7..4425bf1fdd2ab 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/opentelemetry/runtime/pom.xml @@ -39,6 +39,10 @@ <optional>true</optional> </dependency> + <dependency> + <groupId>io.smallrye.common</groupId> + <artifactId>smallrye-common-vertx-context</artifactId> + </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk</artifactId> diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index 3ef55023745c6..091871ba873aa 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -1,66 +1,113 @@ package io.quarkus.opentelemetry.runtime; +import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext; +import static io.smallrye.common.vertx.VertxContext.isDuplicatedContext; + import org.jboss.logging.Logger; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; +/** + * Bridges the OpenTelemetry ContextStorage with the Vert.x Context. The default OpenTelemetry ContextStorage (based in + * ThreadLocals) is not suitable for Vert.x. In this case, the OpenTelemetry Context piggybacks on top of the Vert.x + * Context. If the Vert.x Context is not available, fallbacks to the default OpenTelemetry ContextStorage. + */ public enum QuarkusContextStorage implements ContextStorage { INSTANCE; private static final Logger log = Logger.getLogger(QuarkusContextStorage.class); + private static final String OTEL_CONTEXT = QuarkusContextStorage.class.getName() + ".otelContext"; - public static final String ACTIVE_CONTEXT = QuarkusContextStorage.class.getName() + ".activeContext"; - + private static final ContextStorage DEFAULT_CONTEXT_STORAGE = ContextStorage.defaultStorage(); static Vertx vertx; + /** + * Attach the OpenTelemetry Context to the current Context. If a Vert.x Context is available, and it is a duplicated + * Vert.x Context the OpenTelemetry Context is attached to the Vert.x Context. Otherwise, fallback to the + * OpenTelemetry default ContextStorage. + * + * @param toAttach the OpenTelemetry Context to attach + * @return the Scope of the OpenTelemetry Context + */ @Override public Scope attach(Context toAttach) { - return attach(getVertxContext(), toAttach); + io.vertx.core.Context vertxContext = getVertxContext(); + return vertxContext != null && isDuplicatedContext(vertxContext) ? attach(vertxContext, toAttach) + : DEFAULT_CONTEXT_STORAGE.attach(toAttach); } + /** + * Attach the OpenTelemetry Context in the Vert.x Context if it is a duplicated Vert.x Context. + * + * @param vertxContext the Vert.x Context to attach the OpenTelemetry Context + * @param toAttach the OpenTelemetry Context to attach + * @return the Scope of the OpenTelemetry Context + */ public Scope attach(io.vertx.core.Context vertxContext, Context toAttach) { - if (toAttach == null) { - // Not allowed + if (vertxContext == null || toAttach == null) { return Scope.noop(); } + // We don't allow to attach the OpenTelemetry Context to a Vert.x Context that is not a duplicate. + if (!isDuplicatedContext(vertxContext)) { + throw new IllegalArgumentException( + "The Vert.x Context to attach the OpenTelemetry Context must be a duplicated Context"); + } + Context beforeAttach = getContext(vertxContext); if (toAttach == beforeAttach) { return Scope.noop(); } - if (vertxContext != null) { - vertxContext.putLocal(ACTIVE_CONTEXT, toAttach); - return () -> { - if (getContext(vertxContext) != toAttach) { - log.warn("Context in storage not the expected context, Scope.close was not called correctly"); - } - if (beforeAttach == null) { - vertxContext.removeLocal(ACTIVE_CONTEXT); - ((ContextInternal) vertxContext).unwrap().removeLocal(ACTIVE_CONTEXT); - } else { - vertxContext.putLocal(ACTIVE_CONTEXT, beforeAttach); - } - }; - } + vertxContext.putLocal(OTEL_CONTEXT, toAttach); + + return () -> { + if (getContext(vertxContext) != toAttach) { + log.warn("Context in storage not the expected context, Scope.close was not called correctly"); + } - return Scope.noop(); + if (beforeAttach == null) { + vertxContext.removeLocal(OTEL_CONTEXT); + } else { + vertxContext.putLocal(OTEL_CONTEXT, beforeAttach); + } + }; } + /** + * Gets the current OpenTelemetry Context from the current Vert.x Context if one exists or from the default + * ContextStorage. The current Vert.x Context must be a duplicated Context. + * + * @return the current OpenTelemetry Context or null. + */ @Override public Context current() { - return getContext(getVertxContext()); + return Vertx.currentContext() != null ? getOrCreateDuplicatedContext(vertx).getLocal(OTEL_CONTEXT) + : DEFAULT_CONTEXT_STORAGE.current(); } - private Context getContext(io.vertx.core.Context vertxContext) { - return vertxContext != null ? vertxContext.getLocal(ACTIVE_CONTEXT) : null; + /** + * Gets the OpenTelemetry Context in a Vert.x Context. The Vert.x Context has to be a duplicate context. + * + * @param vertxContext a Vert.x Context. + * @return the OpenTelemetry Context if exists in the Vert.x Context or null. + */ + public static Context getContext(io.vertx.core.Context vertxContext) { + return vertxContext != null && isDuplicatedContext(vertxContext) ? vertxContext.getLocal(OTEL_CONTEXT) : null; } - private io.vertx.core.Context getVertxContext() { - return vertx.getOrCreateContext(); + /** + * Gets the current duplicated context or a new duplicated context if a Vert.x Context exists. Multiple invocations + * of this method may return the same or different context. If the current context is a duplicate one, multiple + * invocations always return the same context. If the current context is not duplicated, a new instance is returned + * with each method invocation. + * + * @return a duplicated Vert.x Context or null. + */ + private static io.vertx.core.Context getVertxContext() { + return Vertx.currentContext() != null ? getOrCreateDuplicatedContext(vertx) : null; } } diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java index 7601457bf5aea..8be764b6ac3fd 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/restclient/OpenTelemetryClientFilter.java @@ -12,6 +12,7 @@ import javax.ws.rs.client.ClientRequestFilter; import javax.ws.rs.client.ClientResponseContext; import javax.ws.rs.client.ClientResponseFilter; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.ext.Provider; @@ -28,6 +29,17 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor; import io.quarkus.arc.Unremovable; +/** + * A client filter for the JAX-RS Client and MicroProfile REST Client that records OpenTelemetry data. + * + * For the Resteasy Reactive Client, we skip the OpenTelemetry registration, since this can be handled by the + * {@link io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer}. In theory, this wouldn't be an + * issue, because the OpenTelemetry Instrumenter detects two Client Span and merge both together, but they need to be + * executed with the same OpenTelemetry Context. Right now, the Reactive REST Client filters are executed outside the + * Vert.x Context, so we are unable to propagate the OpenTelemetry Context. This is also not a big issue, because the + * correct OpenTelemetry data will be populated in Vert.x. The only missing piece is the route name available in + * io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler, which is not propagated to Vert.x. + */ @Unremovable @Provider public class OpenTelemetryClientFilter implements ClientRequestFilter, ClientResponseFilter { @@ -60,6 +72,10 @@ public OpenTelemetryClientFilter(final OpenTelemetry openTelemetry) { @Override public void filter(final ClientRequestContext request) { + if (isReactiveClient(request)) { + return; + } + Context parentContext = Context.current(); if (instrumenter.shouldStart(parentContext, request)) { Context spanContext = instrumenter.start(parentContext, request); @@ -89,6 +105,10 @@ public void filter(final ClientRequestContext request, final ClientResponseConte } } + static boolean isReactiveClient(final ClientRequestContext request) { + return "Resteasy Reactive Client".equals(request.getHeaderString(HttpHeaders.USER_AGENT)); + } + private static class ClientRequestContextTextMapSetter implements TextMapSetter<ClientRequestContext> { @Override public void set(final ClientRequestContext carrier, final String key, final String value) { diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/OpenTelemetryVertxTracer.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/OpenTelemetryVertxTracer.java index ca0593d8aba7c..d308c8f7aeace 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/OpenTelemetryVertxTracer.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/vertx/OpenTelemetryVertxTracer.java @@ -3,7 +3,6 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_CLIENT_IP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_ROUTE; import static io.quarkus.opentelemetry.runtime.OpenTelemetryConfig.INSTRUMENTATION_NAME; -import static io.quarkus.opentelemetry.runtime.QuarkusContextStorage.ACTIVE_CONTEXT; import java.net.URI; import java.util.List; @@ -27,6 +26,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor; import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.MultiMap; import io.vertx.core.http.HttpMethod; @@ -75,6 +75,7 @@ public OpenTelemetryVertxTracer(OpenTelemetry openTelemetry) { @Override public <R> SpanOperation receiveRequest( + // The Vert.x context passed to use is already duplicated. final Context context, final SpanKind kind, final TracingPolicy policy, @@ -87,9 +88,9 @@ public <R> SpanOperation receiveRequest( return null; } - io.opentelemetry.context.Context parentContext = context.getLocal(ACTIVE_CONTEXT); + io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(context); if (parentContext == null) { - parentContext = io.opentelemetry.context.Context.root(); + parentContext = io.opentelemetry.context.Context.current(); } if (serverInstrumenter.shouldStart(parentContext, (HttpRequest) request)) { @@ -103,6 +104,7 @@ public <R> SpanOperation receiveRequest( @Override public <R> void sendResponse( + // The Vert.x context passed to use is already duplicated. final Context context, final R response, final SpanOperation spanOperation, @@ -132,6 +134,7 @@ public <R> void sendResponse( @Override public <R> SpanOperation sendRequest( + // This context is not duplicated, so we need to do it. final Context context, final SpanKind kind, final TracingPolicy policy, @@ -144,16 +147,17 @@ public <R> SpanOperation sendRequest( return null; } - io.opentelemetry.context.Context parentContext = context.getLocal(ACTIVE_CONTEXT); + io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(context); if (parentContext == null) { - parentContext = io.opentelemetry.context.Context.root(); + parentContext = io.opentelemetry.context.Context.current(); } if (clientInstrumenter.shouldStart(parentContext, (HttpRequest) request)) { io.opentelemetry.context.Context spanContext = clientInstrumenter.start(parentContext, WriteHeadersHttpRequest.request((HttpRequest) request, headers)); - Scope scope = QuarkusContextStorage.INSTANCE.attach(context, spanContext); - return SpanOperation.span(context, (HttpRequest) request, spanContext, scope); + Context duplicatedContext = VertxContext.getOrCreateDuplicatedContext(context); + Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext); + return SpanOperation.span(duplicatedContext, (HttpRequest) request, spanContext, scope); } return null; @@ -161,6 +165,8 @@ public <R> SpanOperation sendRequest( @Override public <R> void receiveResponse( + // This context is not duplicated, so we need to do it, but we can't duplicate it again because it was already done in + // io.quarkus.opentelemetry.runtime.tracing.vertx.OpenTelemetryVertxTracer.sendRequest, but we don't use it so it should be ok. final Context context, final R response, final SpanOperation spanOperation, diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java index 1ccd0037b9f5e..e197c8da71cb9 100644 --- a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java @@ -71,7 +71,10 @@ void get() { assertEquals(HttpMethod.GET.name(), ((Map<?, ?>) server.get("attributes")).get(HTTP_METHOD.getKey())); assertEquals(SpanKind.CLIENT.toString(), client.get("kind")); - assertEquals("reactive", client.get("name")); + // TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check: + // io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler + // org.jboss.resteasy.reactive.client.AsyncResultUni + //assertEquals("reactive", client.get("name")); assertEquals(HTTP_OK, ((Map<?, ?>) client.get("attributes")).get(HTTP_STATUS_CODE.getKey())); assertEquals(HttpMethod.GET.name(), ((Map<?, ?>) client.get("attributes")).get(HTTP_METHOD.getKey())); } @@ -104,7 +107,10 @@ void post() { assertEquals(HttpMethod.POST.name(), ((Map<?, ?>) server.get("attributes")).get(HTTP_METHOD.getKey())); assertEquals(SpanKind.CLIENT.toString(), client.get("kind")); - assertEquals("reactive", client.get("name")); + // TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check: + // io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler + // org.jboss.resteasy.reactive.client.AsyncResultUni + //assertEquals("reactive", client.get("name")); assertEquals(HTTP_OK, ((Map<?, ?>) client.get("attributes")).get(HTTP_STATUS_CODE.getKey())); assertEquals(HttpMethod.POST.name(), ((Map<?, ?>) client.get("attributes")).get(HTTP_METHOD.getKey())); } diff --git a/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java b/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java index d9ef8c44460ed..52e452a7e3ef6 100644 --- a/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java +++ b/integration-tests/rest-client-reactive/src/test/java/io/quarkus/it/rest/client/BasicTest.java @@ -154,11 +154,15 @@ void shouldCreateClientSpans() { Assertions.assertNotNull(spanData.get("attr_http.client_ip")); Assertions.assertNotNull(spanData.get("attr_http.user_agent")); } else if (spanData.get("kind").equals(SpanKind.CLIENT.toString()) - && spanData.get("name").equals("hello")) { + && spanData.get("name").equals("HTTP POST")) { clientFound = true; // Client span - Assertions.assertEquals("hello", spanData.get("name")); + // TODO - radcortez - Requires a fix to pass in the UrlPathTemplate in the Vert.x Context. Check: + // io.quarkus.resteasy.reactive.server.runtime.observability.ObservabilityHandler + // org.jboss.resteasy.reactive.client.AsyncResultUni + //assertEquals("reactive", client.get("name")); + Assertions.assertEquals("HTTP POST", spanData.get("name")); Assertions.assertEquals(SpanKind.CLIENT.toString(), spanData.get("kind")); Assertions.assertTrue((Boolean) spanData.get("ended")); From 4e6908f522190127a28af7c96806c06cc0b0a60e Mon Sep 17 00:00:00 2001 From: Clement Escoffier <clement@apache.org> Date: Thu, 17 Feb 2022 15:03:32 +0100 Subject: [PATCH 3/4] Replace direct access to the Vert.x internal API with the SmallRye Common Vert.x helper. Also protect the context poisoning from Hibernate Reactive to avoid using the root context. --- .../client/MutinyClientInjectionTest.java | 3 +- .../grpc/client/MutinyStubInjectionTest.java | 4 +- .../grpc/server/services/AssertHelper.java | 12 ++---- extensions/grpc/runtime/pom.xml | 4 ++ .../GrpcDuplicatedContextGrpcInterceptor.java | 11 ++--- .../web/context/DuplicatedContextTest.java | 15 +++---- .../quarkus/vertx/DuplicatedContextTest.java | 43 ++++++++----------- .../vertx/locals/LocalContextAccessTest.java | 3 +- extensions/vertx/runtime/pom.xml | 4 ++ .../vertx/core/runtime/VertxLocalsHelper.java | 9 ++-- .../context/VertxContextSafetyToggle.java | 8 ++++ .../quarkus/vertx/runtime/VertxRecorder.java | 3 +- 12 files changed, 59 insertions(+), 60 deletions(-) diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyClientInjectionTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyClientInjectionTest.java index d7ef443e16c63..b6ae1d2dc51d4 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyClientInjectionTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyClientInjectionTest.java @@ -19,6 +19,7 @@ import io.quarkus.grpc.GrpcClient; import io.quarkus.grpc.server.services.HelloService; import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Uni; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; @@ -80,7 +81,7 @@ public String invokeFromIoThread(String s) { public String invokeFromDuplicatedContext(String s) { Context root = vertx.getOrCreateContext(); - ContextInternal duplicate = ((ContextInternal) root.getDelegate()).duplicate(); + ContextInternal duplicate = (ContextInternal) VertxContext.getOrCreateDuplicatedContext(root.getDelegate()); return Uni.createFrom().<String> emitter(e -> { duplicate.runOnContext(x -> { service.sayHello(HelloRequest.newBuilder().setName(s).build()) diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyStubInjectionTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyStubInjectionTest.java index 47094ec329483..31ea175e08ad4 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyStubInjectionTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/MutinyStubInjectionTest.java @@ -21,8 +21,8 @@ import io.quarkus.grpc.GrpcClient; import io.quarkus.grpc.server.services.HelloService; import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Uni; -import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.WorkerContext; import io.vertx.mutiny.core.Context; @@ -88,7 +88,7 @@ public String invokeFromIoThread(String s) { public String invokeFromDuplicatedContext(String s) { Context root = vertx.getOrCreateContext(); - ContextInternal duplicate = ((ContextInternal) root.getDelegate()).duplicate(); + io.vertx.core.Context duplicate = VertxContext.getOrCreateDuplicatedContext(root.getDelegate()); return Uni.createFrom().<String> emitter(e -> { duplicate.runOnContext(x -> { service.sayHello(HelloRequest.newBuilder().setName(s).build()) diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/AssertHelper.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/AssertHelper.java index e8461260eb4da..0feb45b6d4c49 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/AssertHelper.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/AssertHelper.java @@ -3,10 +3,9 @@ import static org.assertj.core.api.Assertions.assertThat; import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Vertx; -import io.vertx.core.impl.EventLoopContext; -import io.vertx.core.impl.WorkerContext; public class AssertHelper { @@ -16,20 +15,15 @@ public static void assertThatTheRequestScopeIsActive() { public static void assertRunOnEventLoop() { assertThat(Vertx.currentContext()).isNotNull(); - assertThat(Vertx.currentContext().isEventLoopContext()); + assertThat(Vertx.currentContext().isEventLoopContext()).isTrue(); assertThat(Thread.currentThread().getName()).contains("eventloop"); } public static Context assertRunOnDuplicatedContext() { - assertThat(Vertx.currentContext()).isNotNull(); - assertThat(isRootContext(Vertx.currentContext())).isFalse(); + assertThat(VertxContext.isOnDuplicatedContext()).isTrue(); return Vertx.currentContext(); } - private static boolean isRootContext(Context context) { - return context instanceof EventLoopContext || context instanceof WorkerContext; - } - public static void assertRunOnWorker() { assertThat(Vertx.currentContext()).isNotNull(); assertThat(Thread.currentThread().getName()).contains("executor"); diff --git a/extensions/grpc/runtime/pom.xml b/extensions/grpc/runtime/pom.xml index a06bc5f6449ea..0c4baa4be920a 100644 --- a/extensions/grpc/runtime/pom.xml +++ b/extensions/grpc/runtime/pom.xml @@ -85,6 +85,10 @@ <groupId>io.smallrye.common</groupId> <artifactId>smallrye-common-annotation</artifactId> </dependency> + <dependency> + <groupId>io.smallrye.common</groupId> + <artifactId>smallrye-common-vertx-context</artifactId> + </dependency> <!-- Test dependencies --> <dependency> diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java index 78983dcad303f..b6f0938dafeac 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java @@ -12,11 +12,9 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.quarkus.grpc.GlobalInterceptor; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; -import io.vertx.core.impl.EventLoopContext; -import io.vertx.core.impl.WorkerContext; @ApplicationScoped @GlobalInterceptor @@ -27,7 +25,7 @@ public GrpcDuplicatedContextGrpcInterceptor() { } private static boolean isRootContext(Context context) { - return context instanceof EventLoopContext || context instanceof WorkerContext; + return !VertxContext.isDuplicatedContext(context); } @Override @@ -40,10 +38,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re if (capturedVertxContext != null) { // If we are not on a duplicated context, create and switch. - Context local = capturedVertxContext; - if (isRootContext(capturedVertxContext)) { - local = ((ContextInternal) capturedVertxContext).duplicate(); - } + Context local = VertxContext.getOrCreateDuplicatedContext(capturedVertxContext); // Must be sure to call next.startCall on the right context return new ListenedOnDuplicatedContext<>(() -> next.startCall(call, headers), local); diff --git a/extensions/reactive-routes/deployment/src/test/java/io/quarkus/vertx/web/context/DuplicatedContextTest.java b/extensions/reactive-routes/deployment/src/test/java/io/quarkus/vertx/web/context/DuplicatedContextTest.java index 117bcb7064946..a7ef899529589 100644 --- a/extensions/reactive-routes/deployment/src/test/java/io/quarkus/vertx/web/context/DuplicatedContextTest.java +++ b/extensions/reactive-routes/deployment/src/test/java/io/quarkus/vertx/web/context/DuplicatedContextTest.java @@ -18,6 +18,8 @@ import io.quarkus.test.QuarkusUnitTest; import io.quarkus.vertx.web.Route; import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.vertx.ContextLocals; +import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.Context; @@ -25,8 +27,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpMethod; -import io.vertx.core.impl.EventLoopContext; -import io.vertx.core.impl.WorkerContext; import io.vertx.ext.web.RoutingContext; /** @@ -87,14 +87,13 @@ void getBlocking(RoutingContext ctx) { private void process(RoutingContext ctx) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.<String> get("key").orElse(null); Assertions.assertNull(val); String id = ctx.pathParam("id"); - context.putLocal("key", id); + ContextLocals.put("key", id); vertx.createHttpClient().request(HttpMethod.GET, 8081, "localhost", "/hey") .compose(request -> request.end().compose(x -> request.response())) @@ -102,9 +101,9 @@ private void process(RoutingContext ctx) { .map(Buffer::toString) .onSuccess(msg -> { Assertions.assertEquals("hey!", msg); - Assertions.assertEquals(id, Vertx.currentContext().getLocal("key")); + Assertions.assertEquals(id, ContextLocals.<String> get("key").orElseThrow()); Assertions.assertSame(Vertx.currentContext(), context); - ctx.response().end("OK-" + context.getLocal("key")); + ctx.response().end("OK-" + ContextLocals.get("key").orElseThrow()); }); } diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/DuplicatedContextTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/DuplicatedContextTest.java index 52b320e66db31..b1e615b69dfba 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/DuplicatedContextTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/DuplicatedContextTest.java @@ -20,6 +20,8 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.vertx.ContextLocals; +import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.Context; @@ -27,8 +29,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpMethod; -import io.vertx.core.impl.EventLoopContext; -import io.vertx.core.impl.WorkerContext; import io.vertx.mutiny.core.eventbus.EventBus; import io.vertx.mutiny.core.eventbus.Message; @@ -152,10 +152,9 @@ Uni<String> receive(String data) { private Uni<String> process(String id) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", id); @@ -167,9 +166,9 @@ private Uni<String> process(String id) { .map(Buffer::toString) .map(msg -> { Assertions.assertEquals("hey!", msg); - Assertions.assertEquals(id, Vertx.currentContext().getLocal("key")); + Assertions.assertEquals(id, ContextLocals.get("key", null)); Assertions.assertSame(Vertx.currentContext(), context); - return "OK-" + context.getLocal("key"); + return "OK-" + ContextLocals.get("key", null); }).toCompletionStage()); } @@ -183,10 +182,9 @@ String receiveBlocking(String data) { @ConsumeEvent(value = "context-send") public void consumeSend(String s) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", s); @@ -196,10 +194,9 @@ public void consumeSend(String s) { @ConsumeEvent(value = "context-send-blocking") public void consumeSendBlocking(String s) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", s); @@ -209,10 +206,9 @@ public void consumeSendBlocking(String s) { @ConsumeEvent(value = "context-publish") public void consumePublish1(String s) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", s); @@ -222,10 +218,9 @@ public void consumePublish1(String s) { @ConsumeEvent(value = "context-publish") public void consumePublish2(String s) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", s); @@ -236,10 +231,9 @@ public void consumePublish2(String s) { @Blocking public void consumePublishBlocking1(String s) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", s); @@ -250,10 +244,9 @@ public void consumePublishBlocking1(String s) { @Blocking public void consumePublishBlocking2(String s) { Context context = Vertx.currentContext(); - Assertions.assertFalse(context instanceof EventLoopContext); - Assertions.assertFalse(context instanceof WorkerContext); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); - String val = context.getLocal("key"); + String val = ContextLocals.get("key", null); Assertions.assertNull(val); context.putLocal("key", s); diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java index fe3dcd9814e6f..35c465a2d7f2c 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/locals/LocalContextAccessTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.core.impl.ContextInternal; @@ -122,7 +123,7 @@ public void testLocalAccessFromDuplicatedContext() throws ExecutionException, In CompletableFuture<Void> put = new CompletableFuture<>(); CompletableFuture<Void> remove = new CompletableFuture<>(); - ContextInternal local = context.duplicate(); + Context local = VertxContext.getOrCreateDuplicatedContext(context); local.runOnContext(x -> { try { diff --git a/extensions/vertx/runtime/pom.xml b/extensions/vertx/runtime/pom.xml index 2ca2d292dcb5a..5970735688cf6 100644 --- a/extensions/vertx/runtime/pom.xml +++ b/extensions/vertx/runtime/pom.xml @@ -34,6 +34,10 @@ <groupId>io.smallrye.common</groupId> <artifactId>smallrye-common-annotation</artifactId> </dependency> + <dependency> + <groupId>io.smallrye.common</groupId> + <artifactId>smallrye-common-vertx-context</artifactId> + </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-mutiny</artifactId> diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java index db3b187d44919..6642f6bd1b80f 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxLocalsHelper.java @@ -1,8 +1,7 @@ package io.quarkus.vertx.core.runtime; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.impl.ContextInternal; -import io.vertx.core.impl.EventLoopContext; -import io.vertx.core.impl.WorkerContext; public class VertxLocalsHelper { @@ -21,7 +20,7 @@ public static void throwOnRootContextAccess() { @SuppressWarnings("unchecked") public static <T> T getLocal(ContextInternal context, Object key) { - if (!(context instanceof EventLoopContext) && !(context instanceof WorkerContext)) { + if (VertxContext.isDuplicatedContext(context)) { // We are on a duplicated context, allow accessing the locals return (T) context.localContextData().get(key); } else { @@ -30,7 +29,7 @@ public static <T> T getLocal(ContextInternal context, Object key) { } public static void putLocal(ContextInternal context, Object key, Object value) { - if (!(context instanceof EventLoopContext) && !(context instanceof WorkerContext)) { + if (VertxContext.isDuplicatedContext(context)) { // We are on a duplicated context, allow accessing the locals context.localContextData().put(key, value); } else { @@ -39,7 +38,7 @@ public static void putLocal(ContextInternal context, Object key, Object value) { } public static boolean removeLocal(ContextInternal context, Object key) { - if (!(context instanceof EventLoopContext) && !(context instanceof WorkerContext)) { + if (VertxContext.isDuplicatedContext(context)) { // We are on a duplicated context, allow accessing the locals return context.localContextData().remove(key) != null; } else { diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java index 67acf584e900c..f4203291f218b 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/context/VertxContextSafetyToggle.java @@ -1,5 +1,6 @@ package io.quarkus.vertx.core.runtime.context; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Vertx; @@ -64,6 +65,10 @@ public static void validateContextIfExists(final String errorMessageOnVeto, fina } private static void checkIsSafe(final Context context, final String errorMessageOnVeto, final String errorMessageOnDoubt) { + if (!VertxContext.isDuplicatedContext(context)) { + throw new IllegalStateException( + "Can't get the context safety flag: the current context is not a duplicated context"); + } final Object safeFlag = context.getLocal(ACCESS_TOGGLE_KEY); if (safeFlag == Boolean.TRUE) { return; @@ -88,6 +93,9 @@ public static void setCurrentContextSafe(boolean safe) { final io.vertx.core.Context context = Vertx.currentContext(); if (context == null) { throw new IllegalStateException("Can't set the context safety flag: no Vert.x context found"); + } else if (!VertxContext.isDuplicatedContext(context)) { + throw new IllegalStateException( + "Can't set the context safety flag: the current context is not a duplicated context"); } else { context.putLocal(ACCESS_TOGGLE_KEY, Boolean.valueOf(safe)); } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java index 3b3c8e7f3de7a..4802063fbf9c0 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java @@ -17,6 +17,7 @@ import io.quarkus.runtime.annotations.Recorder; import io.quarkus.runtime.configuration.ProfileManager; import io.quarkus.vertx.ConsumeEvent; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -98,7 +99,7 @@ public void handle(Void x) { public void handle(Message<Object> m) { if (invoker.isBlocking()) { // We need to create a duplicated context from the "context" - Context dup = context.duplicate(); + Context dup = VertxContext.getOrCreateDuplicatedContext(context); dup.executeBlocking(new Handler<Promise<Object>>() { @Override public void handle(Promise<Object> event) { From 70bcd82b647f25d039f4129f8485d6e58a4c1cf1 Mon Sep 17 00:00:00 2001 From: Clement Escoffier <clement@apache.org> Date: Fri, 18 Feb 2022 10:09:33 +0100 Subject: [PATCH 4/4] Create duplicated context for each HTTP request in the virtual HTTP server --- extensions/vertx-http/runtime/pom.xml | 4 ++++ .../java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/extensions/vertx-http/runtime/pom.xml b/extensions/vertx-http/runtime/pom.xml index 3b6ab58554b66..a5326ef98b710 100644 --- a/extensions/vertx-http/runtime/pom.xml +++ b/extensions/vertx-http/runtime/pom.xml @@ -25,6 +25,10 @@ <groupId>io.quarkus</groupId> <artifactId>quarkus-mutiny</artifactId> </dependency> + <dependency> + <groupId>io.smallrye.common</groupId> + <artifactId>smallrye-common-vertx-context</artifactId> + </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-vertx-http-dev-console-runtime-spi</artifactId> diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java index 18645cc249a72..929b1088030ff 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java @@ -73,6 +73,7 @@ import io.quarkus.vertx.http.runtime.filters.accesslog.AccessLogReceiver; import io.quarkus.vertx.http.runtime.filters.accesslog.DefaultAccessLogReceiver; import io.quarkus.vertx.http.runtime.filters.accesslog.JBossLoggingAccessLogReceiver; +import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -92,6 +93,7 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpVersion; import io.vertx.core.http.impl.Http1xServerConnection; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.JdkSSLEngineOptions; @@ -1207,7 +1209,7 @@ public void initChannel(VirtualChannel ch) throws Exception { VertxHandler<Http1xServerConnection> handler = VertxHandler.create(chctx -> { Http1xServerConnection conn = new Http1xServerConnection( - () -> context, + () -> (ContextInternal) VertxContext.getOrCreateDuplicatedContext(context), null, new HttpServerOptions(), chctx,