Skip to content

Commit

Permalink
aws-sdk-2.2: Support non-X-Ray propagation for SQS (#8405)
Browse files Browse the repository at this point in the history
Co-authored-by: Mateusz Rzeszutek <[email protected]>
  • Loading branch information
Oberon00 and Mateusz Rzeszutek authored May 16, 2023
1 parent c0e220d commit f98a97f
Show file tree
Hide file tree
Showing 13 changed files with 692 additions and 54 deletions.
12 changes: 9 additions & 3 deletions instrumentation/aws-sdk/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Settings for the AWS SDK instrumentation

| System property | Type | Default | Description |
|---|---|---|---|
| `otel.instrumentation.aws-sdk.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
For more information, see the respective public setters in the `AwsSdkTelemetryBuilder` classes:

* [SDK v1](./aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java)
* [SDK v2](./aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java)

| System property | Type | Default | Description |
|---|---|---|------------------------------------------------------------------------------------------------------------------------------------------------|
| `otel.instrumentation.aws-sdk.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
| `otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging` | Boolean | `false` | Enable propagation via message attributes using configured propagator (in addition to X-Ray). At the moment, Supports only SQS and the v2 SDK. |
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-span-attributes", false);

private static final boolean USE_MESSAGING_PROPAGATOR =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);

private final ExecutionInterceptor delegate =
AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
.setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
.setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
.build()
.newExecutionInterceptor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,65 @@
final class AwsSdkInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-2.2";

static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> rpcAttributesExtractor =
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();

static final AwsSdkHttpAttributesGetter httpAttributesGetter = new AwsSdkHttpAttributesGetter();
private static final AwsSdkNetAttributesGetter netAttributesGetter =
new AwsSdkNetAttributesGetter();
static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> httpAttributesExtractor =
HttpClientAttributesExtractor.create(httpAttributesGetter, netAttributesGetter);
static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> rpcAttributesExtractor =
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();

private static final AwsSdkSpanKindExtractor spanKindExtractor = new AwsSdkSpanKindExtractor();

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
defaultAttributesExtractors = Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor);
defaultAttributesExtractors = Arrays.asList(rpcAttributesExtractor);

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
extendedAttributesExtractors =
Arrays.asList(rpcAttributesExtractor, experimentalAttributesExtractor);

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
defaultConsumerAttributesExtractors =
Arrays.asList(rpcAttributesExtractor, httpAttributesExtractor);

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
extendedConsumerAttributesExtractors =
Arrays.asList(
httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor);
rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor);

static Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {

return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
AwsSdkInstrumenterFactory.spanKindExtractor);
}

static Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {

return createInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, SpanKindExtractor.alwaysConsumer());
openTelemetry,
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors,
SpanKindExtractor.alwaysConsumer());
}

