From 55e75874065b3f6873c165aafc5232c67018cc92 Mon Sep 17 00:00:00 2001 From: Roberto Cortez Date: Wed, 5 Jan 2022 22:09:46 +0000 Subject: [PATCH] Copy duplicated local context to the event loop context --- .../runtime/QuarkusContextStorage.java | 2 + .../VertxResteasyReactiveRequestContext.java | 3 + .../opentelemetry-reactive/pom.xml | 177 ++++++++++++++++++ .../reactive/ExporterResource.java | 49 +++++ .../reactive/ReactiveResource.java | 33 ++++ .../it/opentelemetry/reactive/SpanData.java | 28 +++ .../OpenTelemetryReactiveClientTest.java | 126 +++++++++++++ .../reactive/OpenTelemetryReactiveTest.java | 66 +++++++ .../src/test/resources/application.properties | 1 + integration-tests/opentelemetry/pom.xml | 5 - integration-tests/pom.xml | 1 + 11 files changed, 486 insertions(+), 5 deletions(-) create mode 100644 integration-tests/opentelemetry-reactive/pom.xml create mode 100644 integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ExporterResource.java create mode 100644 integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java create mode 100644 integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/SpanData.java create mode 100644 integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java create mode 100644 integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java create mode 100644 integration-tests/opentelemetry-reactive/src/test/resources/application.properties diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java index 6f4e1fe665617..3ef55023745c6 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/QuarkusContextStorage.java @@ -6,6 +6,7 @@ import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; public enum QuarkusContextStorage implements ContextStorage { INSTANCE; @@ -40,6 +41,7 @@ public Scope attach(io.vertx.core.Context vertxContext, Context toAttach) { } if (beforeAttach == null) { vertxContext.removeLocal(ACTIVE_CONTEXT); + ((ContextInternal) vertxContext).unwrap().removeLocal(ACTIVE_CONTEXT); } else { vertxContext.putLocal(ACTIVE_CONTEXT, beforeAttach); } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java index a41dd983dad3a..deeff94345c92 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java @@ -6,6 +6,7 @@ import io.netty.util.concurrent.ScheduledFuture; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; @@ -60,6 +61,8 @@ public VertxResteasyReactiveRequestContext(Deployment deployment, ProvidersImpl context.addHeadersEndHandler(this); String expect = request.getHeader(HttpHeaderNames.EXPECT); ContextInternal internal = ((ConnectionBase) context.request().connection()).getContext(); + ContextInternal current = (ContextInternal) Vertx.currentContext(); + internal.localContextData().putAll(current.localContextData()); if (expect != null && expect.equalsIgnoreCase(CONTINUE)) { continueState = ContinueState.REQUIRED; } diff --git a/integration-tests/opentelemetry-reactive/pom.xml b/integration-tests/opentelemetry-reactive/pom.xml new file mode 100644 index 0000000000000..243ac4c40051f --- /dev/null +++ b/integration-tests/opentelemetry-reactive/pom.xml @@ -0,0 +1,177 @@ + + + 4.0.0 + + + io.quarkus + quarkus-integration-tests-parent + 999-SNAPSHOT + + + quarkus-integration-test-opentelemetry-reactive + Quarkus - Integration Tests - OpenTelemetry Reactive + + + + io.quarkus + quarkus-opentelemetry + + + + + io.quarkus + quarkus-resteasy-reactive + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + + + io.quarkus + quarkus-rest-client-reactive + + + + + io.opentelemetry + opentelemetry-sdk-testing + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + + + io.quarkus + quarkus-resteasy-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-rest-client-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-opentelemetry-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + + + native-image + + + native + + + + + native + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${native.surefire.skip} + + + + + maven-failsafe-plugin + + + + integration-test + verify + + + + + ${project.build.directory}/${project.build.finalName}-runner + + + + + + + + + + + diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ExporterResource.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ExporterResource.java new file mode 100644 index 0000000000000..5b4c4b0df387a --- /dev/null +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ExporterResource.java @@ -0,0 +1,49 @@ +package io.quarkus.it.opentelemetry.reactive; + +import static java.util.Comparator.comparingLong; + +import java.util.List; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Produces; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; + +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; + +@Path("") +public class ExporterResource { + @Inject + InMemorySpanExporter inMemorySpanExporter; + + @GET + @Path("/reset") + public Response reset() { + inMemorySpanExporter.reset(); + return Response.ok().build(); + } + + @GET + @Path("/export") + public List export() { + return inMemorySpanExporter.getFinishedSpanItems() + .stream() + .filter(sd -> !sd.getName().contains("export") && !sd.getName().contains("reset")) + .sorted(comparingLong(SpanData::getStartEpochNanos).reversed()) + .collect(Collectors.toList()); + } + + @ApplicationScoped + static class InMemorySpanExporterProducer { + @Produces + @Singleton + InMemorySpanExporter inMemorySpanExporter() { + return InMemorySpanExporter.create(); + } + } +} diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java new file mode 100644 index 0000000000000..31b78a13405f2 --- /dev/null +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/ReactiveResource.java @@ -0,0 +1,33 @@ +package io.quarkus.it.opentelemetry.reactive; + +import java.time.Duration; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.smallrye.mutiny.Uni; + +@Path("/reactive") +public class ReactiveResource { + @Inject + Tracer tracer; + + @GET + public Uni helloGet(@QueryParam("name") String name) { + Span span = tracer.spanBuilder("helloGet").startSpan(); + return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofSeconds(2)) + .eventually((Runnable) span::end); + } + + @POST + public Uni helloPost(String body) { + Span span = tracer.spanBuilder("helloPost").startSpan(); + return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofSeconds(2)) + .eventually((Runnable) span::end); + } +} diff --git a/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/SpanData.java b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/SpanData.java new file mode 100644 index 0000000000000..86d7a7f45def7 --- /dev/null +++ b/integration-tests/opentelemetry-reactive/src/main/java/io/quarkus/it/opentelemetry/reactive/SpanData.java @@ -0,0 +1,28 @@ +package io.quarkus.it.opentelemetry.reactive; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection(classNames = { + "io.opentelemetry.sdk.trace.data.SpanData", + "io.opentelemetry.sdk.trace.SpanWrapper", + "io.opentelemetry.sdk.trace.AutoValue_SpanWrapper", + "io.opentelemetry.sdk.trace.data.StatusData", + "io.opentelemetry.sdk.trace.data.ImmutableStatusData", + "io.opentelemetry.sdk.trace.data.AutoValue_ImmutableStatusData", + "io.opentelemetry.api.trace.SpanContext", + "io.opentelemetry.api.internal.ImmutableSpanContext", + "io.opentelemetry.api.internal.AutoValue_ImmutableSpanContext", + "io.opentelemetry.api.trace.TraceFlags", + "io.opentelemetry.api.trace.ImmutableTraceFlags", + "io.opentelemetry.api.trace.TraceState", + "io.opentelemetry.api.trace.ArrayBasedTraceState", + "io.opentelemetry.api.trace.AutoValue_ArrayBasedTraceState", + "io.opentelemetry.sdk.common.InstrumentationLibraryInfo", + "io.opentelemetry.sdk.common.AutoValue_InstrumentationLibraryInfo", + "io.opentelemetry.sdk.resources.Resource", + "io.opentelemetry.sdk.resources.AutoValue_Resource", + "io.opentelemetry.api.common.Attributes", + "io.quarkus.opentelemetry.runtime.tracing.DelayedAttributes" +}) +public class SpanData { +} diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java new file mode 100644 index 0000000000000..1ccd0037b9f5e --- /dev/null +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveClientTest.java @@ -0,0 +1,126 @@ +package io.quarkus.it.opentelemetry.reactive; + +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_METHOD; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_ROUTE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_STATUS_CODE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_TARGET; +import static io.restassured.RestAssured.given; +import static io.restassured.RestAssured.when; +import static java.net.HttpURLConnection.HTTP_OK; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.trace.SpanKind; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; +import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpMethod; + +@QuarkusTest +public class OpenTelemetryReactiveClientTest { + @Inject + @RestClient + ReactiveRestClient client; + + @AfterEach + void reset() { + given().get("/reset").then().statusCode(HTTP_OK); + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 0); + } + + @Test + void get() { + Uni result = client.helloGet("Naruto"); + assertEquals("Hello Naruto", result.await().indefinitely()); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 3); + List> spans = getSpans(); + assertEquals(3, spans.size()); + assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId")); + assertEquals(spans.get(0).get("traceId"), spans.get(2).get("traceId")); + + Map internal = spans.get(0); + Map server = spans.get(1); + Map client = spans.get(2); + + assertEquals(SpanKind.INTERNAL.toString(), internal.get("kind")); + assertEquals("helloGet", internal.get("name")); + assertEquals(internal.get("parentSpanId"), server.get("spanId")); + + assertEquals(SpanKind.SERVER.toString(), server.get("kind")); + assertEquals(server.get("parentSpanId"), client.get("spanId")); + assertEquals("reactive", server.get("name")); + assertEquals("/reactive", ((Map) server.get("attributes")).get(HTTP_ROUTE.getKey())); + assertEquals("/reactive?name=Naruto", ((Map) server.get("attributes")).get(HTTP_TARGET.getKey())); + assertEquals(HTTP_OK, ((Map) server.get("attributes")).get(HTTP_STATUS_CODE.getKey())); + assertEquals(HttpMethod.GET.name(), ((Map) server.get("attributes")).get(HTTP_METHOD.getKey())); + + assertEquals(SpanKind.CLIENT.toString(), client.get("kind")); + assertEquals("reactive", client.get("name")); + assertEquals(HTTP_OK, ((Map) client.get("attributes")).get(HTTP_STATUS_CODE.getKey())); + assertEquals(HttpMethod.GET.name(), ((Map) client.get("attributes")).get(HTTP_METHOD.getKey())); + } + + @Test + void post() { + Uni result = client.helloPost("Naruto"); + assertEquals("Hello Naruto", result.await().indefinitely()); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 3); + List> spans = getSpans(); + assertEquals(3, spans.size()); + assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId")); + assertEquals(spans.get(0).get("traceId"), spans.get(2).get("traceId")); + + Map internal = spans.get(0); + Map server = spans.get(1); + Map client = spans.get(2); + + assertEquals(SpanKind.INTERNAL.toString(), internal.get("kind")); + assertEquals("helloPost", internal.get("name")); + assertEquals(internal.get("parentSpanId"), server.get("spanId")); + + assertEquals(SpanKind.SERVER.toString(), server.get("kind")); + assertEquals(server.get("parentSpanId"), client.get("spanId")); + assertEquals("reactive", server.get("name")); + assertEquals("/reactive", ((Map) server.get("attributes")).get(HTTP_ROUTE.getKey())); + assertEquals("/reactive", ((Map) server.get("attributes")).get(HTTP_TARGET.getKey())); + assertEquals(HTTP_OK, ((Map) server.get("attributes")).get(HTTP_STATUS_CODE.getKey())); + assertEquals(HttpMethod.POST.name(), ((Map) server.get("attributes")).get(HTTP_METHOD.getKey())); + + assertEquals(SpanKind.CLIENT.toString(), client.get("kind")); + assertEquals("reactive", client.get("name")); + assertEquals(HTTP_OK, ((Map) client.get("attributes")).get(HTTP_STATUS_CODE.getKey())); + assertEquals(HttpMethod.POST.name(), ((Map) client.get("attributes")).get(HTTP_METHOD.getKey())); + } + + @RegisterRestClient(configKey = "client") + @Path("/reactive") + interface ReactiveRestClient { + @GET + Uni helloGet(@QueryParam("name") String name); + + @POST + Uni helloPost(String body); + } + + private static List> getSpans() { + return when().get("/export").body().as(new TypeRef<>() { + }); + } +} diff --git a/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java new file mode 100644 index 0000000000000..275288a35b1b6 --- /dev/null +++ b/integration-tests/opentelemetry-reactive/src/test/java/io/quarkus/it/opentelemetry/reactive/OpenTelemetryReactiveTest.java @@ -0,0 +1,66 @@ +package io.quarkus.it.opentelemetry.reactive; + +import static io.restassured.RestAssured.given; +import static io.restassured.RestAssured.when; +import static java.net.HttpURLConnection.HTTP_OK; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; + +@QuarkusTest +public class OpenTelemetryReactiveTest { + @AfterEach + void reset() { + given().get("/reset").then().statusCode(HTTP_OK); + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 0); + } + + @Test + void get() { + given() + .contentType("application/json") + .when() + .queryParam("name", "Naruto") + .get("/reactive") + .then() + .statusCode(200) + .body(equalTo("Hello Naruto")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 2); + List> spans = getSpans(); + assertEquals(2, spans.size()); + assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId")); + } + + @Test + void post() { + given() + .contentType("application/json") + .when() + .body("Naruto") + .post("/reactive") + .then() + .statusCode(200) + .body(equalTo("Hello Naruto")); + + await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 2); + List> spans = getSpans(); + assertEquals(2, spans.size()); + assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId")); + } + + private static List> getSpans() { + return when().get("/export").body().as(new TypeRef<>() { + }); + } +} diff --git a/integration-tests/opentelemetry-reactive/src/test/resources/application.properties b/integration-tests/opentelemetry-reactive/src/test/resources/application.properties new file mode 100644 index 0000000000000..c84386f9fc151 --- /dev/null +++ b/integration-tests/opentelemetry-reactive/src/test/resources/application.properties @@ -0,0 +1 @@ +quarkus.rest-client.client.url=${test.url} diff --git a/integration-tests/opentelemetry/pom.xml b/integration-tests/opentelemetry/pom.xml index c6ca2785b1e45..3642042ff1b89 100644 --- a/integration-tests/opentelemetry/pom.xml +++ b/integration-tests/opentelemetry/pom.xml @@ -57,11 +57,6 @@ awaitility test - - org.jboss.logging - commons-logging-jboss-logging - test - diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 476895aaa3f1c..8f4a382261d2b 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -283,6 +283,7 @@ micrometer-prometheus opentelemetry opentelemetry-vertx + opentelemetry-reactive logging-json jaxb jaxp