diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java index 35c4eb2e0b20..8c7408b624aa 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java @@ -6,6 +6,8 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; @@ -25,6 +27,12 @@ enum RocketMqConsumerProcessAttributeExtractor public void onStart( AttributesBuilder attributes, Context parentContext, MessageView messageView) { messageView.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s)); + messageView + .getMessageGroup() + .ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_GROUP, s)); + messageView + .getDeliveryTimestamp() + .ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, s)); attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(messageView.getKeys())); attributes.put( MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView)); diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java index aa322d92d081..f64a99b6fec2 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerAttributeExtractor.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; @@ -29,6 +31,10 @@ enum RocketMqProducerAttributeExtractor public void onStart( AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) { message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s)); + message.getMessageGroup().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_GROUP, s)); + message + .getDeliveryTimestamp() + .ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, s)); attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys())); switch (message.getMessageType()) { case FIFO: diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index 532d10e0794f..4e56bbfb333d 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -13,10 +13,14 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; import io.opentelemetry.api.common.AttributeKey; @@ -34,7 +38,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; @@ -56,9 +62,11 @@ public abstract class AbstractRocketMqClientTest { // Inner topic of the container. - private static final String topic = "normal-topic-0"; + private static final String normalTopic = "normal-topic-0"; + private static final String fifoTopic = "fifo-topic-0"; + private static final String delayTopic = "delay-topic-0"; private static final String tag = "tagA"; - private static final String consumerGroup = "group-normal-topic-0"; + private static final String consumerGroup = "group-0"; private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); private final ClientServiceProvider provider = ClientServiceProvider.loadService(); @@ -73,12 +81,16 @@ void setUp() throws ClientException { ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + Map subscriptionExpressions = new HashMap<>(); + subscriptionExpressions.put(normalTopic, filterExpression); + subscriptionExpressions.put(fifoTopic, filterExpression); + subscriptionExpressions.put(delayTopic, filterExpression); consumer = provider .newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) - .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setSubscriptionExpressions(subscriptionExpressions) .setMessageListener( messageView -> { testing().runWithSpan("messageListener", () -> {}); @@ -89,7 +101,7 @@ void setUp() throws ClientException { provider .newProducerBuilder() .setClientConfiguration(clientConfiguration) - .setTopics(topic) + .setTopics(normalTopic) .build(); } @@ -106,13 +118,13 @@ void tearDown() throws IOException { } @Test - void testSendAndConsumeMessage() throws Throwable { + void testSendAndConsumeNormalMessage() throws Throwable { String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); Message message = provider .newMessageBuilder() - .setTopic(topic) + .setTopic(normalTopic) .setTag(tag) .setKeys(keys) .setBody(body) @@ -130,18 +142,18 @@ void testSendAndConsumeMessage() throws Throwable { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - assertProducerSpan(span, topic, tag, keys, body, sendReceipt) + assertProducerSpan(span, normalTopic, tag, keys, body, sendReceipt) .hasParent(trace.getSpan(0))); sendSpanData.set(trace.getSpan(1)); }, trace -> trace.hasSpansSatisfyingExactly( - span -> assertReceiveSpan(span, topic, consumerGroup), + span -> assertReceiveSpan(span, normalTopic, consumerGroup), span -> assertProcessSpan( span, sendSpanData.get(), - topic, + normalTopic, consumerGroup, tag, keys, @@ -162,7 +174,7 @@ public void testSendAsyncMessage() throws Exception { Message message = provider .newMessageBuilder() - .setTopic(topic) + .setTopic(normalTopic) .setTag(tag) .setKeys(keys) .setBody(body) @@ -188,19 +200,19 @@ public void testSendAsyncMessage() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent"), span -> - assertProducerSpan(span, topic, tag, keys, body, sendReceipt) + assertProducerSpan(span, normalTopic, tag, keys, body, sendReceipt) .hasParent(trace.getSpan(0)), span -> span.hasName("child")); sendSpanData.set(trace.getSpan(1)); }, trace -> trace.hasSpansSatisfyingExactly( - span -> assertReceiveSpan(span, topic, consumerGroup), + span -> assertReceiveSpan(span, normalTopic, consumerGroup), span -> assertProcessSpan( span, sendSpanData.get(), - topic, + normalTopic, consumerGroup, tag, keys, @@ -214,6 +226,114 @@ public void testSendAsyncMessage() throws Exception { .hasParent(trace.getSpan(1)))); } + @Test + public void testSendAndConsumeFifoMessage() throws Throwable { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + String messageGroup = "yourMessageGroup"; + Message message = + provider + .newMessageBuilder() + .setTopic(fifoTopic) + .setTag(tag) + .setKeys(keys) + .setMessageGroup(messageGroup) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + assertProducerSpanWithFifoMessage( + span, fifoTopic, tag, keys, messageGroup, body, sendReceipt) + .hasParent(trace.getSpan(0))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, fifoTopic, consumerGroup), + span -> + assertProcessSpanWithFifoMessage( + span, + sendSpanData.get(), + fifoTopic, + consumerGroup, + tag, + keys, + messageGroup, + body, + sendReceipt) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + + @Test + public void testSendAndConsumeDelayMessage() throws Throwable { + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + long deliveryTimestamp = System.currentTimeMillis(); + Message message = + provider + .newMessageBuilder() + .setTopic(delayTopic) + .setTag(tag) + .setKeys(keys) + .setDeliveryTimestamp(deliveryTimestamp) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + assertProducerSpanWithDelayMessage( + span, delayTopic, tag, keys, deliveryTimestamp, body, sendReceipt) + .hasParent(trace.getSpan(0))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertReceiveSpan(span, delayTopic, consumerGroup), + span -> + assertProcessSpanWithDelayMessage( + span, + sendSpanData.get(), + delayTopic, + consumerGroup, + tag, + keys, + deliveryTimestamp, + body, + sendReceipt) + // As the child of receive span. + .hasParent(trace.getSpan(0)), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + @Test public void testCapturedMessageHeaders() throws Throwable { String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; @@ -221,7 +341,7 @@ public void testCapturedMessageHeaders() throws Throwable { Message message = provider .newMessageBuilder() - .setTopic(topic) + .setTopic(normalTopic) .setTag(tag) .setKeys(keys) .setBody(body) @@ -242,7 +362,7 @@ public void testCapturedMessageHeaders() throws Throwable { span -> assertProducerSpan( span, - topic, + normalTopic, tag, keys, body, @@ -256,12 +376,12 @@ public void testCapturedMessageHeaders() throws Throwable { }, trace -> trace.hasSpansSatisfyingExactly( - span -> assertReceiveSpan(span, topic, consumerGroup), + span -> assertReceiveSpan(span, normalTopic, consumerGroup), span -> assertProcessSpan( span, sendSpanData.get(), - topic, + normalTopic, consumerGroup, tag, keys, @@ -308,6 +428,68 @@ private static SpanDataAssert assertProducerSpan( .hasAttributesSatisfyingExactly(attributeAssertions); } + private static SpanDataAssert assertProducerSpanWithFifoMessage( + SpanDataAssert span, + String topic, + String tag, + String[] keys, + String messageGroup, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_GROUP, messageGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(attributeAssertions); + } + + private static SpanDataAssert assertProducerSpanWithDelayMessage( + SpanDataAssert span, + String topic, + String tag, + String[] keys, + long deliveryTimestamp, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, deliveryTimestamp), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(attributeAssertions); + } + private static SpanDataAssert assertReceiveSpan( SpanDataAssert span, String topic, String consumerGroup) { return span.hasKind(SpanKind.CONSUMER) @@ -356,4 +538,76 @@ private static SpanDataAssert assertProcessSpan( .hasLinks(LinkData.create(linkedSpan.getSpanContext())) .hasAttributesSatisfyingExactly(attributeAssertions); } + + private static SpanDataAssert assertProcessSpanWithFifoMessage( + SpanDataAssert span, + SpanData linkedSpan, + String topic, + String consumerGroup, + String tag, + String[] keys, + String messageGroup, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_GROUP, messageGroup), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "process"))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(linkedSpan.getSpanContext())) + .hasAttributesSatisfyingExactly(attributeAssertions); + } + + private static SpanDataAssert assertProcessSpanWithDelayMessage( + SpanDataAssert span, + SpanData linkedSpan, + String topic, + String consumerGroup, + String tag, + String[] keys, + long deliveryTimestamp, + byte[] body, + SendReceipt sendReceipt, + AttributeAssertion... extraAttributes) { + List attributeAssertions = + new ArrayList<>( + Arrays.asList( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, deliveryTimestamp), + equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(MESSAGING_OPERATION, "process"))); + attributeAssertions.addAll(Arrays.asList(extraAttributes)); + + return span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(linkedSpan.getSpanContext())) + .hasAttributesSatisfyingExactly(attributeAssertions); + } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java index 957bea744b4e..5abe16b5b674 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java @@ -11,7 +11,7 @@ public class RocketMqProxyContainer { // TODO(aaron-ai): replace it by the official image. - private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.1"; private final GenericContainer container; final String endpoints;