Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument spring-kafka batch message listeners #3922

Merged
merged 4 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public static <REQUEST, RESPONSE> InstrumenterBuilder<REQUEST, RESPONSE> newBuil
private final SpanNameExtractor<? super REQUEST> spanNameExtractor;
private final SpanKindExtractor<? super REQUEST> spanKindExtractor;
private final SpanStatusExtractor<? super REQUEST, ? super RESPONSE> spanStatusExtractor;
private final List<? extends SpanLinksExtractor<? super REQUEST>> spanLinksExtractors;
private final List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>>
attributesExtractors;
private final List<? extends SpanLinkExtractor<? super REQUEST>> spanLinkExtractors;
private final List<? extends RequestListener> requestListeners;
private final ErrorCauseExtractor errorCauseExtractor;
@Nullable private final StartTimeExtractor<REQUEST> startTimeExtractor;
Expand All @@ -83,8 +83,8 @@ public static <REQUEST, RESPONSE> InstrumenterBuilder<REQUEST, RESPONSE> newBuil
this.spanNameExtractor = builder.spanNameExtractor;
this.spanKindExtractor = builder.spanKindExtractor;
this.spanStatusExtractor = builder.spanStatusExtractor;
this.spanLinksExtractors = new ArrayList<>(builder.spanLinksExtractors);
this.attributesExtractors = new ArrayList<>(builder.attributesExtractors);
this.spanLinkExtractors = new ArrayList<>(builder.spanLinkExtractors);
this.requestListeners = new ArrayList<>(builder.requestListeners);
this.errorCauseExtractor = builder.errorCauseExtractor;
this.startTimeExtractor = builder.startTimeExtractor;
Expand Down Expand Up @@ -131,8 +131,9 @@ public Context start(Context parentContext, REQUEST request) {
spanBuilder.setStartTimestamp(startTimeExtractor.extract(request));
}

