Skip to content

Commit

Permalink
Pulsar batch receive instrumentation (#8173)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Apr 4, 2023
1 parent 4d21d45 commit 34bca4b
Show file tree
Hide file tree
Showing 13 changed files with 440 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ dependencies {
tasks.withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.startAndEndConsumerReceive;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.wrap;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.wrapBatch;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
Expand All @@ -25,6 +26,7 @@
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;

Expand All @@ -41,7 +43,7 @@ public ElementMatcher<TypeDescription> typeMatcher() {
public void transform(TypeTransformer transformer) {
String className = ConsumerImplInstrumentation.class.getName();

transformer.applyAdviceToMethod(isConstructor(), className + "$ConsumerConstructorAdviser");
transformer.applyAdviceToMethod(isConstructor(), className + "$ConsumerConstructorAdvice");

// internalReceive will apply to Consumer#receive(long,TimeUnit)
// and called before MessageListener#receive.
Expand All @@ -51,22 +53,27 @@ public void transform(TypeTransformer transformer) {
.and(named("internalReceive"))
.and(takesArguments(2))
.and(takesArgument(1, named("java.util.concurrent.TimeUnit"))),
className + "$ConsumerInternalReceiveAdviser");
className + "$ConsumerInternalReceiveAdvice");
// internalReceive will apply to Consumer#receive()
transformer.applyAdviceToMethod(
isMethod().and(isProtected()).and(named("internalReceive")).and(takesArguments(0)),
className + "$ConsumerSyncReceiveAdviser");
className + "$ConsumerSyncReceiveAdvice");
// internalReceiveAsync will apply to Consumer#receiveAsync()
transformer.applyAdviceToMethod(
isMethod().and(isProtected()).and(named("internalReceiveAsync")).and(takesArguments(0)),
className + "$ConsumerAsyncReceiveAdviser");
// TODO batch receiving not implemented (Consumer#batchReceive() and
// Consumer#batchReceiveAsync())
className + "$ConsumerAsyncReceiveAdvice");
// internalBatchReceiveAsync will apply to Consumer#batchReceive() and
// Consumer#batchReceiveAsync()
transformer.applyAdviceToMethod(
isMethod()
.and(isProtected())
.and(named("internalBatchReceiveAsync"))
.and(takesArguments(0)),
className + "$ConsumerBatchAsyncReceiveAdvice");
}

