diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 729acf449a32b..6ebcbe319604e 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -62,7 +62,7 @@
1.0.13
3.0.1
3.8.0
- 4.15.0
+ 4.16.0
2.5.0
2.1.2
2.1.1
diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java
index ff6e4dda31e2e..3c6103be30b90 100644
--- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java
+++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/tracing/instrumentation/InstrumentationProcessor.java
@@ -24,7 +24,6 @@
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.InstrumentationRecorder;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.grpc.GrpcTracingClientInterceptor;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.grpc.GrpcTracingServerInterceptor;
-import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingEmitterDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingIncomingDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.restclient.OpenTelemetryClientFilter;
@@ -100,7 +99,6 @@ void registerReactiveMessagingMessageDecorator(
if (capabilities.isPresent(Capability.SMALLRYE_REACTIVE_MESSAGING) && config.instrument().reactiveMessaging()) {
additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingOutgoingDecorator.class));
additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingIncomingDecorator.class));
- additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingEmitterDecorator.class));
}
}
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingEmitterDecorator.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingEmitterDecorator.java
deleted file mode 100644
index 17f6541114ecd..0000000000000
--- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingEmitterDecorator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging;
-
-import static io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator.decorateOutgoing;
-
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
-
-import org.eclipse.microprofile.reactive.messaging.Message;
-
-import io.smallrye.mutiny.Multi;
-import io.smallrye.reactive.messaging.ChannelRegistry;
-import io.smallrye.reactive.messaging.PublisherDecorator;
-
-/**
- * Intercepts outgoing messages from emitters from Reactive Messaging connectors.
- *
- * For outgoing messages from emitters, if the message doesn't already contain a tracing metadata, it attaches one with the
- * current
- * OpenTelemetry context.
- * Reactive messaging outbound connectors, if tracing is supported, will use that context as parent span to trace outbound
- * message transmission.
- */
-@ApplicationScoped
-public class ReactiveMessagingTracingEmitterDecorator implements PublisherDecorator {
-
- @Override
- public int getPriority() {
- // Place the decorator before all others including the ContextDecorator which is priority 0
- // This is only important for the emitter case
- return -1000;
- }
-
- @Inject
- ChannelRegistry registry;
-
- /**
- * Incoming messages
- */
- @Override
- public Multi extends Message>> decorate(Multi extends Message>> publisher,
- String channelName, boolean isConnector) {
- Multi extends Message>> multi = publisher;
- if (!isConnector && registry.getEmitterNames().contains(channelName)) {
- // Emitter is a special case for the emitter publisher
- multi = decorateOutgoing(multi);
- }
- return multi;
- }
-
-}
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingOutgoingDecorator.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingOutgoingDecorator.java
index 00531430dfcd3..4e99bd33cec12 100644
--- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingOutgoingDecorator.java
+++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/reactivemessaging/ReactiveMessagingTracingOutgoingDecorator.java
@@ -31,20 +31,16 @@ public Multi extends Message>> decorate(Multi extends Message>> toBeSubs
Multi extends Message>> multi = toBeSubscribed;
if (isConnector) {
// add TracingMetadata to the outgoing message if it doesn't exist already
- multi = decorateOutgoing(multi);
+ multi = multi.map(m -> {
+ Message> message = m;
+ if (m.getMetadata(TracingMetadata.class).isEmpty()) {
+ var otelContext = QuarkusContextStorage.INSTANCE.current();
+ message = m.addMetadata(TracingMetadata.withCurrent(otelContext));
+ }
+ return message;
+ });
}
return multi;
}
- static Multi extends Message>> decorateOutgoing(Multi extends Message>> multi) {
- return multi.map(m -> {
- Message> message = m;
- if (m.getMetadata(TracingMetadata.class).isEmpty()) {
- var otelContext = QuarkusContextStorage.INSTANCE.current();
- message = m.addMetadata(TracingMetadata.withCurrent(otelContext));
- }
- return message;
- });
- }
-
}