Skip to content

Commit

Permalink
Merge pull request #30478 from OpenGuidou/OTEL-CP
Browse files Browse the repository at this point in the history
Allow context propagation for OpenTelemetry
  • Loading branch information
radcortez authored Jan 20, 2023
2 parents 2a31ed5 + 3232c8b commit 8c95e20
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 8 deletions.
2 changes: 0 additions & 2 deletions docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,6 @@ If the xref:smallrye-metrics.adoc[SmallRye Metrics extension] is present, then a

If `quarkus.scheduler.tracing.enabled` is set to `true` and the xref:opentelemetry.adoc[OpenTelemetry extension] is present then the `@io.opentelemetry.instrumentation.annotations.WithSpan` annotation is added automatically to every `@Scheduled` method. As a result, each execution of this method has a new `io.opentelemetry.api.trace.Span` associated.

IMPORTANT: <<non-blocking-methods,Non-blocking methods>> are not supported, i.e. a new span is associated with the _actual_ invocation but it's not available within the asynchronous computation.

== Configuration Reference

include::{generated-dir}/config/quarkus-scheduler.adoc[leveloffset=+1, opts=optional]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.opentelemetry.deployment.propagation;

import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
import io.quarkus.opentelemetry.deployment.OpenTelemetryEnabled;
import io.quarkus.opentelemetry.runtime.propagation.OpenTelemetryMpContextPropagationProvider;
import io.quarkus.smallrye.context.deployment.spi.ThreadContextProviderBuildItem;

@BuildSteps(onlyIf = OpenTelemetryEnabled.class)
public class OpenTelemetryMpContextPropagationProcessor {

@BuildStep
void registerOpenTelemetryThreadProvider(
BuildProducer<ThreadContextProviderBuildItem> threadContextProvider) {
threadContextProvider.produce(new ThreadContextProviderBuildItem(OpenTelemetryMpContextPropagationProvider.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.opentelemetry.deployment.propagation;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;

import org.eclipse.microprofile.context.ThreadContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.trace.Span;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class OpenTelemetryMpContextPropagationTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClass(OpenTelemetryMpContextPropagationTest.TestResource.class));

@Test
void testOpenTelemetryContextPropagationWithCustomExecutorAndThreadContextProvider() {
String message = RestAssured.when()
.get("/helloWithContextPropagation").then()
.statusCode(200)
.extract().asString();
assertTrue(message.startsWith("Hello/"));
String[] traceIds = message.split("/")[1].split("-");
assertEquals(2, traceIds.length);
assertEquals(traceIds[0], traceIds[1]);
}

@ApplicationScoped
@Path("/")
public static class TestResource {

private final ExecutorService customExecutorService;

private final ThreadContext threadContext;

@Inject
TestResource(ThreadContext threadContext) {
this.customExecutorService = Executors.newWorkStealingPool();
this.threadContext = threadContext;
}

@GET
@Path("/helloWithContextPropagation")
public CompletionStage<String> helloWithCustomExecutor() {
String message = "Hello/" + Span.current().getSpanContext().getTraceId();
return this.threadContext
.withContextCapture(CompletableFuture.supplyAsync(
() -> message, this.customExecutorService))
.thenApplyAsync(msg -> msg + "-" + Span.current().getSpanContext().getTraceId());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.quarkus.opentelemetry.runtime.propagation;

import java.util.Map;

import org.eclipse.microprofile.context.spi.ThreadContextController;
import org.eclipse.microprofile.context.spi.ThreadContextProvider;
import org.eclipse.microprofile.context.spi.ThreadContextSnapshot;

import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

public class OpenTelemetryMpContextPropagationProvider implements ThreadContextProvider {

@Override
public ThreadContextSnapshot currentContext(Map<String, String> props) {

io.opentelemetry.context.Context context = QuarkusContextStorage.INSTANCE.current();

// Use anonymous classes instad of lambdas for the native image
return new ThreadContextSnapshot() {

@Override
public ThreadContextController begin() {
io.opentelemetry.context.Context currentContext = QuarkusContextStorage.INSTANCE.current();
if (context != null) {
QuarkusContextStorage.INSTANCE.attach(context);
return new ThreadContextController() {
@Override
public void endContext() throws IllegalStateException {
QuarkusContextStorage.INSTANCE.attach(currentContext);
}
};
}
return new ThreadContextController() {
@Override
public void endContext() throws IllegalStateException {
// nothing to do
}
};
}

};
}

@Override
public ThreadContextSnapshot clearedContext(Map<String, String> props) {
// Use anonymous classes instad of lambdas for the native image
return new ThreadContextSnapshot() {
@Override
public ThreadContextController begin() {
return new ThreadContextController() {
@Override
public void endContext() throws IllegalStateException {
// nothring to do
}
};
}
};
}

@Override
public String getThreadContextType() {
return "OpenTelemetry";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@ void testWithSpan() throws InterruptedException {
assertTrue(Jobs.latch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.nonBlockingLatch.await(5, TimeUnit.SECONDS));

// assert that different spand ids were used
// assert that different span ids were used
assertTrue(Jobs.spanIds.stream().collect(Collectors.toSet()).size() >= 2);

// assert that non-blocing scheduled methods are not supported
// when the WithSpanInterceptor is fixed and this test fails we should update the assertion and update the docs
assertTrue(Jobs.nonBlockingSpanIds.stream().collect(Collectors.toSet()).size() == 1);

assertTrue(Jobs.nonBlockingSpanIds.stream().collect(Collectors.toSet()).size() >= 2);
}

static class Jobs {
Expand Down

0 comments on commit 8c95e20

Please sign in to comment.