@SuppressWarnings("unused")
public static class ConsumerConstructorAdviser {
private ConsumerConstructorAdviser() {}
public static class ConsumerConstructorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(
Expand All @@ -79,8 +86,7 @@ public static void after(
}

@SuppressWarnings("unused")
public static class ConsumerInternalReceiveAdviser {
private ConsumerInternalReceiveAdviser() {}
public static class ConsumerInternalReceiveAdvice {

@Advice.OnMethodEnter
public static Timer before() {
Expand All @@ -104,8 +110,7 @@ public static void after(
}

@SuppressWarnings("unused")
public static class ConsumerSyncReceiveAdviser {
private ConsumerSyncReceiveAdviser() {}
public static class ConsumerSyncReceiveAdvice {

@Advice.OnMethodEnter
public static Timer before() {
Expand All @@ -125,8 +130,7 @@ public static void after(
}

@SuppressWarnings("unused")
public static class ConsumerAsyncReceiveAdviser {
private ConsumerAsyncReceiveAdviser() {}
public static class ConsumerAsyncReceiveAdvice {

@Advice.OnMethodEnter
public static Timer before() {
Expand All @@ -141,4 +145,21 @@ public static void after(
future = wrap(future, timer, consumer);
}
}

@SuppressWarnings("unused")
public static class ConsumerBatchAsyncReceiveAdvice {

@Advice.OnMethodEnter
public static Timer before() {
return Timer.start();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
@Advice.This Consumer<?> consumer,
@Advice.Return(readOnly = false) CompletableFuture<Messages<?>> future) {
future = wrapBatch(future, timer, consumer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public void transform(TypeTransformer transformer) {

@SuppressWarnings("unused")
public static class MessageRecycleAdvice {
private MessageRecycleAdvice() {}

@Advice.OnMethodExit
public static void after(@Advice.This Message<?> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@ public ElementMatcher<TypeDescription> typeMatcher() {
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("getMessageListener")),
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser");
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdvice");
}

@SuppressWarnings("unused")
public static class ConsumerConfigurationDataMethodAdviser {
private ConsumerConfigurationDataMethodAdviser() {}
public static class ConsumerConfigurationDataMethodAdvice {

@Advice.OnMethodExit
public static void after(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,17 @@ public void transform(TypeTransformer transformer) {
.and(isPublic())
.and(
takesArgument(0, hasSuperType(named("org.apache.pulsar.client.api.PulsarClient")))),
ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdviser");
ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdvice");

transformer.applyAdviceToMethod(
isMethod()
.and(named("sendAsync"))
.and(takesArgument(1, named("org.apache.pulsar.client.impl.SendCallback"))),
ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdviser");
ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdvice");
}

@SuppressWarnings("unused")
public static class ProducerImplConstructorAdviser {
private ProducerImplConstructorAdviser() {}
public static class ProducerImplConstructorAdvice {

@Advice.OnMethodExit
public static void intercept(
Expand All @@ -68,8 +67,7 @@ public static void intercept(
}

@SuppressWarnings("unused")
public static class ProducerSendAsyncMethodAdviser {
private ProducerSendAsyncMethodAdviser() {}
public static class ProducerSendAsyncMethodAdvice {

@Advice.OnMethodEnter
public static void before(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;

import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;

public class BasePulsarRequest {

private final String destination;
private final UrlData urlData;

protected BasePulsarRequest(String destination, UrlData urlData) {
this.destination = destination;
this.urlData = urlData;
}

public String getDestination() {
return destination;
}

public UrlData getUrlData() {
return urlData;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

enum PulsarBatchMessagingAttributesGetter
implements MessagingAttributesGetter<PulsarBatchRequest, Void> {
INSTANCE;

@Override
public String getSystem(PulsarBatchRequest request) {
return "pulsar";
}

@Override
public String getDestinationKind(PulsarBatchRequest request) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Nullable
@Override
public String getDestination(PulsarBatchRequest request) {
return request.getDestination();
}

@Override
public boolean isTemporaryDestination(PulsarBatchRequest request) {
return false;
}

@Nullable
@Override
public String getConversationId(PulsarBatchRequest message) {
return null;
}

@Nullable
@Override
public Long getMessagePayloadSize(PulsarBatchRequest request) {
return StreamSupport.stream(request.getMessages().spliterator(), false)
.map(message -> (long) message.size())
.reduce(Long::sum)
.orElse(null);
}

@Nullable
@Override
public Long getMessagePayloadCompressedSize(PulsarBatchRequest request) {
return null;
}

@Nullable
@Override
public String getMessageId(PulsarBatchRequest request, @Nullable Void response) {
return null;
}

@Override
public List<String> getMessageHeader(PulsarBatchRequest request, String name) {
return StreamSupport.stream(request.getMessages().spliterator(), false)
.map(message -> message.getProperty(name))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;

import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.parseUrl;

import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;

public final class PulsarBatchRequest extends BasePulsarRequest {
private final Messages<?> messages;

private PulsarBatchRequest(Messages<?> messages, String destination, UrlData urlData) {
super(destination, urlData);
this.messages = messages;
}

public static PulsarBatchRequest create(Messages<?> messages, String url) {
return new PulsarBatchRequest(messages, getTopicName(messages), parseUrl(url));
}

private static String getTopicName(Messages<?> messages) {
String topicName = null;
for (Message<?> message : messages) {
String name = message.getTopicName();
if (topicName == null) {
topicName = name;
} else if (!topicName.equals(name)) {
return null;
}
}
return topicName;
}

public Messages<?> getMessages() {
return messages;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import org.apache.pulsar.client.api.Message;

final class PulsarBatchRequestSpanLinksExtractor implements SpanLinksExtractor<PulsarBatchRequest> {
private final SpanLinksExtractor<PulsarRequest> singleRecordLinkExtractor;

PulsarBatchRequestSpanLinksExtractor(TextMapPropagator propagator) {
this.singleRecordLinkExtractor =
new PropagatorBasedSpanLinksExtractor<>(propagator, MessageTextMapGetter.INSTANCE);
}

@Override
public void extract(
SpanLinksBuilder spanLinks, Context parentContext, PulsarBatchRequest request) {

for (Message<?> message : request.getMessages()) {
singleRecordLinkExtractor.extract(
spanLinks, Context.root(), PulsarRequest.create(message, request.getUrlData()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
import javax.annotation.Nullable;

public final class PulsarNetClientAttributesGetter
implements NetClientAttributesGetter<PulsarRequest, Void> {
implements NetClientAttributesGetter<BasePulsarRequest, Void> {
@Nullable
@Override
public String getTransport(PulsarRequest request, @Nullable Void unused) {
public String getTransport(BasePulsarRequest request, @Nullable Void unused) {
return null;
}

@Nullable
@Override
public String getPeerName(PulsarRequest request) {
public String getPeerName(BasePulsarRequest request) {
return request.getUrlData() != null ? request.getUrlData().getHost() : null;
}

@Nullable
@Override
public Integer getPeerPort(PulsarRequest request) {
public Integer getPeerPort(BasePulsarRequest request) {
return request.getUrlData() != null ? request.getUrlData().getPort() : null;
}
}
Loading

0 comments on commit 34bca4b

Please sign in to comment.