From 79caf4aa42c1995bec191ec1e5eddf3c8d55f50b Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 10 Apr 2024 14:18:23 +0300 Subject: [PATCH] Replace messaging.kafka.destination.partition with messaging.destination.partition.id --- .../decorators/KafkaSpanDecorator.java | 4 +- .../kafka/internal/KafkaClientBaseTest.java | 13 +++--- .../InterceptorsSuppressReceiveSpansTest.java | 7 ++-- .../kafkaclients/v2_6/InterceptorsTest.java | 7 ++-- .../v2_6/WrapperSuppressReceiveSpansTest.java | 13 +++--- .../kafkaclients/v2_6/WrapperTest.java | 13 +++--- .../KafkaConsumerAttributesExtractor.java | 6 +-- .../KafkaProducerAttributesExtractor.java | 6 +-- .../groovy/KafkaStreamsDefaultTest.groovy | 9 ++--- ...afkaStreamsSuppressReceiveSpansTest.groovy | 9 ++--- .../kafka/v1_0/AbstractReactorKafkaTest.java | 13 +++--- .../kafka/KafkaIntegrationTest.java | 10 ++--- .../spring/kafka/v2_7/SpringKafkaTest.java | 39 +++++++----------- ...ractSpringKafkaNoReceiveTelemetryTest.java | 40 +++++++------------ .../kafka/v3_6/AbstractVertxKafkaTest.java | 13 +++--- 15 files changed, 79 insertions(+), 123 deletions(-) diff --git a/instrumentation/camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/KafkaSpanDecorator.java b/instrumentation/camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/KafkaSpanDecorator.java index b506501987aa..fc6e1fc87113 100644 --- a/instrumentation/camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/KafkaSpanDecorator.java +++ b/instrumentation/camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/KafkaSpanDecorator.java @@ -52,8 +52,6 @@ public String getDestination(Exchange exchange, Endpoint endpoint) { return topic != null ? topic : super.getDestination(exchange, endpoint); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Override public void pre( AttributesBuilder attributes, @@ -67,7 +65,7 @@ public void pre( Integer partition = exchange.getIn().getHeader(PARTITION, Integer.class); if (partition != null) { attributes.put( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, partition); + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, partition.toString()); } if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java index 698602d7c40e..582d68c70a4e 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -152,8 +153,6 @@ public void awaitUntilConsumerIsReady() throws InterruptedException { consumer.seekToBeginning(Collections.emptyList()); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List sendAttributes( String messageKey, String messageValue, boolean testHeaders) { List assertions = @@ -166,8 +165,8 @@ protected static List sendAttributes( MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -214,8 +213,6 @@ protected static List receiveAttributes(boolean testHeaders) return assertions; } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List processAttributes( String messageKey, String messageValue, boolean testHeaders) { List assertions = @@ -228,8 +225,8 @@ protected static List processAttributes( MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java index ede8f3c0bcb3..8d70c340f60a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java @@ -12,11 +12,10 @@ import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import java.nio.charset.StandardCharsets; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest { - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Override void assertTraces() { testing.waitAndAssertTraces( @@ -50,8 +49,8 @@ void assertTraces() { MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index de1d557c667b..35e723247b44 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -17,11 +17,10 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; class InterceptorsTest extends AbstractInterceptorsTest { - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Override void assertTraces() { AtomicReference producerSpanContext = new AtomicReference<>(); @@ -87,8 +86,8 @@ void assertTraces() { MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java index eb3358ad497e..04c1babf5b60 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSuppressReceiveSpansTest.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.List; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest { @@ -52,8 +53,6 @@ void assertTraces(boolean testHeaders) { .hasParent(trace.getSpan(0)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List sendAttributes(boolean testHeaders) { List assertions = new ArrayList<>( @@ -65,8 +64,8 @@ protected static List sendAttributes(boolean testHeaders) { MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -79,8 +78,6 @@ protected static List sendAttributes(boolean testHeaders) { return assertions; } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation private static List processAttributes(String greeting, boolean testHeaders) { List assertions = new ArrayList<>( @@ -92,8 +89,8 @@ private static List processAttributes(String greeting, boole MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index 9fb07bab0866..aaa6730f1dd6 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; class WrapperTest extends AbstractWrapperTest { @@ -74,8 +75,6 @@ void assertTraces(boolean testHeaders) { .hasParent(trace.getSpan(1)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List sendAttributes(boolean testHeaders) { List assertions = new ArrayList<>( @@ -87,8 +86,8 @@ protected static List sendAttributes(boolean testHeaders) { MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -101,8 +100,6 @@ protected static List sendAttributes(boolean testHeaders) { return assertions; } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation private static List processAttributes(String greeting, boolean testHeaders) { List assertions = new ArrayList<>( @@ -114,8 +111,8 @@ private static List processAttributes(String greeting, boole MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java index 7460956b51c8..4fea8da5713a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesExtractor.java @@ -16,8 +16,6 @@ final class KafkaConsumerAttributesExtractor implements AttributesExtractor { - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Override public void onStart( AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) { @@ -25,8 +23,8 @@ public void onStart( ConsumerRecord record = request.getRecord(); attributes.put( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - (long) record.partition()); + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + String.valueOf(record.partition())); attributes.put(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset()); Object key = record.key(); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java index 30618e6628f9..96c733f60f1c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesExtractor.java @@ -35,8 +35,6 @@ private static boolean canSerialize(Class keyClass) { return !(keyClass.isArray() || keyClass == ByteBuffer.class); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Override public void onEnd( AttributesBuilder attributes, @@ -47,8 +45,8 @@ public void onEnd( if (recordMetadata != null) { attributes.put( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - recordMetadata.partition()); + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + String.valueOf(recordMetadata.partition())); attributes.put( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, recordMetadata.offset()); } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy index 44e1b3fa321e..d50f9e950dba 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy @@ -24,7 +24,6 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { - @SuppressWarnings("deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation def "test kafka produce and consume with streams in-between"() { setup: def config = new Properties() @@ -102,7 +101,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("producer") } - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" } @@ -139,7 +138,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") } "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" "kafka.record.queue_time_ms" { it >= 0 } @@ -159,7 +158,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("producer") } - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 } } @@ -195,7 +194,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") } "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" if (Boolean.getBoolean("testLatestDeps")) { diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy index de1ec297e455..386e09b56efb 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy @@ -24,7 +24,6 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { - @SuppressWarnings("deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation def "test kafka produce and consume with streams in-between"() { setup: def config = new Properties() @@ -97,7 +96,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" "producer-1" - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" } @@ -113,7 +112,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") } "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" "kafka.record.queue_time_ms" { it >= 0 } @@ -136,7 +135,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 } } @@ -151,7 +150,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" "$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") } "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 } + "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" if (Boolean.getBoolean("testLatestDeps")) { diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index aad42569d24d..becea32946d4 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.RegisterExtension; @@ -174,8 +175,6 @@ protected void testSingleRecordProcess( span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List sendAttributes(ProducerRecord record) { List assertions = new ArrayList<>( @@ -187,8 +186,8 @@ protected static List sendAttributes(ProducerRecord stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -217,8 +216,6 @@ protected static List receiveAttributes(String topic) { return assertions; } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List processAttributes( ProducerRecord record) { List assertions = @@ -231,8 +228,8 @@ protected static List processAttributes( MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java index 62b9bf9990e0..ee9d99028816 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java +++ b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/instrumentation/kafka/KafkaIntegrationTest.java @@ -16,6 +16,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -84,7 +85,6 @@ void shouldInstrumentProducerAndConsumer() { // In kafka 2 ops.send is deprecated. We are using it to avoid reflection because kafka 3 also has // ops.send, although with different return type. @SuppressWarnings({"unchecked", "deprecation"}) - // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation private static void runShouldInstrumentProducerAndConsumer( ConfigurableApplicationContext applicationContext) { KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class); @@ -117,8 +117,8 @@ private static void runShouldInstrumentProducerAndConsumer( MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -138,8 +138,8 @@ private static void runShouldInstrumentProducerAndConsumer( MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java index 9c86159526d9..edf932d59af9 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -48,8 +49,6 @@ protected List> additionalSpringConfigs() { return emptyList(); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Test void shouldCreateSpansForSingleRecordProcess() { testing.runWithSpan( @@ -80,8 +79,8 @@ void shouldCreateSpansForSingleRecordProcess() { "testSingleTopic"), equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -127,8 +126,8 @@ void shouldCreateSpansForSingleRecordProcess() { MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -146,9 +145,6 @@ void shouldCreateSpansForSingleRecordProcess() { span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } - @SuppressWarnings( - "deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION - // deprecation @Test void shouldHandleFailureInKafkaListener() { testing.runWithSpan( @@ -188,8 +184,8 @@ void shouldHandleFailureInKafkaListener() { MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -218,8 +214,8 @@ void shouldHandleFailureInKafkaListener() { "testSingleTopic"), equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -266,8 +262,6 @@ void shouldHandleFailureInKafkaListener() { span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Test void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { Map batchMessages = new HashMap<>(); @@ -294,8 +288,8 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { "testBatchTopic"), equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -314,8 +308,8 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { "testBatchTopic"), equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -371,9 +365,6 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } - @SuppressWarnings( - "deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION - // deprecation @Test void shouldHandleFailureInKafkaBatchListener() { testing.runWithSpan( @@ -404,8 +395,8 @@ void shouldHandleFailureInKafkaBatchListener() { "testBatchTopic"), equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java index 33eab53b3605..53535037d566 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -21,14 +21,13 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.Test; public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaTest { protected abstract boolean isLibraryInstrumentationTest(); - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Test void shouldCreateSpansForSingleRecordProcess() { testing() @@ -63,8 +62,8 @@ void shouldCreateSpansForSingleRecordProcess() { stringAssert -> stringAssert.startsWith("producer")), satisfies( MessagingIncubatingAttributes - .MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + .MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -87,8 +86,8 @@ void shouldCreateSpansForSingleRecordProcess() { AbstractLongAssert::isNotNegative), satisfies( MessagingIncubatingAttributes - .MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + .MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -104,8 +103,6 @@ void shouldCreateSpansForSingleRecordProcess() { span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Test void shouldHandleFailureInKafkaListener() { testing() @@ -128,8 +125,8 @@ void shouldHandleFailureInKafkaListener() { MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -161,8 +158,8 @@ void shouldHandleFailureInKafkaListener() { stringAssert -> stringAssert.startsWith("producer")), satisfies( MessagingIncubatingAttributes - .MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + .MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -194,8 +191,6 @@ void shouldHandleFailureInKafkaListener() { span -> span.hasName("consumer").hasParent(trace.getSpan(6)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Test void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { Map batchMessages = new HashMap<>(); @@ -226,9 +221,8 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes - .MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -248,9 +242,8 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes - .MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), @@ -290,8 +283,6 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { span -> span.hasName("consumer").hasParent(trace.getSpan(0)))); } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation @Test void shouldHandleFailureInKafkaBatchListener() { testing() @@ -339,9 +330,8 @@ void shouldHandleFailureInKafkaBatchListener() { MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes - .MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java index 70a2088f8734..2fdacdf5f531 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.RegisterExtension; @@ -194,8 +195,6 @@ protected static void sendRecord( } } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List sendAttributes( KafkaProducerRecord record) { List assertions = @@ -208,8 +207,8 @@ protected static List sendAttributes( MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); @@ -249,8 +248,6 @@ private static List batchConsumerAttributes(String topic, St return assertions; } - @SuppressWarnings("deprecation") // TODO - // MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation protected static List processAttributes( KafkaProducerRecord record) { List assertions = @@ -263,8 +260,8 @@ protected static List processAttributes( MessagingIncubatingAttributes.MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")), satisfies( - MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + AbstractStringAssert::isNotEmpty), satisfies( MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));