From 9317307615021bb07392b22bf1f38038b313ae56 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Wed, 22 Feb 2023 09:48:45 +0800 Subject: [PATCH 1/3] Support more semantic convention for RocketMQ trace --- .../v5_0/RocketMqConsumerProcessAttributeExtractor.java | 8 ++++++++ .../v5_0/RocketMqProducerAttributeExtractor.java | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java index 35c4eb2e0b20..8c7408b624aa 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java @@ -6,6 +6,8 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; @@ -25,6 +27,12 @@ enum RocketMqConsumerProcessAttributeExtractor public void onStart( AttributesBuilder attributes, Context parentContext, MessageView messageView) { messageView.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s)); + messageView + .getMessageGroup() + .ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_GROUP, s)); + messageView + .getDeliveryTimestamp() + .ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, s)); attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(messageView.getKeys())); attributes.put( MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView)); diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java index aa322d92d081..f64a99b6fec2 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; @@ -29,6 +31,10 @@ enum RocketMqProducerAttributeExtractor public void onStart( AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) { message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s)); + message.getMessageGroup().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_GROUP, s)); + message + .getDeliveryTimestamp() + .ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, s)); attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys())); switch (message.getMessageType()) { case FIFO: From 60264b2ca55b76d5b4002644ade21e7ab830b74a Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Fri, 24 Feb 2023 22:51:50 +0800 Subject: [PATCH 2/3] Add more tests for rocketmq fifo message --- .../v5_0/AbstractRocketMqClientTest.java | 163 ++++++++++++++++-- .../v5_0/RocketMqProxyContainer.java | 2 +- 2 files changed, 148 insertions(+), 17 deletions(-) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index 532d10e0794f..b5ae3d0653ce 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -13,10 +13,12 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; import io.opentelemetry.api.common.AttributeKey; @@ -34,7 +36,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; @@ -56,7 +60,9 @@ public abstract class AbstractRocketMqClientTest { // Inner topic of the container. - private static final String topic = "normal-topic-0"; + private static final String normalTopic = "normal-topic-0"; + private static final String fifoTopic = "fifo-topic-0"; + private static final String delayTopic = "delay-topic-0"; private static final String tag = "tagA"; private static final String consumerGroup = "group-normal-topic-0"; @@ -73,12 +79,16 @@ void setUp() throws ClientException { ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + Map subscriptionExpressions = new HashMap<>(); + subscriptionExpressions.put(normalTopic, filterExpression); + subscriptionExpressions.put(fifoTopic, filterExpression); + subscriptionExpressions.put(delayTopic, filterExpression); consumer = provider .newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) - .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setSubscriptionExpressions(subscriptionExpressions) .setMessageListener( messageView -> { testing().runWithSpan("messageListener", () -> {}); @@ -89,7 +99,7 @@ void setUp() throws ClientException { provider .newProducerBuilder() .setClientConfiguration(clientConfiguration) - .setTopics(topic) + .setTopics(normalTopic) .build(); } @@ -106,13 +116,13 @@ void tearDown() throws IOException { } @Test - void testSendAndConsumeMessage() throws Throwable { + void testSendAndConsumeNormalMessage() throws Throwable { String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); Message message = provider .newMessageBuilder() - .setTopic(topic) + .setTopic(normalTopic) .setTag(tag) .setKeys(keys) .setBody(body) @@ -130,18 +140,18 @@ void testSendAndConsumeMessage() throws Throwable { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - assertProducerSpan(span, topic, tag, keys, body, sendReceipt) + assertProducerSpan(span, normalTopic, tag, keys, body, sendReceipt) .hasParent(trace.getSpan(0))); sendSpanData.set(trace.getSpan(1)); }, trace -> trace.hasSpansSatisfyingExactly( - span -> assertReceiveSpan(span, topic, consumerGroup), + span -> assertReceiveSpan(span, normalTopic, consumerGroup), span -> assertProcessSpan( span, sendSpanData.get(), - topic, + normalTopic, consumerGroup, tag, keys, @@ -162,7 +172,7 @@ public void testSendAsyncMessage() throws Exception { Message message = provider .newMessageBuilder() - .setTopic(topic) + .setTopic(normalTopic) .setTag(tag) .setKeys(keys) .setBody(body) @@ -188,19 +198,19 @@ public void testSendAsyncMessage() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent"), span -> - assertProducerSpan(span, topic, tag, keys, body, sendReceipt) + assertProducerSpan(span, normalTopic, tag, keys, body, sendReceipt) .hasParent(trace.getSpan(0)), span -> span.hasName("child")); sendSpanData.set(trace.getSpan(1)); }, trace -> trace.hasSpansSatisfyingExactly( - span -> assertReceiveSpan(span, topic, consumerGroup), + span -> assertReceiveSpan(span, normalTopic, consumerGroup), span -> assertProcessSpan( span, sendSpanData.get(), - topic, + normalTopic, consumerGroup, tag, keys, @@ -214,6 +224,60 @@ public void testSendAsyncMessage() throws Exception { .hasParent(trace.getSpan(1)))); } + @Test + public void testSendAndConsumeFifoMessage() throws Throwable { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + String messageGroup = "yourMessageGroup"; + Message message = + provider + .newMessageBuilder() + .setTopic(fifoTopic) + .setTag(tag) + .setKeys(keys) + .setMessageGroup(messageGroup) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + assertProducerSpanWithFifoMessage( + span, fifoTopic, tag, keys, messageGroup, body, sendReceipt) + .hasParent(trace.getSpan(0))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, fifoTopic, consumerGroup), + span -> + assertProcessSpanWithFifoMessage( + span, + sendSpanData.get(), + fifoTopic, + consumerGroup, + tag, + keys, + messageGroup, + body, + sendReceipt) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + @Test public void testCapturedMessageHeaders() throws Throwable { String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; @@ -221,7 +285,7 @@ public void testCapturedMessageHeaders() throws Throwable { Message message = provider .newMessageBuilder() - .setTopic(topic) + .setTopic(normalTopic) .setTag(tag) .setKeys(keys) .setBody(body) @@ -242,7 +306,7 @@ public void testCapturedMessageHeaders() throws Throwable { span -> assertProducerSpan( span, - topic, + normalTopic, tag, keys, body, @@ -256,12 +320,12 @@ public void testCapturedMessageHeaders() throws Throwable { }, trace -> trace.hasSpansSatisfyingExactly( - span -> assertReceiveSpan(span, topic, consumerGroup), + span -> assertReceiveSpan(span, normalTopic, consumerGroup), span -> assertProcessSpan( span, sendSpanData.get(), - topic, + normalTopic, consumerGroup, tag, keys, @@ -308,6 +372,37 @@ private static SpanDataAssert assertProducerSpan( .hasAttributesSatisfyingExactly(attributeAssertions); } + private static SpanDataAssert assertProducerSpanWithFifoMessage( + SpanDataAssert span, + String topic, + String tag, + String[] keys, + String messageGroup, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_GROUP, messageGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(attributeAssertions); + } + private static SpanDataAssert assertReceiveSpan( SpanDataAssert span, String topic, String consumerGroup) { return span.hasKind(SpanKind.CONSUMER) @@ -356,4 +451,40 @@ private static SpanDataAssert assertProcessSpan( .hasLinks(LinkData.create(linkedSpan.getSpanContext())) .hasAttributesSatisfyingExactly(attributeAssertions); } + + private static SpanDataAssert assertProcessSpanWithFifoMessage( + SpanDataAssert span, + SpanData linkedSpan, + String topic, + String consumerGroup, + String tag, + String[] keys, + String messageGroup, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_GROUP, messageGroup), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "process"))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(linkedSpan.getSpanContext())) + .hasAttributesSatisfyingExactly(attributeAssertions); + } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java index 957bea744b4e..5abe16b5b674 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java @@ -11,7 +11,7 @@ public class RocketMqProxyContainer { // TODO(aaron-ai): replace it by the official image. - private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.1"; private final GenericContainer container; final String endpoints; From c96c32aab0f7c0583eb22a40d2870744091e40c3 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Fri, 24 Feb 2023 23:25:39 +0800 Subject: [PATCH 3/3] Add more tests for rocketmq delay message --- .../v5_0/AbstractRocketMqClientTest.java | 125 +++++++++++++++++- 1 file changed, 124 insertions(+), 1 deletion(-) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index b5ae3d0653ce..4e56bbfb333d 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -13,11 +13,13 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; @@ -64,7 +66,7 @@ public abstract class AbstractRocketMqClientTest { private static final String fifoTopic = "fifo-topic-0"; private static final String delayTopic = "delay-topic-0"; private static final String tag = "tagA"; - private static final String consumerGroup = "group-normal-topic-0"; + private static final String consumerGroup = "group-0"; private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); private final ClientServiceProvider provider = ClientServiceProvider.loadService(); @@ -278,6 +280,60 @@ public void testSendAndConsumeFifoMessage() throws Throwable { .hasParent(trace.getSpan(1)))); } + @Test + public void testSendAndConsumeDelayMessage() throws Throwable { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + long deliveryTimestamp = System.currentTimeMillis(); + Message message = + provider + .newMessageBuilder() + .setTopic(delayTopic) + .setTag(tag) + .setKeys(keys) + .setDeliveryTimestamp(deliveryTimestamp) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + assertProducerSpanWithDelayMessage( + span, delayTopic, tag, keys, deliveryTimestamp, body, sendReceipt) + .hasParent(trace.getSpan(0))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, delayTopic, consumerGroup), + span -> + assertProcessSpanWithDelayMessage( + span, + sendSpanData.get(), + delayTopic, + consumerGroup, + tag, + keys, + deliveryTimestamp, + body, + sendReceipt) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + @Test public void testCapturedMessageHeaders() throws Throwable { String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; @@ -403,6 +459,37 @@ private static SpanDataAssert assertProducerSpanWithFifoMessage( .hasAttributesSatisfyingExactly(attributeAssertions); } + private static SpanDataAssert assertProducerSpanWithDelayMessage( + SpanDataAssert span, + String topic, + String tag, + String[] keys, + long deliveryTimestamp, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, deliveryTimestamp), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(attributeAssertions); + } + private static SpanDataAssert assertReceiveSpan( SpanDataAssert span, String topic, String consumerGroup) { return span.hasKind(SpanKind.CONSUMER) @@ -487,4 +574,40 @@ private static SpanDataAssert assertProcessSpanWithFifoMessage( .hasLinks(LinkData.create(linkedSpan.getSpanContext())) .hasAttributesSatisfyingExactly(attributeAssertions); } + + private static SpanDataAssert assertProcessSpanWithDelayMessage( + SpanDataAssert span, + SpanData linkedSpan, + String topic, + String consumerGroup, + String tag, + String[] keys, + long deliveryTimestamp, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, deliveryTimestamp), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "process"))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(linkedSpan.getSpanContext())) + .hasAttributesSatisfyingExactly(attributeAssertions); + } }