diff --git a/.github/native-tests.json b/.github/native-tests.json index fbd8e62dbe16a..f44c910369ebd 100644 --- a/.github/native-tests.json +++ b/.github/native-tests.json @@ -117,7 +117,7 @@ { "category": "Misc4", "timeout": 130, - "test-modules": "picocli-native, gradle, micrometer-mp-metrics, micrometer-prometheus, logging-json, jaxp, jaxb, opentelemetry, opentelemetry-jdbc-instrumentation, opentelemetry-redis-instrumentation, web-dependency-locator", + "test-modules": "picocli-native, gradle, micrometer-mp-metrics, micrometer-prometheus, logging-json, jaxp, jaxb, opentelemetry, opentelemetry-jdbc-instrumentation, opentelemetry-mongodb-client-instrumentation, opentelemetry-redis-instrumentation, web-dependency-locator", "os-name": "ubuntu-latest" }, { diff --git a/docs/src/main/asciidoc/mongodb.adoc b/docs/src/main/asciidoc/mongodb.adoc index 6093e86a9af16..2e0b50bfa0128 100644 --- a/docs/src/main/asciidoc/mongodb.adoc +++ b/docs/src/main/asciidoc/mongodb.adoc @@ -616,6 +616,18 @@ This behavior must first be enabled by setting the `quarkus.mongodb.metrics.enab So when you access the `/q/metrics` endpoint of your application you will have information about the connection pool status. When using xref:smallrye-metrics.adoc[SmallRye Metrics], connection pool metrics will be available under the `vendor` scope. + +== Tracing + +To use tracing with MongoDB, you need to add the xref:opentelemetry.adoc[`quarkus-opentelemetry`] extension to your project. + +Even with all the tracing infrastructure in place the mongodb tracing is not enabled by default, and you need to enable it by setting this property: +[source, properties] +---- +# enable tracing +quarkus.mongodb.tracing.enabled=true +---- + == Testing helpers xref:#dev-services[Dev Services for MongoDB] is your best option to start a MongoDB database for your unit tests. diff --git a/docs/src/main/asciidoc/opentelemetry.adoc b/docs/src/main/asciidoc/opentelemetry.adoc index 6f08bda91c7ab..974ec37b39b07 100644 --- a/docs/src/main/asciidoc/opentelemetry.adoc +++ b/docs/src/main/asciidoc/opentelemetry.adoc @@ -684,6 +684,7 @@ Additional exporters will be available in the Quarkiverse https://docs.quarkiver * https://quarkus.io/guides/resteasy-client[`quarkus-resteasy-client`] * https://quarkus.io/guides/scheduler[`quarkus-scheduler`] * https://quarkus.io/guides/smallrye-graphql[`quarkus-smallrye-graphql`] +* https://quarkus.io/extensions/io.quarkus/quarkus-mongodb-client[`quarkus-mongodb-client`] * https://quarkus.io/extensions/io.quarkus/quarkus-messaging[`quarkus-messaging`] ** AMQP 1.0 ** RabbitMQ diff --git a/extensions/mongodb-client/deployment/pom.xml b/extensions/mongodb-client/deployment/pom.xml index 0c50568794612..ad80149825900 100644 --- a/extensions/mongodb-client/deployment/pom.xml +++ b/extensions/mongodb-client/deployment/pom.xml @@ -34,6 +34,11 @@ io.quarkus quarkus-mongodb-client + + io.quarkus + quarkus-opentelemetry-deployment + true + org.testcontainers mongodb diff --git a/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/ContextProviderBuildItem.java b/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/ContextProviderBuildItem.java new file mode 100644 index 0000000000000..162e75418e5b7 --- /dev/null +++ b/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/ContextProviderBuildItem.java @@ -0,0 +1,23 @@ +package io.quarkus.mongodb.deployment; + +import java.util.List; + +import com.mongodb.reactivestreams.client.ReactiveContextProvider; + +import io.quarkus.builder.item.SimpleBuildItem; + +/** + * Register additional {@link ReactiveContextProvider}s for the MongoDB clients. + */ +public final class ContextProviderBuildItem extends SimpleBuildItem { + + private final List classNames; + + public ContextProviderBuildItem(List classNames) { + this.classNames = classNames == null ? List.of() : classNames; + } + + public List getContextProviderClassNames() { + return classNames; + } +} diff --git a/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/MongoClientProcessor.java b/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/MongoClientProcessor.java index 767e3832ce4df..9abfbcde5d5f7 100644 --- a/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/MongoClientProcessor.java +++ b/extensions/mongodb-client/deployment/src/main/java/io/quarkus/mongodb/deployment/MongoClientProcessor.java @@ -32,6 +32,7 @@ import com.mongodb.client.model.changestream.UpdateDescription; import com.mongodb.event.CommandListener; import com.mongodb.event.ConnectionPoolListener; +import com.mongodb.reactivestreams.client.ReactiveContextProvider; import com.mongodb.spi.dns.DnsClientProvider; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; @@ -69,6 +70,7 @@ import io.quarkus.mongodb.runtime.MongoClientRecorder; import io.quarkus.mongodb.runtime.MongoClientSupport; import io.quarkus.mongodb.runtime.MongoClients; +import io.quarkus.mongodb.runtime.MongoReactiveContextProvider; import io.quarkus.mongodb.runtime.MongoServiceBindingConverter; import io.quarkus.mongodb.runtime.MongodbConfig; import io.quarkus.mongodb.runtime.dns.MongoDnsClient; @@ -113,9 +115,11 @@ AdditionalIndexedClassesBuildItem includeDnsTypesToIndex() { } @BuildStep - AdditionalIndexedClassesBuildItem includeDnsTypesToIndex(MongoClientBuildTimeConfig buildTimeConfig) { + AdditionalIndexedClassesBuildItem includeMongoCommandListener(MongoClientBuildTimeConfig buildTimeConfig) { if (buildTimeConfig.tracingEnabled) { - return new AdditionalIndexedClassesBuildItem(MongoTracingCommandListener.class.getName()); + return new AdditionalIndexedClassesBuildItem( + MongoTracingCommandListener.class.getName(), + MongoReactiveContextProvider.class.getName()); } return new AdditionalIndexedClassesBuildItem(); } @@ -161,15 +165,27 @@ CommandListenerBuildItem collectCommandListeners(CombinedIndexBuildItem indexBui return new CommandListenerBuildItem(names); } + @BuildStep + ContextProviderBuildItem collectContextProviders(CombinedIndexBuildItem indexBuildItem) { + Collection contextProviders = indexBuildItem.getIndex() + .getAllKnownImplementors(DotName.createSimple(ReactiveContextProvider.class.getName())); + List names = contextProviders.stream() + .map(ci -> ci.name().toString()) + .collect(Collectors.toList()); + return new ContextProviderBuildItem(names); + } + @BuildStep List addExtensionPointsToNative(CodecProviderBuildItem codecProviders, PropertyCodecProviderBuildItem propertyCodecProviders, BsonDiscriminatorBuildItem bsonDiscriminators, - CommandListenerBuildItem commandListeners) { + CommandListenerBuildItem commandListeners, + ContextProviderBuildItem contextProviders) { List reflectiveClassNames = new ArrayList<>(); reflectiveClassNames.addAll(codecProviders.getCodecProviderClassNames()); reflectiveClassNames.addAll(propertyCodecProviders.getPropertyCodecProviderClassNames()); reflectiveClassNames.addAll(bsonDiscriminators.getBsonDiscriminatorClassNames()); reflectiveClassNames.addAll(commandListeners.getCommandListenerClassNames()); + reflectiveClassNames.addAll(contextProviders.getContextProviderClassNames()); List reflectiveClass = reflectiveClassNames.stream() .map(s -> ReflectiveClassBuildItem.builder(s).methods().build()) @@ -256,6 +272,7 @@ void build( PropertyCodecProviderBuildItem propertyCodecProvider, BsonDiscriminatorBuildItem bsonDiscriminator, CommandListenerBuildItem commandListener, + ContextProviderBuildItem contextProvider, List connectionPoolListenerProvider, BuildProducer additionalBeanBuildItemProducer, BuildProducer syntheticBeanBuildItemBuildProducer) { @@ -277,6 +294,9 @@ void build( for (String name : commandListener.getCommandListenerClassNames()) { additionalBeansBuilder.addBeanClass(name); } + for (String name : contextProvider.getContextProviderClassNames()) { + additionalBeansBuilder.addBeanClass(name); + } additionalBeanBuildItemProducer.produce(additionalBeansBuilder.build()); // create MongoClientSupport as a synthetic bean as it's used in AbstractMongoClientProducer diff --git a/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MockReactiveContextProvider.java b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MockReactiveContextProvider.java new file mode 100644 index 0000000000000..7f85aa84d3d59 --- /dev/null +++ b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MockReactiveContextProvider.java @@ -0,0 +1,22 @@ +package io.quarkus.mongodb; + +import java.util.ArrayList; +import java.util.List; + +import org.reactivestreams.Subscriber; + +import com.mongodb.RequestContext; +import com.mongodb.reactivestreams.client.ReactiveContextProvider; + +import io.quarkus.mongodb.runtime.MongoRequestContext; + +public class MockReactiveContextProvider implements ReactiveContextProvider { + + public static final List EVENTS = new ArrayList<>(); + + @Override + public RequestContext getContext(Subscriber subscriber) { + EVENTS.add(MongoRequestContext.class.getName()); + return new MongoRequestContext(null); + } +} diff --git a/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MongoTracingEnabled.java b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MongoTracingEnabled.java new file mode 100644 index 0000000000000..e1a105f7b0d3b --- /dev/null +++ b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MongoTracingEnabled.java @@ -0,0 +1,49 @@ +package io.quarkus.mongodb; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; + +import jakarta.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.mongodb.reactive.ReactiveMongoClient; +import io.quarkus.test.QuarkusUnitTest; + +public class MongoTracingEnabled extends MongoTestBase { + + @Inject + ReactiveMongoClient reactiveClient; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addClasses(MongoTestBase.class, MockReactiveContextProvider.class, MockCommandListener.class)) + .withConfigurationResource("application-tracing-mongoclient.properties"); + + @AfterEach + void cleanup() { + if (reactiveClient != null) { + reactiveClient.close(); + } + } + + @Test + void invokeReactiveContextProvider() { + String dbNames = reactiveClient.listDatabaseNames().toUni().await().atMost(Duration.ofSeconds(30L)); + assertThat(dbNames).as("expect db names available").isNotBlank(); + await().atMost(Duration.ofSeconds(30L)).untilAsserted( + () -> assertThat(MockReactiveContextProvider.EVENTS) + .as("reactive context provider must be called") + .isNotEmpty()); + assertThat(MockCommandListener.EVENTS).isNotEmpty(); + + } + +} diff --git a/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MongoTracingNotEnabledTest.java b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MongoTracingNotEnabledTest.java new file mode 100644 index 0000000000000..09ac8a61f34dc --- /dev/null +++ b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/MongoTracingNotEnabledTest.java @@ -0,0 +1,42 @@ +package io.quarkus.mongodb; + +import static org.assertj.core.api.Assertions.assertThat; + +import jakarta.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.mongodb.client.MongoClient; + +import io.quarkus.test.QuarkusUnitTest; + +public class MongoTracingNotEnabledTest extends MongoTestBase { + + @Inject + MongoClient client; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class).addClasses(MongoTestBase.class, + MockReactiveContextProvider.class)) + .withConfigurationResource("default-mongoclient.properties"); + + @AfterEach + void cleanup() { + if (client != null) { + client.close(); + } + } + + @Test + void contextProviderMustNotBeCalledIfNoOpenTelemetryIsAvailable() { + assertThat(client.listDatabaseNames().first()).isNotEmpty(); + assertThat(MockReactiveContextProvider.EVENTS).isEmpty(); + } + +} diff --git a/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/deployment/ContextProviderBuildItemTest.java b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/deployment/ContextProviderBuildItemTest.java new file mode 100644 index 0000000000000..d813fda475164 --- /dev/null +++ b/extensions/mongodb-client/deployment/src/test/java/io/quarkus/mongodb/deployment/ContextProviderBuildItemTest.java @@ -0,0 +1,28 @@ +package io.quarkus.mongodb.deployment; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +class ContextProviderBuildItemTest { + + @Test + void getContextProviderClassNames() { + ContextProviderBuildItem item = new ContextProviderBuildItem(List.of("foo.bar")); + assertThat(item.getContextProviderClassNames()) + .hasSize(1) + .first() + .isEqualTo("foo.bar"); + } + + @Test + void emptyOrNull() { + ContextProviderBuildItem withNull = new ContextProviderBuildItem(null); + assertThat(withNull.getContextProviderClassNames()).isEmpty(); + + ContextProviderBuildItem empty = new ContextProviderBuildItem(List.of()); + assertThat(empty.getContextProviderClassNames()).isEmpty(); + } +} diff --git a/extensions/mongodb-client/deployment/src/test/resources/application-tracing-mongoclient.properties b/extensions/mongodb-client/deployment/src/test/resources/application-tracing-mongoclient.properties new file mode 100644 index 0000000000000..32e6f84c16416 --- /dev/null +++ b/extensions/mongodb-client/deployment/src/test/resources/application-tracing-mongoclient.properties @@ -0,0 +1,3 @@ +quarkus.mongodb.connection-string=mongodb://127.0.0.1:27018 +quarkus.mongodb.tracing.enabled=true + diff --git a/extensions/mongodb-client/runtime/pom.xml b/extensions/mongodb-client/runtime/pom.xml index 408d5efd30210..cbdc90f2750c3 100644 --- a/extensions/mongodb-client/runtime/pom.xml +++ b/extensions/mongodb-client/runtime/pom.xml @@ -58,6 +58,11 @@ quarkus-kubernetes-service-binding true + + io.quarkus + quarkus-opentelemetry + true + diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoClients.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoClients.java index cc0b938767c99..125f978669dfb 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoClients.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoClients.java @@ -56,6 +56,7 @@ import com.mongodb.connection.SslSettings; import com.mongodb.event.CommandListener; import com.mongodb.event.ConnectionPoolListener; +import com.mongodb.reactivestreams.client.ReactiveContextProvider; import io.quarkus.arc.Arc; import io.quarkus.arc.InstanceHandle; @@ -86,18 +87,21 @@ public class MongoClients { private final Map mongoclients = new HashMap<>(); private final Map reactiveMongoClients = new HashMap<>(); + private final Instance reactiveContextProviders; private final Instance customizers; public MongoClients(MongodbConfig mongodbConfig, MongoClientSupport mongoClientSupport, Instance codecProviders, Instance propertyCodecProviders, Instance commandListeners, + Instance reactiveContextProviders, @Any Instance customizers) { this.mongodbConfig = mongodbConfig; this.mongoClientSupport = mongoClientSupport; this.codecProviders = codecProviders; this.propertyCodecProviders = propertyCodecProviders; this.commandListeners = commandListeners; + this.reactiveContextProviders = reactiveContextProviders; this.customizers = customizers; try { @@ -121,7 +125,8 @@ public MongoClients(MongodbConfig mongodbConfig, MongoClientSupport mongoClientS } public MongoClient createMongoClient(String clientName) throws MongoException { - MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName)); + MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName), + false); MongoClient client = com.mongodb.client.MongoClients.create(mongoConfiguration); mongoclients.put(clientName, client); return client; @@ -129,7 +134,8 @@ public MongoClient createMongoClient(String clientName) throws MongoException { public ReactiveMongoClient createReactiveMongoClient(String clientName) throws MongoException { - MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName)); + MongoClientSettings mongoConfiguration = createMongoConfiguration(clientName, getMatchingMongoClientConfig(clientName), + true); com.mongodb.reactivestreams.client.MongoClient client = com.mongodb.reactivestreams.client.MongoClients .create(mongoConfiguration); ReactiveMongoClientImpl reactive = new ReactiveMongoClientImpl(client); @@ -254,7 +260,7 @@ public void apply(ServerSettings.Builder builder) { } } - private MongoClientSettings createMongoConfiguration(String name, MongoClientConfig config) { + private MongoClientSettings createMongoConfiguration(String name, MongoClientConfig config, boolean isReactive) { if (config == null) { throw new RuntimeException("mongo config is missing for creating mongo client."); } @@ -262,6 +268,10 @@ private MongoClientSettings createMongoConfiguration(String name, MongoClientCon MongoClientSettings.Builder settings = MongoClientSettings.builder(); + if (isReactive) { + reactiveContextProviders.stream().findAny().ifPresent(settings::contextProvider); + } + ConnectionString connectionString; Optional maybeConnectionString = config.connectionString; if (maybeConnectionString.isPresent()) { diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoReactiveContextProvider.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoReactiveContextProvider.java new file mode 100644 index 0000000000000..4061c18e2007e --- /dev/null +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoReactiveContextProvider.java @@ -0,0 +1,17 @@ +package io.quarkus.mongodb.runtime; + +import org.reactivestreams.Subscriber; + +import com.mongodb.RequestContext; +import com.mongodb.reactivestreams.client.ReactiveContextProvider; + +import io.opentelemetry.context.Context; + +public class MongoReactiveContextProvider implements ReactiveContextProvider { + + @Override + public RequestContext getContext(Subscriber subscriber) { + return new MongoRequestContext(Context.current()); + } + +} diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoRequestContext.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoRequestContext.java new file mode 100644 index 0000000000000..cd135be5973f9 --- /dev/null +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/runtime/MongoRequestContext.java @@ -0,0 +1,57 @@ +package io.quarkus.mongodb.runtime; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; + +import com.mongodb.RequestContext; + +import io.opentelemetry.context.Context; + +@SuppressWarnings("unchecked") +public class MongoRequestContext implements RequestContext { + public static final String OTEL_CONTEXT_KEY = "otel.context.current"; + private final Map valuesMap; + + public MongoRequestContext(Context currentContext) { + valuesMap = new ConcurrentHashMap<>(); + if (currentContext != null) { + valuesMap.put(OTEL_CONTEXT_KEY, currentContext); + } + } + + @Override + public T get(Object key) { + return (T) valuesMap.get(key); + } + + @Override + public boolean hasKey(Object key) { + return valuesMap.containsKey(key); + } + + @Override + public boolean isEmpty() { + return valuesMap.isEmpty(); + } + + @Override + public void put(Object key, Object value) { + valuesMap.put(key, value); + } + + @Override + public void delete(Object key) { + valuesMap.remove(key); + } + + @Override + public int size() { + return valuesMap.size(); + } + + @Override + public Stream> stream() { + return valuesMap.entrySet().stream(); + } +} diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/tracing/MongoTracingCommandListener.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/tracing/MongoTracingCommandListener.java index 5e0299055ba27..c963e9477e8ee 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/tracing/MongoTracingCommandListener.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/tracing/MongoTracingCommandListener.java @@ -1,15 +1,20 @@ package io.quarkus.mongodb.tracing; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import jakarta.inject.Inject; +import org.bson.BsonDocument; import org.jboss.logging.Logger; -import com.mongodb.event.*; +import com.mongodb.event.CommandFailedEvent; +import com.mongodb.event.CommandListener; +import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.CommandSucceededEvent; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributesBuilder; @@ -18,21 +23,25 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.quarkus.mongodb.runtime.MongoRequestContext; public class MongoTracingCommandListener implements CommandListener { private static final org.jboss.logging.Logger LOGGER = Logger.getLogger(MongoTracingCommandListener.class); private static final String KEY = "mongodb.command"; private final Map requestMap; - private final Instrumenter instrumenter; + private final Instrumenter instrumenter; - private record ContextEvent(Context context, CommandStartedEvent commandEvent) { + private record MongoCommand(String name, BsonDocument command) { + } + + private record ContextEvent(Context context, MongoCommand command) { } @Inject public MongoTracingCommandListener(OpenTelemetry openTelemetry) { requestMap = new ConcurrentHashMap<>(); - SpanNameExtractor spanNameExtractor = CommandEvent::getCommandName; - instrumenter = Instrumenter. builder( + SpanNameExtractor spanNameExtractor = MongoCommand::name; + instrumenter = Instrumenter. builder( openTelemetry, "quarkus-mongodb-client", spanNameExtractor) .addAttributesExtractor(new CommandEventAttrExtractor()) .buildInstrumenter(SpanKindExtractor.alwaysClient()); @@ -43,10 +52,17 @@ public MongoTracingCommandListener(OpenTelemetry openTelemetry) { public void commandStarted(CommandStartedEvent event) { LOGGER.tracef("commandStarted event %s", event.getCommandName()); - Context parentContext = Context.current(); - if (instrumenter.shouldStart(parentContext, event)) { - Context context = instrumenter.start(parentContext, event); - requestMap.put(event.getRequestId(), new ContextEvent(context, event)); + Context parentContext = Optional.ofNullable(event.getRequestContext()) + .map(rc -> { + Context ctx = rc.get(MongoRequestContext.OTEL_CONTEXT_KEY); + rc.delete(MongoRequestContext.OTEL_CONTEXT_KEY); + return ctx; + }) + .orElseGet(Context::current); + var mongoCommand = new MongoCommand(event.getCommandName(), event.getCommand()); + if (instrumenter.shouldStart(parentContext, mongoCommand)) { + Context context = instrumenter.start(parentContext, mongoCommand); + requestMap.put(event.getRequestId(), new ContextEvent(context, mongoCommand)); } } @@ -55,7 +71,7 @@ public void commandSucceeded(CommandSucceededEvent event) { LOGGER.tracef("commandSucceeded event %s", event.getCommandName()); ContextEvent contextEvent = requestMap.remove(event.getRequestId()); if (contextEvent != null) { - instrumenter.end(contextEvent.context(), contextEvent.commandEvent(), null, null); + instrumenter.end(contextEvent.context(), contextEvent.command(), null, null); } } @@ -66,27 +82,26 @@ public void commandFailed(CommandFailedEvent event) { if (contextEvent != null) { instrumenter.end( contextEvent.context(), - contextEvent.commandEvent(), + contextEvent.command(), null, event.getThrowable()); } } - private static class CommandEventAttrExtractor implements AttributesExtractor { + private static class CommandEventAttrExtractor implements AttributesExtractor { + @Override - public void onStart(AttributesBuilder attributesBuilder, - Context context, - CommandStartedEvent commandStartedEvent) { - attributesBuilder.put(KEY, commandStartedEvent.getCommand().toJson()); + public void onStart(AttributesBuilder attributesBuilder, Context context, MongoCommand command) { + attributesBuilder.put(KEY, command.command().toJson()); } @Override - public void onEnd(AttributesBuilder attributesBuilder, + public void onEnd( + AttributesBuilder attributesBuilder, Context context, - CommandStartedEvent commandStartedEvent, + MongoCommand command, @Nullable Void unused, @Nullable Throwable throwable) { - } } } diff --git a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/runtime/MongoReactiveContextProviderTest.java b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/runtime/MongoReactiveContextProviderTest.java new file mode 100644 index 0000000000000..d02b05a95fecd --- /dev/null +++ b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/runtime/MongoReactiveContextProviderTest.java @@ -0,0 +1,16 @@ +package io.quarkus.mongodb.runtime; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; + +class MongoReactiveContextProviderTest { + + @Test + void getContext() { + var provider = new MongoReactiveContextProvider(); + assertThat(provider.getContext(mock())).isInstanceOf(MongoRequestContext.class); + } + +} diff --git a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/runtime/MongoRequestContextTest.java b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/runtime/MongoRequestContextTest.java new file mode 100644 index 0000000000000..6d9baf69dda2f --- /dev/null +++ b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/runtime/MongoRequestContextTest.java @@ -0,0 +1,77 @@ +package io.quarkus.mongodb.runtime; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; + +import io.opentelemetry.context.Context; + +public class MongoRequestContextTest { + + @Test + void nonNulContext() { + Context context = Context.current(); + MongoRequestContext requestContext = new MongoRequestContext(context); + + assertThat((Context) requestContext.get(MongoRequestContext.OTEL_CONTEXT_KEY)).isSameAs(context); + assertThat(requestContext.isEmpty()).isFalse(); + assertThat(requestContext.size()).isOne(); + } + + @Test + void withNullContext() { + MongoRequestContext requestContext = new MongoRequestContext(null); + + assertThat((Context) requestContext.get(MongoRequestContext.OTEL_CONTEXT_KEY)).isNull(); + assertThat(requestContext.isEmpty()).isTrue(); + assertThat(requestContext.size()).isZero(); + } + + @Test + void testPutAndGet() { + MongoRequestContext requestContext = new MongoRequestContext(null); + String key = "testKey"; + String value = "testValue"; + + requestContext.put(key, value); + assertThat((String) requestContext.get(key)).isEqualTo(value); + assertThat(requestContext.hasKey(key)).isTrue(); + assertThat(requestContext.hasKey("tmp")).isFalse(); + } + + @Test + void testDelete() { + MongoRequestContext requestContext = new MongoRequestContext(null); + String key = "testKey"; + String value = "testValue"; + + requestContext.put(key, value); + assertThat((String) requestContext.get(key)).isEqualTo(value); + assertThat(requestContext.size()).isOne(); + + requestContext.delete(key); + assertThat((String) requestContext.get(key)).isNull(); + assertThat(requestContext.hasKey(key)).isFalse(); + assertThat(requestContext.size()).isZero(); + } + + @Test + void testStream() { + MongoRequestContext requestContext = new MongoRequestContext(null); + requestContext.put("testKey1", "testValue1"); + requestContext.put("testKey2", "testValue2"); + + Map map = requestContext.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + assertThat(map).containsExactlyInAnyOrderEntriesOf(new ConcurrentHashMap<>() { + { + put("testKey1", "testValue1"); + put("testKey2", "testValue2"); + } + }); + } +} diff --git a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/tracing/MongoTracingCommandListenerTest.java b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/tracing/MongoTracingCommandListenerTest.java index 1dc05785e8c01..b7434fcb43d25 100644 --- a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/tracing/MongoTracingCommandListenerTest.java +++ b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/tracing/MongoTracingCommandListenerTest.java @@ -1,11 +1,13 @@ package io.quarkus.mongodb.tracing; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import org.bson.BsonDocument; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.mongodb.RequestContext; import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterId; import com.mongodb.connection.ConnectionDescription; @@ -15,6 +17,8 @@ import com.mongodb.event.CommandSucceededEvent; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.quarkus.mongodb.runtime.MongoRequestContext; class MongoTracingCommandListenerTest { private ConnectionDescription connDescr; @@ -51,6 +55,25 @@ void commandStarted() { assertThatNoException().isThrownBy(() -> listener.commandSucceeded(successEvent)); } + @Test + void mustRemoveOtelContext() { + RequestContext requestContext = new MongoRequestContext(Context.current()); + assertThat((Context) requestContext.get(MongoRequestContext.OTEL_CONTEXT_KEY)).isNotNull(); + + var startEvent = new CommandStartedEvent( + requestContext, + 1L, + 10, + connDescr, + "db", + "find", + command); + listener.commandStarted(startEvent); + assertThat((Context) requestContext.get(MongoRequestContext.OTEL_CONTEXT_KEY)) + .as("Must remove otel context from request context") + .isNull(); + } + @Test void commandSucceeded() { CommandSucceededEvent cmd = new CommandSucceededEvent(null, diff --git a/integration-tests/opentelemetry-mongodb-client-instrumentation/pom.xml b/integration-tests/opentelemetry-mongodb-client-instrumentation/pom.xml index af52788c2d49f..4e209239015a9 100644 --- a/integration-tests/opentelemetry-mongodb-client-instrumentation/pom.xml +++ b/integration-tests/opentelemetry-mongodb-client-instrumentation/pom.xml @@ -119,6 +119,18 @@ + + maven-surefire-plugin + + true + + + + maven-failsafe-plugin + + true + + io.quarkus quarkus-maven-plugin @@ -177,5 +189,30 @@ + + + test-mongodb + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + diff --git a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/BookResource.java b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/BookResource.java index ed667a4ecb5ee..131d7cce47f0f 100644 --- a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/BookResource.java +++ b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/BookResource.java @@ -9,6 +9,8 @@ import jakarta.ws.rs.*; import jakarta.ws.rs.core.Response; +import org.bson.BsonDocument; +import org.bson.BsonDouble; import org.bson.Document; import com.mongodb.WriteConcern; @@ -38,6 +40,10 @@ public Response clearBooks() { @GET public List getBooks() { FindIterable iterable = getCollection().find(); + return getBooks(iterable); + } + + private List getBooks(FindIterable iterable) { List books = new ArrayList<>(); WriteConcern writeConcern = client.getDatabase("temp").getWriteConcern(); // force a test failure if we're not getting the correct, and correctly configured named mongodb client @@ -49,6 +55,15 @@ public List getBooks() { return books; } + @GET + @Path("/invalid") + public List error() { + BsonDocument query = new BsonDocument(); + query.put("$invalidop", new BsonDouble(0d)); + FindIterable iterable = getCollection().find(query); + return getBooks(iterable); + } + @POST public Response addBook(Book book) { getCollection().insertOne(book); diff --git a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/Config.java b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/Config.java new file mode 100644 index 0000000000000..bbd8e467c6081 --- /dev/null +++ b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/Config.java @@ -0,0 +1,15 @@ +package io.quarkus.it.opentelemetry; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import io.quarkus.mongodb.runtime.MongoClientCustomizer; + +@ApplicationScoped +public class Config { + @Produces + public MongoClientCustomizer mongoClientCustomizer() { + return builder -> builder.applicationName("opentelemetry-mongodb-integration-test"); + } + +} diff --git a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/ReactiveBookResource.java b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/ReactiveBookResource.java index 12c8023185335..d54a19a3403c2 100644 --- a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/ReactiveBookResource.java +++ b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/main/java/io/quarkus/it/opentelemetry/ReactiveBookResource.java @@ -7,15 +7,27 @@ import java.util.concurrent.CompletionStage; import jakarta.inject.Inject; -import jakarta.ws.rs.*; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import org.bson.BsonDocument; +import org.bson.BsonDouble; import org.bson.Document; import io.quarkus.mongodb.reactive.ReactiveMongoClient; import io.quarkus.mongodb.reactive.ReactiveMongoCollection; +import io.smallrye.mutiny.Uni; @Path("/reactive-books") +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) public class ReactiveBookResource { @Inject @@ -38,6 +50,14 @@ public CompletionStage> getBooks() { return getCollection().find().collect().asList().subscribeAsCompletionStage(); } + @GET + @Path("/invalid") + public CompletionStage> getBooksError() { + BsonDocument query = new BsonDocument(); + query.put("$invalidop", new BsonDouble(0d)); + return getCollection().find(query).collect().asList().subscribeAsCompletionStage(); + } + @POST public CompletionStage addBook(Book book) { return getCollection().insertOne(book) @@ -53,4 +73,24 @@ public CompletionStage> getBooksByAuthor(@PathParam("author") String .subscribeAsCompletionStage(); } + @GET + @Path("/multiple-chain") + public Uni helloMultipleUsingChain() { + return getCountDocuments("Victor Hugo") + .chain(count1 -> getCountDocuments("Charles Baudelaire").map(count2 -> count1 + count2)); + } + + @GET + @Path("/multiple-combine") + public Uni helloMultipleUsingCombine() { + return Uni.combine().all().unis( + getCountDocuments("Victor Hugo"), + getCountDocuments("Charles Baudelaire")) + .with(Long::sum); + } + + private Uni getCountDocuments(String author) { + return getCollection().countDocuments(eq("author", author)); + } + } diff --git a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/test/java/io/quarkus/it/opentelemetry/BookResourceTest.java b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/test/java/io/quarkus/it/opentelemetry/BookResourceTest.java index ee2095f43fba0..59e74442f163f 100644 --- a/integration-tests/opentelemetry-mongodb-client-instrumentation/src/test/java/io/quarkus/it/opentelemetry/BookResourceTest.java +++ b/integration-tests/opentelemetry-mongodb-client-instrumentation/src/test/java/io/quarkus/it/opentelemetry/BookResourceTest.java @@ -44,29 +44,132 @@ void reset() { @Test void blockingClient() { testInsertBooks("/books"); - assertTraceAvailable("my-collection"); + reset(); + assertThat(get("/books").as(bookListType)).hasSize(3); + assertTraceAvailable(2, "my-collection"); + assertParentChild(2, "my-collection"); + } + + @Test + void blockingClientError() { + given() + .get("/books/invalid") + .then() + .assertThat() + .statusCode(500); + assertTraceAvailable(2, "my-collection", "$invalidop"); + assertParentChild(2, "my-collection"); } @Test void reactiveClient() { testInsertBooks("/reactive-books"); - assertTraceAvailable("my-reactive-collection"); + reset(); + assertThat(get("/reactive-books").as(bookListType)).hasSize(3); + assertTraceAvailable(2, "my-reactive-collection"); + assertParentChild(2, "my-reactive-collection"); + } + + @Test + void reactiveClientMultipleChain() { + testInsertBooks("/reactive-books"); + reset(); + assertThat(get("/reactive-books/multiple-chain").as(Long.class)).isEqualTo(3L); + assertTraceAvailable(3, "my-reactive-collection"); + assertParentChild(3, "my-reactive-collection"); + } + + @Test + void reactiveClientMultipleCombine() { + testInsertBooks("/reactive-books"); + reset(); + assertThat(get("/reactive-books/multiple-combine").as(Long.class)).isEqualTo(3L); + assertTraceAvailable(3, "my-reactive-collection"); + assertParentChild(3, "my-reactive-collection"); + } + + @Test + void reactiveClientParentChild() { + testInsertBooks("/reactive-books"); + reset(); + assertThat(get("/reactive-books").as(bookListType)).hasSize(3); + assertTraceAvailable(2, "my-reactive-collection"); + assertParentChild(2, "my-reactive-collection"); + } + + @Test + void reactiveClientError() { + given() + .get("/reactive-books/invalid") + .then() + .assertThat() + .statusCode(500); + assertTraceAvailable(2, "my-reactive-collection", "$invalidop"); + assertParentChild(2, "my-reactive-collection"); } - private void assertTraceAvailable(String dbCollectionName) { - await().atMost(Duration.ofSeconds(30L)).untilAsserted(() -> { - boolean traceAvailable = false; - for (Map spanData : getSpans()) { - if (spanData.get("attributes") instanceof Map attr) { - var cmd = (String) attr.get("mongodb.command"); - if (cmd != null) { - assertThat(cmd).contains(dbCollectionName, "books"); - traceAvailable = true; - } + private void assertTraceAvailable(int expectedNumOfSpans, String... commandPart) { + await().atMost(Duration.ofSeconds(10L)).until(() -> getSpans().size() == expectedNumOfSpans); + boolean found = false; + for (Map spanData : getSpans()) { + if (spanData.get("attributes") instanceof Map attr) { + var cmd = (String) attr.get("mongodb.command"); + if (cmd != null) { + assertThat(cmd).contains(commandPart).contains("books"); + found = true; } } - assertThat(traceAvailable).as("Mongodb statement was not traced.").isTrue(); - }); + } + assertThat(found).isTrue(); + } + + @SuppressWarnings("unchecked") + private void assertParentChild(int expectedNumOfSpans, String... commandPart) { + await().atMost(Duration.ofSeconds(10L)).until(() -> getSpans().size() == expectedNumOfSpans); + List> spans = getSpans(); + var traceIds = spans.stream().map(data -> data.get("traceId")).toList(); + String traceId = (String) traceIds.get(0); + assertThat(traceId).isNotBlank(); + assertThat(traceIds).as("All spans must have the same trace id").containsOnly(traceId); + + var rootSpanId = getRootSpan(spans); + assertThat(rootSpanId).isNotBlank(); + + for (Map spanData : spans) { + assertThat(spanData).as("span must have trace id").containsEntry("traceId", traceId); + if (spanData.get("attributes") instanceof Map attr) { + var cmd = (String) attr.get("mongodb.command"); + if (cmd != null) { + assertThat(cmd).contains(commandPart).contains("books"); + var parentSpanContext = (Map) spanData.get("parentSpanContext"); + assertThat((String) spanData.get("traceId")).isNotBlank().isEqualTo(traceId); + assertThat((String) parentSpanContext.get("spanId")).isNotBlank().isEqualTo(rootSpanId); + assertThat((String) spanData.get("spanId")).isNotBlank().isNotEqualTo(rootSpanId); + } + } + } + + } + + private record ChildSpanData(String traceId, String parentSpanId, String spanId) { + } + + /** + * find root span id + * this is the case if the trace id in parentSpanContext contains only zeros + * + * @param spans + * @return + */ + @SuppressWarnings("unchecked") + private static String getRootSpan(Iterable> spans) { + for (Map spanData : spans) { + var parentContext = (Map) spanData.get("parentSpanContext"); + if (((String) parentContext.get("traceId")).matches("0+")) { + return (String) spanData.get("spanId"); + } + } + throw new IllegalStateException("No root span found"); } private void testInsertBooks(String endpoint) { @@ -76,19 +179,16 @@ private void testInsertBooks(String endpoint) { .assertThat() .statusCode(200); - assertThat(get(endpoint).as(bookListType)).isEmpty(); + await().atMost(Duration.ofSeconds(60L)) + .untilAsserted(() -> assertThat(get(endpoint).as(bookListType)).as("must delete all").isEmpty()); saveBook(new Book("Victor Hugo", "Les Misérables"), endpoint); saveBook(new Book("Victor Hugo", "Notre-Dame de Paris"), endpoint); await().atMost(Duration.ofSeconds(60L)) .untilAsserted(() -> assertThat(get(endpoint).as(bookListType)).hasSize(2)); - saveBook(new Book("Charles Baudelaire", "Les fleurs du mal"), endpoint); - - assertThat(get(endpoint).as(bookListType)).hasSize(3); - - List books = get("%s/Victor Hugo".formatted(endpoint)).as(bookListType); - assertThat(books).hasSize(2); + await().atMost(Duration.ofSeconds(60L)) + .untilAsserted(() -> assertThat(get(endpoint).as(bookListType)).hasSize(3)); } private static void saveBook(Book book, String endpoint) { diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 0a63ea27ed39a..4c3827d4bd4bc 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -352,6 +352,7 @@ opentelemetry-quickstart opentelemetry-spi opentelemetry-jdbc-instrumentation + opentelemetry-mongodb-client-instrumentation opentelemetry-quartz opentelemetry-scheduler opentelemetry-vertx