private static Instrumenter<ExecutionAttributes, SdkHttpResponse> createInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>> extractors,
SpanKindExtractor<ExecutionAttributes> spanKindExtractor) {

return Instrumenter.<ExecutionAttributes, SdkHttpResponse>builder(
openTelemetry, INSTRUMENTATION_NAME, AwsSdkInstrumenterFactory::spanName)
.addAttributesExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors)
.addAttributesExtractors(extractors)
.buildInstrumenter(spanKindExtractor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package io.opentelemetry.instrumentation.awssdk.v2_2;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
Expand Down Expand Up @@ -42,15 +44,24 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
@Nullable private final TextMapPropagator messagingPropagator;
private final boolean useXrayPropagator;

AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
AwsSdkTelemetry(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean useMessagingPropagator,
boolean useXrayPropagator) {
this.useXrayPropagator = useXrayPropagator;
this.requestInstrumenter =
AwsSdkInstrumenterFactory.requestInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
this.consumerInstrumenter =
AwsSdkInstrumenterFactory.consumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingPropagator =
useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null;
}

/**
Expand All @@ -59,6 +70,10 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
*/
public ExecutionInterceptor newExecutionInterceptor() {
return new TracingExecutionInterceptor(
requestInstrumenter, consumerInstrumenter, captureExperimentalSpanAttributes);
requestInstrumenter,
consumerInstrumenter,
captureExperimentalSpanAttributes,
messagingPropagator,
useXrayPropagator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ public final class AwsSdkTelemetryBuilder {

private boolean captureExperimentalSpanAttributes;

private boolean useMessagingPropagator;

private boolean useXrayPropagator = true;

AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
Expand All @@ -31,10 +35,50 @@ public AwsSdkTelemetryBuilder setCaptureExperimentalSpanAttributes(
return this;
}

/**
* Sets whether the {@link io.opentelemetry.context.propagation.TextMapPropagator} configured in
* the provided {@link OpenTelemetry} should be used to inject into supported messaging attributes
* (currently only SQS; SNS may follow).
*
* <p>In addition, the X-Ray propagator is always used.
*
* <p>Using the messaging propagator is needed if your tracing vendor requires special tracestate
* entries or legacy propagation information that cannot be transported via X-Ray headers. It may
* also be useful if you need to directly connect spans over messaging in your tracing backend,
* bypassing any intermediate spans/X-Ray segments that AWS may create in the delivery process.
*
* <p>This option is off by default. If enabled, on extraction the configured propagator will be
* preferred over X-Ray if it can extract anything.
*/
@CanIgnoreReturnValue
public AwsSdkTelemetryBuilder setUseConfiguredPropagatorForMessaging(
boolean useMessagingPropagator) {
this.useMessagingPropagator = useMessagingPropagator;
return this;
}

/**
* This setter implemented package-private for testing the messaging propagator, it does not seem
* too useful in general. The option is on by default.
*
* <p>If this needs to be exposed for non-testing use cases, consider if you need to refine this
* feature so that it disable this only for requests supported by {@link
* #setUseConfiguredPropagatorForMessaging(boolean)}
*/
@CanIgnoreReturnValue
AwsSdkTelemetryBuilder setUseXrayPropagator(boolean useMessagingPropagator) {
this.useXrayPropagator = useMessagingPropagator;
return this;
}

/**
* Returns a new {@link AwsSdkTelemetry} with the settings of this {@link AwsSdkTelemetryBuilder}.
*/
public AwsSdkTelemetry build() {
return new AwsSdkTelemetry(openTelemetry, captureExperimentalSpanAttributes);
return new AwsSdkTelemetry(
openTelemetry,
captureExperimentalSpanAttributes,
useMessagingPropagator,
useXrayPropagator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkPojo;

/**
* Reflective access to aws-sdk-java-sqs class Message.
Expand All @@ -21,10 +22,18 @@
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*
* @see <a
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/model/Message.html">SDK
* Javadoc</a>
* @see <a
* href="https://github.com/aws/aws-sdk-java-v2/blob/2.2.0/services/sqs/src/main/resources/codegen-resources/service-2.json#L821-L856">Definition
* JSON</a>
*/
final class SqsMessageAccess {

@Nullable private static final MethodHandle GET_ATTRIBUTES;
@Nullable private static final MethodHandle GET_MESSAGE_ATTRIBUTES;

static {
Class<?> messageClass = null;
Expand All @@ -43,8 +52,18 @@ final class SqsMessageAccess {
// Ignore
}
GET_ATTRIBUTES = getAttributes;

MethodHandle getMessageAttributes = null;
try {
getMessageAttributes =
lookup.findVirtual(messageClass, "messageAttributes", methodType(Map.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_MESSAGE_ATTRIBUTES = getMessageAttributes;
} else {
GET_ATTRIBUTES = null;
GET_MESSAGE_ATTRIBUTES = null;
}
}

Expand All @@ -61,4 +80,16 @@ static Map<String, String> getAttributes(Object message) {
}

private SqsMessageAccess() {}

@SuppressWarnings("unchecked")
public static Map<String, SdkPojo> getMessageAttributes(Object message) {
if (GET_MESSAGE_ATTRIBUTES == null) {
return Collections.emptyMap();
}
try {
return (Map<String, SdkPojo>) GET_MESSAGE_ATTRIBUTES.invoke(message);
} catch (Throwable t) {
return Collections.emptyMap();
}
}
}
Loading

0 comments on commit f98a97f

Please sign in to comment.