Skip to content

Commit

Permalink
collect pubsub attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
nozik committed Oct 2, 2022
1 parent 10b4ce6 commit fbb2b8f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
Expand All @@ -35,9 +36,10 @@ public void transform(TypeTransformer typeTransformer) {
public static class PubsubPublisherAddAttributesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnterHandle(
@Advice.This Publisher publisher,
@Advice.Argument(value = 0, readOnly = false) PubsubMessage pubsubMessage) {
Context parentContext = Java8BytecodeBridge.currentContext();
startAndInjectSpan(parentContext, pubsubMessage);
startAndInjectSpan(parentContext, pubsubMessage, publisher);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.pubsub;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
Expand All @@ -13,6 +14,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -44,13 +46,7 @@ public static Instrumenter<PubsubMessage, Void> publisherInstrumenter() {
}

private static Instrumenter<PubsubMessage, Void> createPublisherInstrumenter() {
SpanNameExtractor publisherSpanNameExtractor =
new SpanNameExtractor() {
@Override
public String extract(Object o) {
return publisherSpanName;
}
};
SpanNameExtractor publisherSpanNameExtractor = o -> publisherSpanName;

return Instrumenter.<PubsubMessage, Void>builder(
GlobalOpenTelemetry.get(), instrumentationName, publisherSpanNameExtractor)
Expand All @@ -59,20 +55,15 @@ public String extract(Object o) {

public static Instrumenter<PubsubMessage, Void> createSubscriberInstrumenter() {

SpanNameExtractor subscriberSpanNameExtractor =
new SpanNameExtractor() {
@Override
public String extract(Object o) {
return subscriberSpanName;
}
};
SpanNameExtractor subscriberSpanNameExtractor = o -> subscriberSpanName;

return Instrumenter.<PubsubMessage, Void>builder(
GlobalOpenTelemetry.get(), instrumentationName, subscriberSpanNameExtractor)
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

public static void startAndInjectSpan(Context parentContext, PubsubMessage pubsubMessage) {
public static void startAndInjectSpan(
Context parentContext, PubsubMessage pubsubMessage, Publisher publisher) {
if (!publisherInstrumenter().shouldStart(parentContext, pubsubMessage)) {
return;
}
Expand All @@ -83,6 +74,10 @@ public static void startAndInjectSpan(Context parentContext, PubsubMessage pubsu
Context context = publisherInstrumenter().start(parentContext, pubsubMessage);
Span span = Java8BytecodeBridge.spanFromContext(context);
span.setAttribute(MESSAGE_PAYLOAD_ATTRIBUTE, new String(pubsubMessage.getData().toByteArray()));
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "pubsub");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, publisher.getTopicNameString());

GlobalOpenTelemetry.get()
.getPropagators()
.getTextMapPropagator()
Expand Down

0 comments on commit fbb2b8f

Please sign in to comment.