Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance][pulsar] add apache-pulsar client support #5926

Merged
merged 51 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
693b89b
add apache-pulsar support
dao-jun Apr 23, 2022
7086bd8
remove repositories
dao-jun Apr 23, 2022
29cc849
checkstyle
dao-jun Apr 24, 2022
1ed1686
checkstyle
dao-jun Apr 24, 2022
e347fa8
bug fix
dao-jun Apr 25, 2022
883c401
bug fix
dao-jun Apr 25, 2022
47e9f1c
bug fix
dao-jun Apr 25, 2022
9cc4f45
fix pulsar producer send back
dao-jun Apr 25, 2022
09052da
review fix
dao-jun May 12, 2022
2e127e6
checkstyle fix
dao-jun May 14, 2022
975671c
checkstyle fix
dao-jun May 16, 2022
f22dd03
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-java…
dao-jun May 16, 2022
0845f0e
merge master into current
dao-jun May 16, 2022
34c425a
codestyle fix
dao-jun May 16, 2022
1e6aa3d
codestyle fix
dao-jun May 16, 2022
d23932b
complete tests
dao-jun Jun 12, 2022
57c7b74
Merge branch 'main' into dev/pulsar
dao-jun Nov 29, 2022
79ca3a2
Refactor with Instrument API
dao-jun Dec 2, 2022
6755ae6
Fix tests
dao-jun Dec 2, 2022
2f5ab6f
Fix checkstyle
dao-jun Dec 2, 2022
a7f3da7
Fix checkstyle
dao-jun Dec 3, 2022
1f6faed
Fix checkstyle
dao-jun Dec 3, 2022
a2fe9a3
Fix checkstyle
dao-jun Dec 3, 2022
b87b21a
Fix checkstyle
dao-jun Dec 3, 2022
92f5c72
Fix instrumentation
dao-jun Dec 5, 2022
63438b6
review fix
dao-jun Dec 10, 2022
40c1115
fix checkstyle
dao-jun Dec 10, 2022
7184126
fix tests
dao-jun Dec 10, 2022
333f636
fix tests
dao-jun Dec 10, 2022
db57a7b
fix tests
dao-jun Dec 10, 2022
3de32af
fix tests
dao-jun Dec 10, 2022
01b616a
fix tests
dao-jun Dec 11, 2022
5207cc9
fix tests
dao-jun Dec 11, 2022
1f307c6
fix checkstyle
dao-jun Dec 12, 2022
fbeb11e
fix instrumentation & test
dao-jun Dec 13, 2022
ab5f0c5
fix instrumentation
dao-jun Dec 13, 2022
e0f214d
fix test `test send non-partitioned topic`
dao-jun Dec 15, 2022
8328ca1
fix test `test send non-partitioned topic`
dao-jun Dec 15, 2022
5a6917b
fix test `test send non-partitioned topic`
dao-jun Dec 15, 2022
c8b7551
fix all tests
dao-jun Dec 15, 2022
7fe7329
fix all tests
dao-jun Dec 15, 2022
8dd1207
fix tests
dao-jun Dec 19, 2022
b704cd8
fix tests
dao-jun Dec 20, 2022
90268c2
fix tests
dao-jun Dec 20, 2022
2e02517
fix tests
dao-jun Dec 20, 2022
5c05c29
fix tests codenarc
dao-jun Dec 20, 2022
cdc53bd
review fix
dao-jun Feb 27, 2023
399d63f
fix code format
dao-jun Feb 27, 2023
cbd934f
Merge branch 'main' into dev/pulsar
dao-jun Feb 27, 2023
2d44361
fix deprecated semantic attributes
dao-jun Feb 27, 2023
9d0af62
review fix
dao-jun Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions instrumentation/apache-pulsar/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.pulsar")
module.set("pulsar-client")
versions.set("[2.8.0,)")
assertInverse.set(true)
}
}

dependencies {
library("org.apache.pulsar:pulsar-client:2.8.0")

testImplementation("javax.annotation:javax.annotation-api:1.3.2")
testImplementation("org.testcontainers:pulsar:1.17.1")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar;

import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.CONSUMER_NAME;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.PROPAGATOR;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.SERVICE_URL;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.SUBSCRIPTION;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.TOPIC;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.TRACER;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.info.ClientEnhanceInfo;
import io.opentelemetry.javaagent.instrumentation.pulsar.info.MessageEnhanceInfo;
import io.opentelemetry.javaagent.instrumentation.pulsar.textmap.MessageTextMapGetter;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;

public class ConsumerImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.ConsumerImpl");
}

@Override
public void transform(TypeTransformer transformer) {
String klassName = ConsumerImplInstrumentation.class.getName();

transformer.applyAdviceToMethod(isConstructor(), klassName + "$ConsumerImplConstructorAdviser");

transformer.applyAdviceToMethod(
isMethod()
.and(isProtected())
.and(named("messageProcessed"))
.and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))),
klassName + "$ConsumerImplMethodAdviser");
}

