From 9f3317b4232f6343a0a7abd495623cf6e167778a Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 1 Nov 2023 15:25:04 +0200 Subject: [PATCH] Always set messaging operation --- .../MessagingAttributesExtractor.java | 6 +++++- .../MessagingAttributesExtractorTest.java | 2 +- .../v1_11/AbstractSqsTracingTest.groovy | 2 ++ .../awssdk/v2_2/AbstractAws2ClientTest.groovy | 2 ++ .../v2_2/AbstractAws2SqsTracingTest.groovy | 2 ++ .../apachecamel/aws/AwsSpanAssertions.java | 2 ++ .../jms/v1_1/Jms2InstrumentationTest.java | 2 ++ .../jms/v1_1/Jms1InstrumentationTest.java | 4 ++++ .../jms/v3_0/Jms3InstrumentationTest.java | 3 +++ .../kafka/internal/KafkaClientBaseTest.java | 1 + .../InterceptorsSuppressReceiveSpansTest.java | 1 + .../kafkaclients/v2_6/InterceptorsTest.java | 1 + .../v2_6/WrapperSuppressReceiveSpansTest.java | 1 + .../kafkaclients/v2_6/WrapperTest.java | 1 + .../groovy/KafkaStreamsDefaultTest.groovy | 2 ++ ...afkaStreamsSuppressReceiveSpansTest.groovy | 2 ++ .../pulsar/v2_8/PulsarClientTest.groovy | 1 + .../RabbitChannelInstrumentation.java | 6 +++--- .../rabbitmq/RabbitSingletons.java | 19 +++++++++++-------- .../kafka/v1_0/AbstractReactorKafkaTest.java | 1 + .../v4_8/AbstractRocketMqClientTest.groovy | 4 ++++ ...RocketMqClientSuppressReceiveSpanTest.java | 3 ++- .../v5_0/AbstractRocketMqClientTest.java | 9 ++++++--- .../kafka/KafkaIntegrationTest.java | 1 + .../SpringIntegrationAndRabbitTest.groovy | 1 + .../src/test/groovy/SpringListenerTest.groovy | 1 + .../jms/v6_0/SpringJmsListenerTest.java | 2 ++ .../spring/kafka/v2_7/SpringKafkaTest.java | 5 +++++ ...ractSpringKafkaNoReceiveTelemetryTest.java | 5 +++++ .../rabbit/v1_0/ContextPropagationTest.java | 3 ++- .../kafka/v3_6/AbstractVertxKafkaTest.java | 1 + 31 files changed, 78 insertions(+), 18 deletions(-) diff --git a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java index f9172420e3ef..88bda3ba883a 100644 --- a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java +++ b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java @@ -89,7 +89,7 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST attributes, SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_COMPRESSED_SIZE_BYTES, getter.getMessagePayloadCompressedSize(request)); - if (operation == RECEIVE || operation == PROCESS) { + if (operation != null) { internalSet(attributes, SemanticAttributes.MESSAGING_OPERATION, operation.operationName()); } } @@ -120,6 +120,10 @@ public void onEnd( */ @Override public SpanKey internalGetSpanKey() { + if (operation == null) { + return null; + } + switch (operation) { case PUBLISH: return SpanKey.PRODUCER; diff --git a/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorTest.java b/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorTest.java index bc12a0a103c8..c6e915b59b36 100644 --- a/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorTest.java +++ b/instrumentation-api-semconv/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorTest.java @@ -92,7 +92,7 @@ static Stream destinations() { void shouldExtractNoAttributesIfNoneAreAvailable() { // given AttributesExtractor, String> underTest = - MessagingAttributesExtractor.create(TestGetter.INSTANCE, MessageOperation.PUBLISH); + MessagingAttributesExtractor.create(TestGetter.INSTANCE, null); Context context = Context.root(); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy index c2c6b8826774..0db2fe15eaf7 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy @@ -104,6 +104,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "net.peer.port" sqsPort "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long @@ -193,6 +194,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "net.peer.port" sqsPort "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientTest.groovy index f852619a4f44..53390c8d8524 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientTest.groovy @@ -115,6 +115,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest { } else if (service == "Sqs" && operation == "SendMessage") { "aws.queue.url" QUEUE_URL "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somequeue" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" } else if (service == "Kinesis") { @@ -217,6 +218,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest { } else if (service == "Sqs" && operation == "SendMessage") { "aws.queue.url" QUEUE_URL "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somequeue" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" } else if (service == "Kinesis") { diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy index f6f957b7ecc8..dbf5cbba56c9 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy @@ -154,6 +154,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { "net.peer.port" sqsPort "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } @@ -321,6 +322,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { "net.peer.port" sqsPort "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } } diff --git a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java index 91a446b2e8b1..7001f7fcbc3f 100644 --- a/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java +++ b/instrumentation/camel-2.20/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/apachecamel/aws/AwsSpanAssertions.java @@ -102,6 +102,8 @@ static SpanDataAssert sqs( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"))); if (spanName.endsWith("receive")) { attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")); + } else if (spanName.endsWith("publish")) { + attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish")); } } diff --git a/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java b/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java index bf7e6d1bf0e5..2412c8b7d085 100644 --- a/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java +++ b/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java @@ -169,6 +169,7 @@ void testMessageConsumer( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary))); @@ -231,6 +232,7 @@ void testMessageListener( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary)), span -> diff --git a/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java b/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java index 9c6c828649ed..e9d256b01d47 100644 --- a/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java +++ b/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java @@ -129,6 +129,7 @@ void testMessageConsumer( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary))); @@ -191,6 +192,7 @@ void testMessageListener( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary)), span -> @@ -269,6 +271,7 @@ void shouldCaptureMessageHeaders( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary), equalTo( @@ -338,6 +341,7 @@ void shouldFailWhenSendingReadOnlyMessage( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary))), trace -> diff --git a/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java b/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java index baed01970bbf..c98267887b82 100644 --- a/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java +++ b/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java @@ -147,6 +147,7 @@ void testMessageConsumer(DestinationFactory destinationFactory, boolean isTempor equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, producerDestinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary))); @@ -214,6 +215,7 @@ void testMessageListener(DestinationFactory destinationFactory, boolean isTempor equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, producerDestinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary)), span -> @@ -297,6 +299,7 @@ void shouldCaptureMessageHeaders(DestinationFactory destinationFactory, boolean equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, producerDestinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), messagingTempDestination(isTemporary), equalTo( diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java index abed52ab2972..9874b225157d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java @@ -159,6 +159,7 @@ protected static List sendAttributes( Arrays.asList( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java index 531d53b969db..c3387b8a6e99 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java @@ -28,6 +28,7 @@ void assertTraces() { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer"))), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 2f1aa4f59ae0..d49e4d734f8d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -35,6 +35,7 @@ void assertTraces() { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")))); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java index ec92559f929f..3a900ed26a51 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java @@ -58,6 +58,7 @@ protected static List sendAttributes(boolean testHeaders) { Arrays.asList( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index 791584f84dbc..b435b614f3c0 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -80,6 +80,7 @@ protected static List sendAttributes(boolean testHeaders) { Arrays.asList( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy index 4933e8414e3d..455dc02005b1 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy @@ -99,6 +99,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_CLIENT_ID" { it.startsWith("producer") } "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 @@ -154,6 +155,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_CLIENT_ID" { it.endsWith("producer") } "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy index bb69cea16133..f76acd674846 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy @@ -94,6 +94,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_CLIENT_ID" "producer-1" "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 @@ -132,6 +133,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_CLIENT_ID" String "$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } "$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy index b0ffb8ce7723..fe7c1d88b2f3 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy @@ -587,6 +587,7 @@ class PulsarClientTest extends AgentInstrumentationSpecification { "$SemanticAttributes.NET_PEER_NAME" brokerHost "$SemanticAttributes.NET_PEER_PORT" brokerPort "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination + "$SemanticAttributes.MESSAGING_OPERATION" "publish" if (msgId == String) { "$SemanticAttributes.MESSAGING_MESSAGE_ID" String } else if (msgId != null) { diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index 84f2c8440cac..189c587bbc18 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -108,11 +108,11 @@ public static void onEnter( Context parentContext = Java8BytecodeBridge.currentContext(); request = ChannelAndMethod.create(channel, method); - if (!channelInstrumenter().shouldStart(parentContext, request)) { + if (!channelInstrumenter(request).shouldStart(parentContext, request)) { return; } - context = channelInstrumenter().start(parentContext, request); + context = channelInstrumenter(request).start(parentContext, request); CURRENT_RABBIT_CONTEXT.set(context); helper().setChannelAndMethod(context, request); scope = context.makeCurrent(); @@ -132,7 +132,7 @@ public static void stopSpan( scope.close(); CURRENT_RABBIT_CONTEXT.remove(); - channelInstrumenter().end(context, request, null, throwable); + channelInstrumenter(request).end(context, request, null, throwable); } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java index e80b226cf887..a150d3739eb9 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java @@ -30,7 +30,9 @@ public final class RabbitSingletons { .getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false); private static final String instrumentationName = "io.opentelemetry.rabbitmq-2.7"; private static final Instrumenter channelInstrumenter = - createChannelInstrumenter(); + createChannelInstrumenter(false); + private static final Instrumenter channelPublishInstrumenter = + createChannelInstrumenter(true); private static final Instrumenter receiveInstrumenter = createReceiveInstrumenter(); private static final Instrumenter deliverInstrumenter = @@ -38,8 +40,11 @@ public final class RabbitSingletons { static final ContextKey CHANNEL_AND_METHOD_CONTEXT_KEY = ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key"); - public static Instrumenter channelInstrumenter() { - return channelInstrumenter; + public static Instrumenter channelInstrumenter( + ChannelAndMethod channelAndMethod) { + return channelAndMethod.getMethod().equals("Channel.basicPublish") + ? channelPublishInstrumenter + : channelInstrumenter; } public static Instrumenter receiveInstrumenter() { @@ -51,21 +56,19 @@ static Instrumenter deliverInstrumenter() { } @SuppressWarnings("deprecation") // have to use the deprecated Net*AttributesExtractor for now - private static Instrumenter createChannelInstrumenter() { + private static Instrumenter createChannelInstrumenter(boolean publish) { return Instrumenter.builder( GlobalOpenTelemetry.get(), instrumentationName, ChannelAndMethod::getMethod) .addAttributesExtractor( buildMessagingAttributesExtractor( - RabbitChannelAttributesGetter.INSTANCE, MessageOperation.PUBLISH)) + RabbitChannelAttributesGetter.INSTANCE, publish ? MessageOperation.PUBLISH : null)) .addAttributesExtractor( io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor .create(new RabbitChannelNetAttributesGetter())) .addContextCustomizer( (context, request, startAttributes) -> context.with(CHANNEL_AND_METHOD_CONTEXT_KEY, new RabbitChannelAndMethodHolder())) - .buildInstrumenter( - channelAndMethod -> - channelAndMethod.getMethod().equals("Channel.basicPublish") ? PRODUCER : CLIENT); + .buildInstrumenter(channelAndMethod -> publish ? PRODUCER : CLIENT); } @SuppressWarnings("deprecation") // have to use the deprecated Net*AttributesExtractor for now diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index c34fd700fa57..2d890d10db32 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -180,6 +180,7 @@ protected static List sendAttributes(ProducerRecord stringAssert.startsWith("producer")), diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy index 8d2f641ecee4..97ae614a0b97 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy @@ -106,6 +106,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" "messaging.rocketmq.broker_address" String @@ -160,6 +161,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" "messaging.rocketmq.broker_address" String @@ -236,6 +238,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "messaging.rocketmq.broker_address" String "messaging.rocketmq.send_result" "SEND_OK" @@ -320,6 +323,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_ID" String "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" "messaging.rocketmq.broker_address" String diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java index 79f46a9fe73d..9894b0037fbc 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java @@ -121,7 +121,8 @@ void testSendAndConsumeMessage() throws Throwable { equalTo( MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), - equalTo(MESSAGING_DESTINATION_NAME, topic)), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "publish")), span -> span.hasKind(SpanKind.CONSUMER) .hasName(topic + " process") diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index cb5840a1e3dc..ba221579eb27 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -418,7 +418,8 @@ private static SpanDataAssert assertProducerSpan( equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), equalTo(MESSAGING_SYSTEM, "rocketmq"), equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), - equalTo(MESSAGING_DESTINATION_NAME, topic))); + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "publish"))); attributeAssertions.addAll(Arrays.asList(extraAttributes)); return span.hasKind(SpanKind.PRODUCER) @@ -446,7 +447,8 @@ private static SpanDataAssert assertProducerSpanWithFifoMessage( equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), equalTo(MESSAGING_SYSTEM, "rocketmq"), equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), - equalTo(MESSAGING_DESTINATION_NAME, topic))); + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "publish"))); attributeAssertions.addAll(Arrays.asList(extraAttributes)); return span.hasKind(SpanKind.PRODUCER) @@ -474,7 +476,8 @@ private static SpanDataAssert assertProducerSpanWithDelayMessage( equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), equalTo(MESSAGING_SYSTEM, "rocketmq"), equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), - equalTo(MESSAGING_DESTINATION_NAME, topic))); + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "publish"))); attributeAssertions.addAll(Arrays.asList(extraAttributes)); return span.hasKind(SpanKind.PRODUCER) diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java index 70bac6320f5a..0e570b627b0a 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java +++ b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java @@ -109,6 +109,7 @@ private static void runShouldInstrumentProducerAndConsumer( .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy index 8b62dc458e56..14b95ae1efb5 100644 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy @@ -60,6 +60,7 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i "$SemanticAttributes.NET_SOCK_FAMILY" { it == SemanticAttributes.NetSockFamilyValues.INET6 || it == null } "$SemanticAttributes.MESSAGING_SYSTEM" "rabbitmq" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testTopic" + "$SemanticAttributes.MESSAGING_OPERATION" "publish" "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY" String } diff --git a/instrumentation/spring/spring-jms/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy b/instrumentation/spring/spring-jms/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy index d64a01f03764..06c21fce6c47 100644 --- a/instrumentation/spring/spring-jms/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy +++ b/instrumentation/spring/spring-jms/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy @@ -54,6 +54,7 @@ class SpringListenerTest extends AgentInstrumentationSpecification { attributes { "$SemanticAttributes.MESSAGING_SYSTEM" "jms" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName + "$SemanticAttributes.MESSAGING_OPERATION" "publish" if (destinationName == "(temporary)") { "$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true } diff --git a/instrumentation/spring/spring-jms/spring-jms-6.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/jms/v6_0/SpringJmsListenerTest.java b/instrumentation/spring/spring-jms/spring-jms-6.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/jms/v6_0/SpringJmsListenerTest.java index f8baf71c1db8..19216545e629 100644 --- a/instrumentation/spring/spring-jms/spring-jms-6.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/jms/v6_0/SpringJmsListenerTest.java +++ b/instrumentation/spring/spring-jms/spring-jms-6.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/jms/v6_0/SpringJmsListenerTest.java @@ -115,6 +115,7 @@ void testSpringJmsListener(Class configClass) equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "spring-jms-listener"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank)), @@ -195,6 +196,7 @@ void shouldCaptureHeaders(Class configClass) equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "spring-jms-listener"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotBlank), diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java index 20538972dbc8..19d877b1e414 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java @@ -68,6 +68,7 @@ void shouldCreateSpansForSingleRecordProcess() { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -156,6 +157,7 @@ void shouldHandleFailureInKafkaListener() { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -242,6 +244,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -259,6 +262,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), @@ -337,6 +341,7 @@ void shouldHandleFailureInKafkaBatchListener() { .hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java index 3c3970f1b56b..8071f001e3d3 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -48,6 +48,7 @@ void shouldCreateSpansForSingleRecordProcess() { equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -114,6 +115,7 @@ void shouldHandleFailureInKafkaListener() { equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -179,6 +181,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -197,6 +200,7 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), @@ -265,6 +269,7 @@ void shouldHandleFailureInKafkaBatchListener() { equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo( SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java index d10af2426141..6e5ae145e9ec 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java @@ -159,7 +159,8 @@ public void test(boolean testHeaders) throws Exception { .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( - getAssertions("", null, "127.0.0.1", true, testHeaders)), + getAssertions( + "", "publish", "127.0.0.1", true, testHeaders)), // spring-cloud-stream-binder-rabbit listener puts all messages into a // BlockingQueue immediately after receiving // that's why the rabbitmq CONSUMER span will never have any child span (and diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java index 637e2cbb69a6..0cae61818e39 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java @@ -201,6 +201,7 @@ protected static List sendAttributes( Arrays.asList( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), satisfies( SemanticAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),