Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance][pulsar] add apache-pulsar client support #5926

Merged
merged 51 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
693b89b
add apache-pulsar support
dao-jun Apr 23, 2022
7086bd8
remove repositories
dao-jun Apr 23, 2022
29cc849
checkstyle
dao-jun Apr 24, 2022
1ed1686
checkstyle
dao-jun Apr 24, 2022
e347fa8
bug fix
dao-jun Apr 25, 2022
883c401
bug fix
dao-jun Apr 25, 2022
47e9f1c
bug fix
dao-jun Apr 25, 2022
9cc4f45
fix pulsar producer send back
dao-jun Apr 25, 2022
09052da
review fix
dao-jun May 12, 2022
2e127e6
checkstyle fix
dao-jun May 14, 2022
975671c
checkstyle fix
dao-jun May 16, 2022
f22dd03
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-java…
dao-jun May 16, 2022
0845f0e
merge master into current
dao-jun May 16, 2022
34c425a
codestyle fix
dao-jun May 16, 2022
1e6aa3d
codestyle fix
dao-jun May 16, 2022
d23932b
complete tests
dao-jun Jun 12, 2022
57c7b74
Merge branch 'main' into dev/pulsar
dao-jun Nov 29, 2022
79ca3a2
Refactor with Instrument API
dao-jun Dec 2, 2022
6755ae6
Fix tests
dao-jun Dec 2, 2022
2f5ab6f
Fix checkstyle
dao-jun Dec 2, 2022
a7f3da7
Fix checkstyle
dao-jun Dec 3, 2022
1f6faed
Fix checkstyle
dao-jun Dec 3, 2022
a2fe9a3
Fix checkstyle
dao-jun Dec 3, 2022
b87b21a
Fix checkstyle
dao-jun Dec 3, 2022
92f5c72
Fix instrumentation
dao-jun Dec 5, 2022
63438b6
review fix
dao-jun Dec 10, 2022
40c1115
fix checkstyle
dao-jun Dec 10, 2022
7184126
fix tests
dao-jun Dec 10, 2022
333f636
fix tests
dao-jun Dec 10, 2022
db57a7b
fix tests
dao-jun Dec 10, 2022
3de32af
fix tests
dao-jun Dec 10, 2022
01b616a
fix tests
dao-jun Dec 11, 2022
5207cc9
fix tests
dao-jun Dec 11, 2022
1f307c6
fix checkstyle
dao-jun Dec 12, 2022
fbeb11e
fix instrumentation & test
dao-jun Dec 13, 2022
ab5f0c5
fix instrumentation
dao-jun Dec 13, 2022
e0f214d
fix test `test send non-partitioned topic`
dao-jun Dec 15, 2022
8328ca1
fix test `test send non-partitioned topic`
dao-jun Dec 15, 2022
5a6917b
fix test `test send non-partitioned topic`
dao-jun Dec 15, 2022
c8b7551
fix all tests
dao-jun Dec 15, 2022
7fe7329
fix all tests
dao-jun Dec 15, 2022
8dd1207
fix tests
dao-jun Dec 19, 2022
b704cd8
fix tests
dao-jun Dec 20, 2022
90268c2
fix tests
dao-jun Dec 20, 2022
2e02517
fix tests
dao-jun Dec 20, 2022
5c05c29
fix tests codenarc
dao-jun Dec 20, 2022
cdc53bd
review fix
dao-jun Feb 27, 2023
399d63f
fix code format
dao-jun Feb 27, 2023
cbd934f
Merge branch 'main' into dev/pulsar
dao-jun Feb 27, 2023
2d44361
fix deprecated semantic attributes
dao-jun Feb 27, 2023
9d0af62
review fix
dao-jun Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.pulsar")
module.set("pulsar-client")
versions.set("[2.8.0,)")
assertInverse.set(true)
}
}

dependencies {
library("org.apache.pulsar:pulsar-client:2.8.0")

testImplementation("javax.annotation:javax.annotation-api:1.3.2")
testImplementation("org.testcontainers:pulsar:1.17.1")
testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

import static io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons.startAndEndConsumerReceive;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;

public class ConsumerImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.apache.pulsar.client.impl.ConsumerImpl",
"org.apache.pulsar.client.impl.MultiTopicsConsumerImpl");
}

@Override
public void transform(TypeTransformer transformer) {
String klassName = ConsumerImplInstrumentation.class.getName();

transformer.applyAdviceToMethod(isConstructor(), klassName + "$ConsumerConstructorAdviser");

// internalReceive will apply to Consumer#receive(long,TimeUnit)
// and called before MessageListener#receive.
transformer.applyAdviceToMethod(
isMethod()
.and(isProtected())
.and(named("internalReceive"))
.and(takesArguments(2))
.and(takesArgument(1, named("java.util.concurrent.TimeUnit"))),
klassName + "$ConsumerInternalReceiveAdviser");
// receive/batchReceive will apply to Consumer#receive()/Consumer#batchReceive()
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(namedOneOf("receive", "batchReceive"))
.and(takesArguments(0)),
klassName + "$ConsumerSyncReceiveAdviser");
// receiveAsync/batchReceiveAsync will apply to
// Consumer#receiveAsync()/Consumer#batchReceiveAsync()
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(namedOneOf("receiveAsync", "batchReceiveAsync"))
.and(takesArguments(0)),
klassName + "$ConsumerAsyncReceiveAdviser");
}