for (SpanLinkExtractor<? super REQUEST> extractor : spanLinkExtractors) {
spanBuilder.addLink(extractor.extract(parentContext, request));
SpanLinksBuilder spanLinksBuilder = new SpanLinksBuilderImpl(spanBuilder);
for (SpanLinksExtractor<? super REQUEST> spanLinksExtractor : spanLinksExtractors) {
spanLinksExtractor.extract(spanLinksBuilder, parentContext, request);
}

UnsafeAttributes attributesBuilder = new UnsafeAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public final class InstrumenterBuilder<REQUEST, RESPONSE> {
final String instrumentationName;
final SpanNameExtractor<? super REQUEST> spanNameExtractor;

final List<SpanLinksExtractor<? super REQUEST>> spanLinksExtractors = new ArrayList<>();
final List<AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributesExtractors =
new ArrayList<>();
final List<SpanLinkExtractor<? super REQUEST>> spanLinkExtractors = new ArrayList<>();
final List<RequestListener> requestListeners = new ArrayList<>();

SpanKindExtractor<? super REQUEST> spanKindExtractor = SpanKindExtractor.alwaysInternal();
Expand Down Expand Up @@ -100,10 +100,10 @@ public InstrumenterBuilder<REQUEST, RESPONSE> addAttributesExtractors(
return addAttributesExtractors(Arrays.asList(attributesExtractors));
}

/** Adds a {@link SpanLinkExtractor} to extract span link from requests. */
public InstrumenterBuilder<REQUEST, RESPONSE> addSpanLinkExtractor(
SpanLinkExtractor<REQUEST> spanLinkExtractor) {
spanLinkExtractors.add(spanLinkExtractor);
/** Adds a {@link SpanLinksExtractor} to extract span links from requests. */
public InstrumenterBuilder<REQUEST, RESPONSE> addSpanLinksExtractor(
SpanLinksExtractor<REQUEST> spanLinksExtractor) {
spanLinksExtractors.add(spanLinksExtractor);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@
package io.opentelemetry.instrumentation.api.instrumenter;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapGetter;

final class PropagatorBasedSpanLinkExtractor<REQUEST> implements SpanLinkExtractor<REQUEST> {
final class PropagatorBasedSpanLinksExtractor<REQUEST> implements SpanLinksExtractor<REQUEST> {
private final ContextPropagators propagators;
private final TextMapGetter<REQUEST> getter;

PropagatorBasedSpanLinkExtractor(ContextPropagators propagators, TextMapGetter<REQUEST> getter) {
PropagatorBasedSpanLinksExtractor(ContextPropagators propagators, TextMapGetter<REQUEST> getter) {
this.propagators = propagators;
this.getter = getter;
}

@Override
public SpanContext extract(Context parentContext, REQUEST request) {
public void extract(SpanLinksBuilder spanLinks, Context parentContext, REQUEST request) {
Context extracted = propagators.getTextMapPropagator().extract(parentContext, request, getter);
return Span.fromContext(extracted).getSpanContext();
spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.instrumenter;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;

/** A builder that exposes methods for adding links to a span. */
public interface SpanLinksBuilder {

/**
* Adds a link to the newly created {@code Span}. Invalid {@link SpanContext}s will be skipped.
*
* @param spanContext the context of the linked {@code Span}.
* @return this.
* @see SpanBuilder#addLink(SpanContext)
*/
SpanLinksBuilder addLink(SpanContext spanContext);

/**
* Adds a link to the newly created {@code Span}. Invalid {@link SpanContext}s will be skipped.
*
* @param spanContext the context of the linked {@code Span}.
* @param attributes the attributes of the {@code Link}.
* @return this.
* @see SpanBuilder#addLink(SpanContext)
*/
SpanLinksBuilder addLink(SpanContext spanContext, Attributes attributes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.instrumenter;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;

final class SpanLinksBuilderImpl implements SpanLinksBuilder {
private final SpanBuilder spanBuilder;

SpanLinksBuilderImpl(SpanBuilder spanBuilder) {
this.spanBuilder = spanBuilder;
}

@Override
public SpanLinksBuilder addLink(SpanContext spanContext) {
spanBuilder.addLink(spanContext);
return this;
}

@Override
public SpanLinksBuilder addLink(SpanContext spanContext, Attributes attributes) {
spanBuilder.addLink(spanContext, attributes);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.instrumenter;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapGetter;

/** Extractor of span links for a request. */
@FunctionalInterface
public interface SpanLinksExtractor<REQUEST> {

/**
* Extracts {@link SpanContext}s that should be linked to the newly created span and adds them to
* {@code spanLinks}.
*/
void extract(SpanLinksBuilder spanLinks, Context parentContext, REQUEST request);

/**
* Returns a new {@link SpanLinksExtractor} that will extract a {@link SpanContext} from the
* request using configured {@code propagators}.
*/
static <REQUEST> SpanLinksExtractor<REQUEST> fromUpstreamRequest(
ContextPropagators propagators, TextMapGetter<REQUEST> getter) {
return new PropagatorBasedSpanLinksExtractor<>(propagators, getter);
}
}
Comment on lines +13 to +31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @anuraaga new Instrumenter API addition

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ abstract class SpanSuppressionStrategy {
private static final SpanSuppressionStrategy SERVER_STRATEGY =
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.SERVER));
private static final SpanSuppressionStrategy CONSUMER_STRATEGY =
new StoreOnlyStrategy(singleton(SpanKey.CONSUMER));
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.CONSUMER));
private static final SpanSuppressionStrategy ALL_CLIENTS_STRATEGY =
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.ALL_CLIENTS));
private static final SpanSuppressionStrategy ALL_PRODUCERS_STRATEGY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,17 @@ protected void onEnd(
}
}

static class LinkExtractor implements SpanLinkExtractor<Map<String, String>> {
static class LinksExtractor implements SpanLinksExtractor<Map<String, String>> {

@Override
public SpanContext extract(Context parentContext, Map<String, String> request) {
return SpanContext.create(
request.get("linkTraceId"),
request.get("linkSpanId"),
TraceFlags.getSampled(),
TraceState.getDefault());
public void extract(
SpanLinksBuilder spanLinks, Context parentContext, Map<String, String> request) {
spanLinks.addLink(
SpanContext.create(
request.get("linkTraceId"),
request.get("linkSpanId"),
TraceFlags.getSampled(),
TraceState.getDefault()));
}
}

Expand Down Expand Up @@ -145,7 +148,7 @@ void server() {
Instrumenter.<Map<String, String>, Map<String, String>>newBuilder(
otelTesting.getOpenTelemetry(), "test", unused -> "span")
.addAttributesExtractors(new AttributesExtractor1(), new AttributesExtractor2())
.addSpanLinkExtractor(new LinkExtractor())
.addSpanLinksExtractor(new LinksExtractor())
.newServerInstrumenter(new MapGetter());

Context context = instrumenter.start(Context.root(), REQUEST);
Expand Down Expand Up @@ -258,7 +261,7 @@ void server_http() {
mockNetAttributes,
new AttributesExtractor1(),
new AttributesExtractor2())
.addSpanLinkExtractor(new LinkExtractor())
.addSpanLinksExtractor(new LinksExtractor())
.newServerInstrumenter(new MapGetter());

when(mockNetAttributes.peerIp(REQUEST, null)).thenReturn("2.2.2.2");
Expand Down Expand Up @@ -297,7 +300,7 @@ void server_http_xForwardedFor() {
mockNetAttributes,
new AttributesExtractor1(),
new AttributesExtractor2())
.addSpanLinkExtractor(new LinkExtractor())
.addSpanLinksExtractor(new LinksExtractor())
.newServerInstrumenter(new MapGetter());

Map<String, String> request = new HashMap<>(REQUEST);
Expand Down Expand Up @@ -340,7 +343,7 @@ void server_http_noForwarded() {
mockNetAttributes,
new AttributesExtractor1(),
new AttributesExtractor2())
.addSpanLinkExtractor(new LinkExtractor())
.addSpanLinksExtractor(new LinksExtractor())
.newServerInstrumenter(new MapGetter());

Map<String, String> request = new HashMap<>(REQUEST);
Expand Down Expand Up @@ -378,7 +381,7 @@ void client() {
Instrumenter.<Map<String, String>, Map<String, String>>newBuilder(
otelTesting.getOpenTelemetry(), "test", unused -> "span")
.addAttributesExtractors(new AttributesExtractor1(), new AttributesExtractor2())
.addSpanLinkExtractor(new LinkExtractor())
.addSpanLinksExtractor(new LinksExtractor())
.newClientInstrumenter(Map::put);

Map<String, String> request = new HashMap<>(REQUEST);
Expand Down Expand Up @@ -512,7 +515,8 @@ void shouldNotAddInvalidLink() {
Instrumenter<String, String> instrumenter =
Instrumenter.<String, String>newBuilder(
otelTesting.getOpenTelemetry(), "test", request -> "test span")
.addSpanLinkExtractor((parentContext, request) -> SpanContext.getInvalid())
.addSpanLinksExtractor(
(spanLinks, parentContext, request) -> spanLinks.addLink(SpanContext.getInvalid()))
.newInstrumenter();

// when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.opentelemetry.instrumentation.api.instrumenter;

import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanId;
Expand All @@ -19,31 +19,37 @@
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

class PropagatorBasedSpanLinkExtractorTest {
@ExtendWith(MockitoExtension.class)
class PropagatorBasedSpanLinksExtractorTest {
private static final String TRACE_ID = TraceId.fromLongs(0, 123);
private static final String SPAN_ID = SpanId.fromLong(456);

@Mock SpanLinksBuilder spanLinks;

@Test
void shouldExtractSpanLink() {
// given
ContextPropagators propagators =
ContextPropagators.create(W3CTraceContextPropagator.getInstance());

SpanLinkExtractor<Map<String, String>> underTest =
SpanLinkExtractor.fromUpstreamRequest(propagators, new MapGetter());
SpanLinksExtractor<Map<String, String>> underTest =
SpanLinksExtractor.fromUpstreamRequest(propagators, new MapGetter());

Map<String, String> request =
singletonMap("traceparent", String.format("00-%s-%s-01", TRACE_ID, SPAN_ID));

// when
SpanContext link = underTest.extract(Context.root(), request);
underTest.extract(spanLinks, Context.root(), request);

// then
assertEquals(
SpanContext.createFromRemoteParent(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()),
link);
verify(spanLinks)
.addLink(
SpanContext.createFromRemoteParent(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()));
}

static final class MapGetter implements TextMapGetter<Map<String, String>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public void consumerSpan_getSet() {

Context context = SpanKey.CONSUMER.storeInContext(Context.root(), SPAN);

// never suppress CONSUMER
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse();
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue();

assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isSameAs(SPAN);
allClientSpanKeys().forEach(spanKey -> assertThat(spanKey.fromContextOrNull(context)).isNull());
Expand Down Expand Up @@ -188,13 +187,13 @@ public void noKeys_serverIsSuppressed() {
}

@Test
public void noKeys_consumerIsNeverSuppressed() {
public void noKeys_consumerIsSuppressed() {

SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(new HashSet<>());

Context context = strategy.storeInContext(Context.root(), SpanKind.CONSUMER, SPAN);

assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse();
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue();
assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isSameAs(SPAN);

allClientSpanKeys()
Expand Down
Loading