@SuppressWarnings("unused")
public static class ConsumerImplConstructorAdviser {

@Advice.OnMethodExit
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
public static void before(
@Advice.This ConsumerImpl<?> consumer,
@Advice.Argument(value = 0) PulsarClient client,
@Advice.Argument(value = 1) String topic) {

PulsarClientImpl pulsarClient = (PulsarClientImpl) client;
String url = pulsarClient.getLookup().getServiceUrl();

ClientEnhanceInfo info = new ClientEnhanceInfo(topic, url);
ClientEnhanceInfo.virtualField(consumer, info);
}
}

@SuppressWarnings("unused")
public static class ConsumerImplMethodAdviser {

@Advice.OnMethodEnter
public static Scope before(
@Advice.This ConsumerImpl<?> consumer, @Advice.Argument(value = 0) Message<?> message) {
ClientEnhanceInfo info = ClientEnhanceInfo.virtualField(consumer);
if (null == info) {
return Scope.noop();
}

MessageImpl<?> messageImpl = (MessageImpl<?>) message;
Context context =
PROPAGATOR.extract(Context.current(), messageImpl, MessageTextMapGetter.INSTANCE);

return TRACER
.spanBuilder("Pulsar://ConsumerImpl/messageProcessed")
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
.setParent(context)
.setSpanKind(SpanKind.CONSUMER)
.setAttribute(TOPIC, info.topic)
.setAttribute(SERVICE_URL, info.brokerUrl)
.setAttribute(SUBSCRIPTION, consumer.getSubscription())
.setAttribute(CONSUMER_NAME, consumer.getConsumerName())
.startSpan()
.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This ConsumerImpl<?> consumer,
@Advice.Argument(value = 0) Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Enter Scope scope) {
ClientEnhanceInfo info = ClientEnhanceInfo.virtualField(consumer);
if (null == info || scope == null) {
if (null != scope) {
scope.close();
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
}
return;
}

MessageEnhanceInfo messageInfo = MessageEnhanceInfo.virtualField(message);
if (null != messageInfo) {
messageInfo.setFields(Context.current(), consumer.getTopic(), message.getMessageId());
}

Span span = Span.current();
if (t != null) {
span.recordException(t);
}

span.end();
scope.close();
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.info.MessageEnhanceInfo;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;

public class MessageInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.MessageImpl")
.or(named("org.apache.pulsar.client.impl.TopicMessageImpl"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), MessageInstrumentation.class.getName() + "$MessageConstructorAdviser");
}

@SuppressWarnings("unused")
public static class MessageConstructorAdviser {

@Advice.OnMethodExit
public static void before(
@Advice.This Message<?> message, @Advice.AllArguments Object[] allArguments) {

if (message instanceof MessageImpl) {
MessageEnhanceInfo.virtualField(message, new MessageEnhanceInfo());
} else {
Object argument2 = allArguments[2];
if (message instanceof TopicMessageImpl && argument2 instanceof MessageImpl) {
MessageImpl<?> impl = (MessageImpl<?>) argument2;
MessageEnhanceInfo.virtualField(message, MessageEnhanceInfo.virtualField(impl));
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar;

import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.CONSUMER_NAME;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.MESSAGE_ID;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.SUBSCRIPTION;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.TOPIC;
import static io.opentelemetry.javaagent.instrumentation.pulsar.PulsarTelemetry.TRACER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.info.MessageEnhanceInfo;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

public class MessageListenerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// return hasSuperType(named("org.apache.pulsar.client.api.MessageListener"));
// can't enhance MessageListener here like above due to jvm can't enhance lambda.
return named("org.apache.pulsar.client.impl.conf.ConsumerConfigurationData");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("getMessageListener")),
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser");
}

@SuppressWarnings("unused")
public static class ConsumerConfigurationDataMethodAdviser {

@Advice.OnMethodExit
public static void after(
@Advice.This ConsumerConfigurationData<?> data,
@Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC)
MessageListener<?> listener) {
if (null == listener) {
return;
}

listener = new MessageListenerWrapper<>(listener);
}
}

public static class MessageListenerWrapper<T> implements MessageListener<T> {
private static final long serialVersionUID = 1L;

private final MessageListener<T> delegator;

public MessageListenerWrapper(MessageListener<T> messageListener) {
this.delegator = messageListener;
}

@Override
public void received(Consumer<T> consumer, Message<T> msg) {
MessageEnhanceInfo info = MessageEnhanceInfo.virtualField(msg);
Context parent = info == null ? Context.current() : info.getContext();
String topic = null == info ? consumer.getTopic() : info.getTopic();
String mid = null == info ? "unknown" : info.getMessageId();

Span span =
TRACER
.spanBuilder("Pulsar://MessageListener/received")
.setParent(parent)
.setSpanKind(SpanKind.CONSUMER)
.setAttribute(TOPIC, topic)
.setAttribute(MESSAGE_ID, mid)
.setAttribute(SUBSCRIPTION, consumer.getSubscription())
.setAttribute(CONSUMER_NAME, consumer.getConsumerName())
.startSpan();

try (Scope scope = span.makeCurrent()) {
this.delegator.received(consumer, msg);
} catch (Throwable t) {
span.recordException(t);
throw t;
} finally {
span.end();
}
}

@Override
public void reachedEndOfTopic(Consumer<T> consumer) {
this.delegator.reachedEndOfTopic(consumer);
}
}
}
Loading