@SuppressWarnings("unused")
public static class ConsumerConstructorAdviser {
private ConsumerConstructorAdviser() {}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer, @Advice.Argument(value = 0) PulsarClient client) {

PulsarClientImpl pulsarClient = (PulsarClientImpl) client;
String url = pulsarClient.getLookup().getServiceUrl();
VirtualFieldStore.inject(consumer, url);
}
}

@SuppressWarnings("unused")
public static class ConsumerInternalReceiveAdviser {
private ConsumerInternalReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer,
@Advice.Return Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Local(value = "startTime") long startTime) {
if (t != null) {
return;
}

Context parent = Context.current();
Context current = startAndEndConsumerReceive(parent, message, startTime, consumer);
if (current != null) {
// ConsumerBase#internalReceive(long,TimeUnit) will be called before
// ConsumerListener#receive(Consumer,Message), so, need to inject Context into Message.
VirtualFieldStore.inject(message, current);
}
}
}

@SuppressWarnings("unused")
public static class ConsumerSyncReceiveAdviser {
private ConsumerSyncReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer,
@Advice.Return Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Local(value = "startTime") long startTime) {
if (t != null) {
return;
}

Context parent = Context.current();
startAndEndConsumerReceive(parent, message, startTime, consumer);
// No need to inject context to message.
}
}

@SuppressWarnings("unused")
public static class ConsumerAsyncReceiveAdviser {
private ConsumerAsyncReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.This Consumer<?> consumer,
@Advice.Return CompletableFuture<Message<?>> future,
@Advice.Local(value = "startTime") long startTime) {
future.whenComplete(
(message, t) -> {
if (t != null) {
return;
}

Context parent = Context.current();
startAndEndConsumerReceive(parent, message, startTime, consumer);
// No need to inject context to message.
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;

public class MessageInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.MessageImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("recycle")).and(takesArguments(0)),
MessageInstrumentation.class.getName() + "$MessageRecycleAdvice");
}

@SuppressWarnings("unused")
public static class MessageRecycleAdvice {
private MessageRecycleAdvice() {}

@Advice.OnMethodExit
public static void after(@Advice.This Message<?> message) {
// Clean context to prevent memory leak.
VirtualFieldStore.inject(message, null);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

public class MessageListenerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// return hasSuperType(named("org.apache.pulsar.client.api.MessageListener"));
// can't enhance MessageListener here like above due to jvm can't enhance lambda.
return named("org.apache.pulsar.client.impl.conf.ConsumerConfigurationData");
Comment on lines +32 to +34
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you try MessageListener and it didn't work? there is some magic that should enhance lambdas: #4182

hasSuperType is a more expensive matcher compared to named however, so the current implementation could still be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, too. The current implementation works fine, so it needn't to change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many thanks for your review!

}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("getMessageListener")),
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser");
}

@SuppressWarnings("unused")
public static class ConsumerConfigurationDataMethodAdviser {
private ConsumerConfigurationDataMethodAdviser() {}

@Advice.OnMethodExit
public static void after(
@Advice.This ConsumerConfigurationData<?> data,
@Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC)
MessageListener<?> listener) {
if (listener == null) {
return;
}

listener = new MessageListenerWrapper<>(listener);
}
}

public static class MessageListenerWrapper<T> implements MessageListener<T> {
private static final long serialVersionUID = 1L;

private final MessageListener<T> delegator;

public MessageListenerWrapper(MessageListener<T> messageListener) {
this.delegator = messageListener;
}

@Override
public void received(Consumer<T> consumer, Message<T> msg) {
Context parent = VirtualFieldStore.extract(msg);

Instrumenter<Message<?>, Attributes> instrumenter =
PulsarSingletons.consumerListenerInstrumenter();
if (!instrumenter.shouldStart(parent, msg)) {
this.delegator.received(consumer, msg);
return;
}

Context current = instrumenter.start(parent, msg);
try (Scope scope = current.makeCurrent()) {
this.delegator.received(consumer, msg);
instrumenter.end(current, msg, null, null);
} catch (Throwable t) {
instrumenter.end(current, msg, null, t);
throw t;
}
}

@Override
public void reachedEndOfTopic(Consumer<T> consumer) {
this.delegator.reachedEndOfTopic(consumer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v28;

public class ProducerData {
public final String url;
public final String topic;

private ProducerData(String url, String topic) {
this.url = url;
this.topic = topic;
}

public static ProducerData create(String url, String topic) {
return new ProducerData(url, topic);
}
}
Loading