diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/build.gradle.kts b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/build.gradle.kts new file mode 100644 index 000000000000..12cce38a447f --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/build.gradle.kts @@ -0,0 +1,20 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.apache.pulsar") + module.set("pulsar-client") + versions.set("[2.8.0,)") + assertInverse.set(true) + } +} + +dependencies { + library("org.apache.pulsar:pulsar-client:2.8.0") + + testImplementation("javax.annotation:javax.annotation-api:1.3.2") + testImplementation("org.testcontainers:pulsar:1.17.1") + testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0") +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ConsumerImplInstrumentation.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ConsumerImplInstrumentation.java new file mode 100644 index 000000000000..0b385a64ad55 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ConsumerImplInstrumentation.java @@ -0,0 +1,165 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +import static io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons.startAndEndConsumerReceive; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isProtected; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.concurrent.CompletableFuture; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.PulsarClientImpl; + +public class ConsumerImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return namedOneOf( + "org.apache.pulsar.client.impl.ConsumerImpl", + "org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + String klassName = ConsumerImplInstrumentation.class.getName(); + + transformer.applyAdviceToMethod(isConstructor(), klassName + "$ConsumerConstructorAdviser"); + + // internalReceive will apply to Consumer#receive(long,TimeUnit) + // and called before MessageListener#receive. + transformer.applyAdviceToMethod( + isMethod() + .and(isProtected()) + .and(named("internalReceive")) + .and(takesArguments(2)) + .and(takesArgument(1, named("java.util.concurrent.TimeUnit"))), + klassName + "$ConsumerInternalReceiveAdviser"); + // receive/batchReceive will apply to Consumer#receive()/Consumer#batchReceive() + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(namedOneOf("receive", "batchReceive")) + .and(takesArguments(0)), + klassName + "$ConsumerSyncReceiveAdviser"); + // receiveAsync/batchReceiveAsync will apply to + // Consumer#receiveAsync()/Consumer#batchReceiveAsync() + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(namedOneOf("receiveAsync", "batchReceiveAsync")) + .and(takesArguments(0)), + klassName + "$ConsumerAsyncReceiveAdviser"); + } + + @SuppressWarnings("unused") + public static class ConsumerConstructorAdviser { + private ConsumerConstructorAdviser() {} + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void after( + @Advice.This Consumer consumer, @Advice.Argument(value = 0) PulsarClient client) { + + PulsarClientImpl pulsarClient = (PulsarClientImpl) client; + String url = pulsarClient.getLookup().getServiceUrl(); + VirtualFieldStore.inject(consumer, url); + } + } + + @SuppressWarnings("unused") + public static class ConsumerInternalReceiveAdviser { + private ConsumerInternalReceiveAdviser() {} + + @Advice.OnMethodEnter + public static void before(@Advice.Local(value = "startTime") long startTime) { + startTime = System.currentTimeMillis(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class) + public static void after( + @Advice.This Consumer consumer, + @Advice.Return Message message, + @Advice.Thrown Throwable t, + @Advice.Local(value = "startTime") long startTime) { + if (t != null) { + return; + } + + Context parent = Context.current(); + Context current = startAndEndConsumerReceive(parent, message, startTime, consumer); + if (current != null) { + // ConsumerBase#internalReceive(long,TimeUnit) will be called before + // ConsumerListener#receive(Consumer,Message), so, need to inject Context into Message. + VirtualFieldStore.inject(message, current); + } + } + } + + @SuppressWarnings("unused") + public static class ConsumerSyncReceiveAdviser { + private ConsumerSyncReceiveAdviser() {} + + @Advice.OnMethodEnter + public static void before(@Advice.Local(value = "startTime") long startTime) { + startTime = System.currentTimeMillis(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class) + public static void after( + @Advice.This Consumer consumer, + @Advice.Return Message message, + @Advice.Thrown Throwable t, + @Advice.Local(value = "startTime") long startTime) { + if (t != null) { + return; + } + + Context parent = Context.current(); + startAndEndConsumerReceive(parent, message, startTime, consumer); + // No need to inject context to message. + } + } + + @SuppressWarnings("unused") + public static class ConsumerAsyncReceiveAdviser { + private ConsumerAsyncReceiveAdviser() {} + + @Advice.OnMethodEnter + public static void before(@Advice.Local(value = "startTime") long startTime) { + startTime = System.currentTimeMillis(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class) + public static void after( + @Advice.This Consumer consumer, + @Advice.Return CompletableFuture> future, + @Advice.Local(value = "startTime") long startTime) { + future.whenComplete( + (message, t) -> { + if (t != null) { + return; + } + + Context parent = Context.current(); + startAndEndConsumerReceive(parent, message, startTime, consumer); + // No need to inject context to message. + }); + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageInstrumentation.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageInstrumentation.java new file mode 100644 index 000000000000..b6bba579e1c6 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageInstrumentation.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Message; + +public class MessageInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.pulsar.client.impl.MessageImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(isPublic()).and(named("recycle")).and(takesArguments(0)), + MessageInstrumentation.class.getName() + "$MessageRecycleAdvice"); + } + + @SuppressWarnings("unused") + public static class MessageRecycleAdvice { + private MessageRecycleAdvice() {} + + @Advice.OnMethodExit + public static void after(@Advice.This Message message) { + // Clean context to prevent memory leak. + VirtualFieldStore.inject(message, null); + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageListenerInstrumentation.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageListenerInstrumentation.java new file mode 100644 index 000000000000..b09d33b739d8 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageListenerInstrumentation.java @@ -0,0 +1,96 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.implementation.bytecode.assign.Assigner; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +public class MessageListenerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + // return hasSuperType(named("org.apache.pulsar.client.api.MessageListener")); + // can't enhance MessageListener here like above due to jvm can't enhance lambda. + return named("org.apache.pulsar.client.impl.conf.ConsumerConfigurationData"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(isPublic()).and(named("getMessageListener")), + MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser"); + } + + @SuppressWarnings("unused") + public static class ConsumerConfigurationDataMethodAdviser { + private ConsumerConfigurationDataMethodAdviser() {} + + @Advice.OnMethodExit + public static void after( + @Advice.This ConsumerConfigurationData data, + @Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC) + MessageListener listener) { + if (listener == null) { + return; + } + + listener = new MessageListenerWrapper<>(listener); + } + } + + public static class MessageListenerWrapper implements MessageListener { + private static final long serialVersionUID = 1L; + + private final MessageListener delegator; + + public MessageListenerWrapper(MessageListener messageListener) { + this.delegator = messageListener; + } + + @Override + public void received(Consumer consumer, Message msg) { + Context parent = VirtualFieldStore.extract(msg); + + Instrumenter, Attributes> instrumenter = + PulsarSingletons.consumerListenerInstrumenter(); + if (!instrumenter.shouldStart(parent, msg)) { + this.delegator.received(consumer, msg); + return; + } + + Context current = instrumenter.start(parent, msg); + try (Scope scope = current.makeCurrent()) { + this.delegator.received(consumer, msg); + instrumenter.end(current, msg, null, null); + } catch (Throwable t) { + instrumenter.end(current, msg, null, t); + throw t; + } + } + + @Override + public void reachedEndOfTopic(Consumer consumer) { + this.delegator.reachedEndOfTopic(consumer); + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerData.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerData.java new file mode 100644 index 000000000000..ad9d2290c1e3 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerData.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +public class ProducerData { + public final String url; + public final String topic; + + private ProducerData(String url, String topic) { + this.url = url; + this.topic = topic; + } + + public static ProducerData create(String url, String topic) { + return new ProducerData(url, topic); + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerImplInstrumentation.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerImplInstrumentation.java new file mode 100644 index 000000000000..aba69d5e1db4 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerImplInstrumentation.java @@ -0,0 +1,153 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.concurrent.CompletableFuture; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.SendCallback; + +public class ProducerImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.pulsar.client.impl.ProducerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and(isPublic()) + .and( + takesArgument(0, hasSuperType(named("org.apache.pulsar.client.api.PulsarClient")))), + ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdviser"); + + transformer.applyAdviceToMethod( + isMethod() + .and(named("sendAsync")) + .and(takesArgument(1, named("org.apache.pulsar.client.impl.SendCallback"))), + ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdviser"); + } + + @SuppressWarnings("unused") + public static class ProducerImplConstructorAdviser { + private ProducerImplConstructorAdviser() {} + + @Advice.OnMethodExit + public static void intercept( + @Advice.This ProducerImpl producer, @Advice.Argument(value = 0) PulsarClient client) { + PulsarClientImpl pulsarClient = (PulsarClientImpl) client; + String brokerUrl = pulsarClient.getLookup().getServiceUrl(); + String topic = producer.getTopic(); + topic = topic == null ? "unknown" : topic; + brokerUrl = brokerUrl == null ? "unknown" : brokerUrl; + VirtualFieldStore.inject(producer, brokerUrl, topic); + } + } + + @SuppressWarnings("unused") + public static class ProducerSendAsyncMethodAdviser { + private ProducerSendAsyncMethodAdviser() {} + + @Advice.OnMethodEnter + public static void before( + @Advice.This ProducerImpl producer, + @Advice.Argument(value = 0) Message message, + @Advice.Argument(value = 1, readOnly = false) SendCallback callback) { + Context parent = Context.current(); + Instrumenter, Attributes> instrumenter = PulsarSingletons.producerInstrumenter(); + + Context current = null; + if (instrumenter.shouldStart(parent, message)) { + current = instrumenter.start(parent, message); + } + + callback = new SendCallbackWrapper(current, message, callback, producer); + } + } + + public static class SendCallbackWrapper implements SendCallback { + + private final Context context; + private final Message message; + private final SendCallback delegator; + private final ProducerImpl producer; + + public SendCallbackWrapper( + Context context, Message message, SendCallback callback, ProducerImpl producer) { + this.context = context; + this.message = message; + this.delegator = callback; + this.producer = producer; + } + + @Override + public void sendComplete(Exception e) { + if (context == null) { + this.delegator.sendComplete(e); + return; + } + + Instrumenter, Attributes> instrumenter = PulsarSingletons.producerInstrumenter(); + ProducerData producerData = VirtualFieldStore.extract(producer); + Attributes attributes = + Attributes.of( + SemanticAttributes.NET_SOCK_PEER_ADDR, + producerData.url, + SemanticAttributes.MESSAGING_DESTINATION_NAME, + producerData.topic); + + try (Scope ignore = context.makeCurrent()) { + this.delegator.sendComplete(e); + } finally { + instrumenter.end(context, message, attributes, e); + } + } + + @Override + public void addCallback(MessageImpl msg, SendCallback scb) { + this.delegator.addCallback(msg, scb); + } + + @Override + public SendCallback getNextSendCallback() { + return this.delegator.getNextSendCallback(); + } + + @Override + public MessageImpl getNextMessage() { + return this.delegator.getNextMessage(); + } + + @Override + public CompletableFuture getFuture() { + return this.delegator.getFuture(); + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/PulsarInstrumentationModule.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/PulsarInstrumentationModule.java new file mode 100644 index 000000000000..f0539afec663 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/PulsarInstrumentationModule.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Arrays; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class PulsarInstrumentationModule extends InstrumentationModule { + public PulsarInstrumentationModule() { + super("apache-pulsar", "apache-pulsar-2.8.0"); + } + + @Override + public List typeInstrumentations() { + return Arrays.asList( + new ConsumerImplInstrumentation(), + new ProducerImplInstrumentation(), + new MessageInstrumentation(), + new MessageListenerInstrumentation()); + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/VirtualFieldStore.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/VirtualFieldStore.java new file mode 100644 index 000000000000..8a9de33592c6 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/VirtualFieldStore.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.util.VirtualField; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.TopicMessageImpl; + +public class VirtualFieldStore { + private static final VirtualField, Context> MSG_FIELD = + VirtualField.find(Message.class, Context.class); + private static final VirtualField, ProducerData> PRODUCER_FIELD = + VirtualField.find(Producer.class, ProducerData.class); + private static final VirtualField, String> CONSUMER_FIELD = + VirtualField.find(Consumer.class, String.class); + + private VirtualFieldStore() {} + + public static void inject(Message instance, Context context) { + if (instance instanceof TopicMessageImpl) { + TopicMessageImpl topicMessage = (TopicMessageImpl) instance; + instance = topicMessage.getMessage(); + } + if (instance != null) { + MSG_FIELD.set(instance, context); + } + } + + public static void inject(Producer instance, String serviceUrl, String topic) { + PRODUCER_FIELD.set(instance, ProducerData.create(serviceUrl, topic)); + } + + public static void inject(Consumer instance, String serviceUrl) { + CONSUMER_FIELD.set(instance, serviceUrl); + } + + public static Context extract(Message instance) { + if (instance instanceof TopicMessageImpl) { + TopicMessageImpl topicMessage = (TopicMessageImpl) instance; + instance = topicMessage.getMessage(); + } + if (instance == null) { + return Context.current(); + } + Context ctx = MSG_FIELD.get(instance); + return ctx == null ? Context.current() : ctx; + } + + public static ProducerData extract(Producer instance) { + return PRODUCER_FIELD.get(instance); + } + + public static String extract(Consumer instance) { + String brokerUrl = CONSUMER_FIELD.get(instance); + return brokerUrl == null ? "unknown" : brokerUrl; + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/ConsumerAttributesExtractor.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/ConsumerAttributesExtractor.java new file mode 100644 index 000000000000..fa20dd72cf53 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/ConsumerAttributesExtractor.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; + +enum ConsumerAttributesExtractor implements AttributesExtractor, Attributes> { + INSTANCE; + + private ConsumerAttributesExtractor() {} + + @Override + public void onStart(AttributesBuilder attributes, Context parentContext, Message message) {} + + @Override + public void onEnd( + AttributesBuilder attributesBuilder, + Context context, + Message message, + @Nullable Attributes attributes, + @Nullable Throwable error) { + if (attributes != null && !attributes.isEmpty()) { + attributesBuilder.putAll(attributes); + } + + if (message.getTopicName() != null) { + attributesBuilder.put(SemanticAttributes.MESSAGING_DESTINATION_NAME, message.getTopicName()); + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/MessageTextMapGetter.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/MessageTextMapGetter.java new file mode 100644 index 000000000000..0dc3e8d0ec05 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/MessageTextMapGetter.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry; + +import io.opentelemetry.context.propagation.TextMapGetter; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; + +enum MessageTextMapGetter implements TextMapGetter> { + INSTANCE; + + @Override + public Iterable keys(Message message) { + return message.getProperties().keySet(); + } + + @Nullable + @Override + public String get(@Nullable Message message, String key) { + return message == null ? null : message.getProperties().get(key); + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/MessageTextMapSetter.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/MessageTextMapSetter.java new file mode 100644 index 000000000000..b6d523cf4328 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/MessageTextMapSetter.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry; + +import io.opentelemetry.context.propagation.TextMapSetter; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.impl.MessageImpl; + +enum MessageTextMapSetter implements TextMapSetter> { + INSTANCE; + + @Override + public void set(@Nullable Message carrier, String key, String value) { + if (carrier instanceof MessageImpl) { + MessageImpl message = (MessageImpl) carrier; + message.getMessageBuilder().addProperty().setKey(key).setValue(value); + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/ProducerAttributesExtractor.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/ProducerAttributesExtractor.java new file mode 100644 index 000000000000..d312fae6c514 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/ProducerAttributesExtractor.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MessageMetadata; + +enum ProducerAttributesExtractor implements AttributesExtractor, Attributes> { + INSTANCE; + + private ProducerAttributesExtractor() {} + + @Override + public void onStart(AttributesBuilder attributes, Context parentContext, Message message) {} + + @Override + public void onEnd( + AttributesBuilder attributesBuilder, + Context context, + Message message, + @Nullable Attributes attributes, + @Nullable Throwable error) { + if (attributes != null) { + attributesBuilder.putAll(attributes); + } + + if (message instanceof MessageImpl) { + MessageType type = MessageType.NORMAL; + MessageImpl impl = (MessageImpl) message; + MessageMetadata metadata = impl.getMessageBuilder(); + if (metadata.hasTxnidMostBits() || metadata.hasTxnidLeastBits()) { + type = MessageType.TXN; + } else if (metadata.hasDeliverAtTime()) { + type = MessageType.DELAY; + } else if (metadata.hasOrderingKey()) { + type = MessageType.ORDER; + } else if (metadata.hasChunkId()) { + type = MessageType.CHUNK; + } + + attributesBuilder.put(SemanticAttributes.MESSAGE_TYPE, type.name()); + } + } + + enum MessageType { + NORMAL, + DELAY, + TXN, + ORDER, + CHUNK + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/PulsarMessagingAttributesGetter.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/PulsarMessagingAttributesGetter.java new file mode 100644 index 000000000000..01f61db83d04 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/PulsarMessagingAttributesGetter.java @@ -0,0 +1,87 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; + +enum PulsarMessagingAttributesGetter implements MessagingAttributesGetter, Attributes> { + INSTANCE; + + @Override + public String getSystem(Message message) { + return "pulsar"; + } + + @Override + public String getDestinationKind(Message message) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Nullable + @Override + public String getDestination(Message message) { + return null; + } + + @Override + public boolean isTemporaryDestination(Message message) { + return false; + } + + @Nullable + @Override + public String getProtocol(Message message) { + return null; + } + + @Nullable + @Override + public String getProtocolVersion(Message message) { + return null; + } + + @Nullable + @Override + public String getUrl(Message message) { + return null; + } + + @Nullable + @Override + public String getConversationId(Message message) { + return null; + } + + @Nullable + @Override + public Long getMessagePayloadSize(Message message) { + if (message != null) { + return (long) message.size(); + } + + return null; + } + + @Nullable + @Override + public Long getMessagePayloadCompressedSize(Message message) { + return null; + } + + @Nullable + @Override + public String getMessageId(Message message, @Nullable Attributes attributes) { + if (message != null && message.getMessageId() != null) { + return message.getMessageId().toString(); + } + + return null; + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/PulsarSingletons.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/PulsarSingletons.java new file mode 100644 index 000000000000..1fa8233e9837 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/telemetry/PulsarSingletons.java @@ -0,0 +1,128 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import io.opentelemetry.javaagent.instrumentation.pulsar.v28.VirtualFieldStore; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.time.Instant; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; + +public final class PulsarSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pulsar-client-2.8"; + private static final OpenTelemetry TELEMETRY = GlobalOpenTelemetry.get(); + private static final TextMapPropagator PROPAGATOR = + TELEMETRY.getPropagators().getTextMapPropagator(); + + private static final SpanNameExtractor> CONSUMER_RECEIVE = + new MessagingSpanNameExtractor<>(SpanKind.CONSUMER, MessageOperation.RECEIVE); + private static final SpanNameExtractor> CONSUMER_PROCESS = + new MessagingSpanNameExtractor<>(SpanKind.CONSUMER, MessageOperation.PROCESS); + private static final SpanNameExtractor> PRODUCER_SEND = + new MessagingSpanNameExtractor<>(SpanKind.PRODUCER, MessageOperation.SEND); + + private static final Instrumenter, Attributes> CONSUMER_LISTENER_INSTRUMENTER = + createConsumerListenerInstrumenter(); + private static final Instrumenter, Attributes> CONSUMER_RECEIVE_INSTRUMENTER = + createConsumerReceiveInstrumenter(); + private static final Instrumenter, Attributes> PRODUER_INSTRUMENTER = + createProducerInstrumenter(); + + public static Instrumenter, Attributes> consumerListenerInstrumenter() { + return CONSUMER_LISTENER_INSTRUMENTER; + } + + public static Instrumenter, Attributes> consumerReceiveInstrumenter() { + return CONSUMER_RECEIVE_INSTRUMENTER; + } + + public static Instrumenter, Attributes> producerInstrumenter() { + return PRODUER_INSTRUMENTER; + } + + private static Instrumenter, Attributes> createConsumerReceiveInstrumenter() { + MessagingAttributesGetter, Attributes> getter = + PulsarMessagingAttributesGetter.INSTANCE; + + return Instrumenter., Attributes>builder( + TELEMETRY, INSTRUMENTATION_NAME, CONSUMER_RECEIVE) + .addAttributesExtractor(ConsumerAttributesExtractor.INSTANCE) + .addAttributesExtractor( + MessagingAttributesExtractor.create(getter, MessageOperation.RECEIVE)) + .buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE); + } + + private static Instrumenter, Attributes> createConsumerListenerInstrumenter() { + MessagingAttributesGetter, Attributes> getter = + PulsarMessagingAttributesGetter.INSTANCE; + + return Instrumenter., Attributes>builder( + TELEMETRY, INSTRUMENTATION_NAME, CONSUMER_PROCESS) + .addAttributesExtractor( + MessagingAttributesExtractor.create(getter, MessageOperation.PROCESS)) + .addAttributesExtractor(ConsumerAttributesExtractor.INSTANCE) + .buildInstrumenter(); + } + + private static Instrumenter, Attributes> createProducerInstrumenter() { + MessagingAttributesGetter, Attributes> getter = + PulsarMessagingAttributesGetter.INSTANCE; + + return Instrumenter., Attributes>builder( + TELEMETRY, INSTRUMENTATION_NAME, PRODUCER_SEND) + .addAttributesExtractor(ProducerAttributesExtractor.INSTANCE) + .addAttributesExtractor(MessagingAttributesExtractor.create(getter, MessageOperation.SEND)) + .buildProducerInstrumenter(MessageTextMapSetter.INSTANCE); + } + + public static Context startAndEndConsumerReceive( + Context parent, Message message, long start, Consumer consumer) { + if (message == null || !CONSUMER_RECEIVE_INSTRUMENTER.shouldStart(parent, message)) { + return null; + } + + String brokerUrl = VirtualFieldStore.extract(consumer); + Attributes attributes = Attributes.of(SemanticAttributes.NET_SOCK_PEER_ADDR, brokerUrl); + // startAndEnd not supports extract trace context from carrier + // start not supports custom startTime + // extract trace context by using TEXT_MAP_PROPAGATOR here. + return InstrumenterUtil.startAndEnd( + CONSUMER_RECEIVE_INSTRUMENTER, + PROPAGATOR.extract(parent, message, MessageTextMapGetter.INSTANCE), + message, + attributes, + null, + Instant.ofEpochMilli(start), + Instant.now()); + } + + private PulsarSingletons() {} + + static class MessagingSpanNameExtractor implements SpanNameExtractor { + private final String name; + + private MessagingSpanNameExtractor(SpanKind kind, MessageOperation operation) { + this.name = kind.name() + "/" + operation.name(); + } + + @Override + public String extract(T unused) { + return this.name; + } + } +} diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v28/PulsarClientTest.groovy b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v28/PulsarClientTest.groovy new file mode 100644 index 000000000000..e38e38013fa9 --- /dev/null +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v28/PulsarClientTest.groovy @@ -0,0 +1,506 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v28 + +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.instrumentation.test.asserts.SpanAssert +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.apache.pulsar.client.admin.PulsarAdmin +import org.apache.pulsar.client.api.Consumer +import org.apache.pulsar.client.api.Message +import org.apache.pulsar.client.api.MessageListener +import org.apache.pulsar.client.api.Producer +import org.apache.pulsar.client.api.PulsarClient +import org.apache.pulsar.client.api.Schema +import org.apache.pulsar.client.api.SubscriptionInitialPosition +import org.junit.Assert +import org.testcontainers.containers.PulsarContainer +import org.testcontainers.utility.DockerImageName +import spock.lang.Shared + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.PRODUCER + +class PulsarClientTest extends AgentInstrumentationSpecification { + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("apachepulsar/pulsar:2.8.0") + + @Shared + private PulsarContainer pulsar + @Shared + private PulsarClient client + @Shared + private PulsarAdmin admin + @Shared + private Producer producer + @Shared + private Consumer consumer + @Shared + private Producer producer1 + + @Shared + private String brokerUrl + + @Override + def setupSpec() { + pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME) + pulsar.start() + + brokerUrl = pulsar.pulsarBrokerUrl + client = PulsarClient.builder().serviceUrl(brokerUrl).build() + admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.httpServiceUrl).build() + } + + @Override + def cleanupSpec() { + producer?.close() + consumer?.close() + producer1?.close() + client?.close() + admin?.close() + pulsar.close() + } + + def "test send non-partitioned topic"() { + setup: + def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + admin.topics().createNonPartitionedTopic(topic) + producer = + client.newProducer(Schema.STRING).topic(topic) + .enableBatching(false).create() + + String msg = UUID.randomUUID().toString() + + def msgId + runWithSpan("parent") { + msgId = producer.send(msg) + } + + def traces = waitForTraces(1) + Assert.assertEquals(traces.size(), 1) + def spans = traces[0] + Assert.assertEquals(spans.size(), 2) + def parent = spans.find { + it0 -> + it0.name.equalsIgnoreCase("parent") + } + def producer = spans.find { + it0 -> + it0.name.equalsIgnoreCase("PRODUCER/SEND") + } + Assert.assertNotNull(parent) + Assert.assertNotNull(producer) + + SpanAssert.assertSpan(parent) { + name("parent") + kind(INTERNAL) + hasNoParent() + } + + SpanAssert.assertSpan(producer) { + name("PRODUCER/SEND") + kind(PRODUCER) + childOf parent + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + } + } + } + + def "test consume non-partitioned topic"() { + setup: + def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + def latch = new CountDownLatch(1) + admin.topics().createNonPartitionedTopic(topic) + consumer = client.newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener(new MessageListener() { + @Override + void received(Consumer consumer, Message msg) { + consumer.acknowledge(msg) + latch.countDown() + } + }) + .subscribe() + + producer = client.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create() + + def msgId + def msg = UUID.randomUUID().toString() + runWithSpan("parent") { + msgId = producer.send(msg) + } + + latch.await(1, TimeUnit.MINUTES) + // Wait until all the spans finished. + Thread.sleep(TimeUnit.SECONDS.toMillis(20)) + + def traces = waitForTraces(1) + def spans = traces[0] + Assert.assertEquals(spans.size(), 4) + def parent = spans.find { + it0 -> + it0.name.equalsIgnoreCase("parent") + } + def send = spans.find { + it0 -> + it0.name.equalsIgnoreCase("PRODUCER/SEND") + } + def receive = spans.find { + it0 -> + it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") + } + + def process = spans.find { + it0 -> + it0.name.equalsIgnoreCase("CONSUMER/PROCESS") + } + + SpanAssert.assertSpan(parent) { + name("parent") + kind(INTERNAL) + hasNoParent() + } + + SpanAssert.assertSpan(send) { + name("PRODUCER/SEND") + kind(PRODUCER) + childOf parent + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } + } + + SpanAssert.assertSpan(receive) { + name("CONSUMER/RECEIVE") + kind(CONSUMER) + childOf(send) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + + SpanAssert.assertSpan(process) { + name("CONSUMER/PROCESS") + kind(INTERNAL) + childOf(receive) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + } + } + } + + + def "test send partitioned topic"() { + setup: + def topic = "persistent://public/default/testPartitionedTopic_" + UUID.randomUUID() + admin.topics().createPartitionedTopic(topic, 2) + producer = + client.newProducer(Schema.STRING).topic(topic) + .enableBatching(false).create() + + String msg = UUID.randomUUID().toString() + + def msgId + runWithSpan("parent") { + msgId = producer.send(msg) + } + + def traces = waitForTraces(1) + def spans = traces[0] + Assert.assertEquals(spans.size(), 2) + + def parent = spans.find { + it0 -> + it0.name.equalsIgnoreCase("parent") + } + def send = spans.find { + it0 -> + it0.name.equalsIgnoreCase("PRODUCER/SEND") + } + + SpanAssert.assertSpan(parent) { + name("parent") + kind(INTERNAL) + hasNoParent() + } + + SpanAssert.assertSpan(send) { + name("PRODUCER/SEND") + kind(PRODUCER) + childOf parent + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { + t -> + return t.toString().contains(topic) + } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } + } + } + + def "test consume partitioned topic"() { + setup: + def topic = "persistent://public/default/testPartitionedTopic_" + UUID.randomUUID() + admin.topics().createPartitionedTopic(topic, 2) + + def latch = new CountDownLatch(1) + consumer = client.newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .topic(topic) + .messageListener(new MessageListener() { + @Override + void received(Consumer consumer, Message msg) { + consumer.acknowledge(msg) + latch.countDown() + } + }) + .subscribe() + + producer = client.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create() + + def msgId + def msg = UUID.randomUUID().toString() + runWithSpan("parent") { + msgId = producer.send(msg) + } + + latch.await(1, TimeUnit.MINUTES) + // Wait until all the spans finished. + Thread.sleep(TimeUnit.SECONDS.toMillis(20)) + + def traces = waitForTraces(1) + Assert.assertEquals(traces.size(), 1) + def spans = traces[0] + Assert.assertEquals(spans.size(), 4) + + def parent = spans.find { + it0 -> + it0.name.equalsIgnoreCase("parent") + } + def send = spans.find { + it0 -> + it0.name.equalsIgnoreCase("PRODUCER/SEND") + } + def receive = spans.find { + it0 -> + it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") + } + + def process = spans.find { + it0 -> + it0.name.equalsIgnoreCase("CONSUMER/PROCESS") + } + + SpanAssert.assertSpan(parent) { + name("parent") + kind(INTERNAL) + hasNoParent() + } + + SpanAssert.assertSpan(send) { + name("PRODUCER/SEND") + kind(PRODUCER) + childOf parent + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { + v -> + return v.toString().contains(topic) + } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } + } + + SpanAssert.assertSpan(receive) { + name("CONSUMER/RECEIVE") + kind(CONSUMER) + childOf(send) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { + v -> + return v.toString().contains(topic) + } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + + SpanAssert.assertSpan(process) { + name("CONSUMER/PROCESS") + kind(INTERNAL) + childOf(receive) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { + v -> + return v.toString().contains(topic) + } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } + } + + + def "test consume multi-topics"() { + setup: + + def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + def topic1 = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + + def latch = new CountDownLatch(2) + producer = client.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create() + producer1 = client.newProducer(Schema.STRING) + .topic(topic1) + .enableBatching(false) + .create() + + runWithSpan("parent") { + producer.send(UUID.randomUUID().toString()) + producer1.send(UUID.randomUUID().toString()) + } + + consumer = client.newConsumer(Schema.STRING) + .topic(topic1, topic) + .subscriptionName("test_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener(new MessageListener() { + @Override + void received(Consumer consumer, Message msg) { + consumer.acknowledge(msg) + latch.countDown() + } + }) + .subscribe() + + latch.await(1, TimeUnit.MINUTES) + // Wait until all the spans finished. + Thread.sleep(TimeUnit.SECONDS.toMillis(20)) + + def traces = waitForTraces(1) + Assert.assertEquals(traces.size(), 1) + def spans = traces[0] + Assert.assertEquals(spans.size(), 7) + + def parent = spans.find { + it0 -> + it0.name.equalsIgnoreCase("parent") + } + + SpanAssert.assertSpan(parent) { + hasNoParent() + kind(INTERNAL) + } + + + def sendSpans = spans.findAll { + it0 -> + it0.name.equalsIgnoreCase("PRODUCER/SEND") + } + + sendSpans.forEach { + it0 -> + SpanAssert.assertSpan(it0) { + kind(PRODUCER) + childOf(parent) + } + } + + def receiveSpans = spans.findAll { + it0 -> + it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") + } + + def processSpans = spans.findAll { + it0 -> + it0.name.equalsIgnoreCase("CONSUMER/PROCESS") + } + + receiveSpans.forEach { + it0 -> + def parentSpanId = it0.getParentSpanId() + def parent0 = sendSpans.find { + v -> + (v.spanId == parentSpanId) + } + + SpanAssert.assertSpan(it0) { + kind(CONSUMER) + childOf(parent0) + } + } + + processSpans.forEach { + it0 -> + def parentSpanId = it0.getParentSpanId() + def parent0 = receiveSpans.find { + v -> + (v.spanId == parentSpanId) + } + + SpanAssert.assertSpan(it0) { + kind(INTERNAL) + childOf(parent0) + } + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 760a0d8ffc0e..552ab328e6b5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -146,6 +146,7 @@ hideFromDependabot(":instrumentation:apache-httpclient:apache-httpclient-4.0:jav hideFromDependabot(":instrumentation:apache-httpclient:apache-httpclient-4.3:library") hideFromDependabot(":instrumentation:apache-httpclient:apache-httpclient-4.3:testing") hideFromDependabot(":instrumentation:apache-httpclient:apache-httpclient-5.0:javaagent") +hideFromDependabot(":instrumentation:apache-pulsar:apache-pulsar-2.8:javaagent") hideFromDependabot(":instrumentation:armeria-1.3:javaagent") hideFromDependabot(":instrumentation:armeria-1.3:library") hideFromDependabot(":instrumentation:armeria-1.3:testing")