diff --git a/instrumentation/aws-sdk/README.md b/instrumentation/aws-sdk/README.md index da63eb6e49ad..afc2b69874a7 100644 --- a/instrumentation/aws-sdk/README.md +++ b/instrumentation/aws-sdk/README.md @@ -1,5 +1,11 @@ # Settings for the AWS SDK instrumentation -| System property | Type | Default | Description | -|---|---|---|---| -| `otel.instrumentation.aws-sdk.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | +For more information, see the respective public setters in the `AwsSdkTelemetryBuilder` classes: + +* [SDK v1](./aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java) +* [SDK v2](./aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java) + +| System property | Type | Default | Description | +|---|---|---|------------------------------------------------------------------------------------------------------------------------------------------------| +| `otel.instrumentation.aws-sdk.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | +| `otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging` | Boolean | `false` | Enable propagation via message attributes using configured propagator (in addition to X-Ray). At the moment, Supports only SQS and the v2 SDK. | diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/TracingExecutionInterceptor.java index 53a7797a90bc..657a51b6b7db 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/autoconfigure/TracingExecutionInterceptor.java @@ -32,9 +32,14 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor { ConfigPropertiesUtil.getBoolean( "otel.instrumentation.aws-sdk.experimental-span-attributes", false); + private static final boolean USE_MESSAGING_PROPAGATOR = + ConfigPropertiesUtil.getBoolean( + "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false); + private final ExecutionInterceptor delegate = AwsSdkTelemetry.builder(GlobalOpenTelemetry.get()) .setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) + .setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR) .build() .newExecutionInterceptor(); diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java index 67ffcaf3ad86..584588983057 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkInstrumenterFactory.java @@ -20,31 +20,43 @@ final class AwsSdkInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-2.2"; + static final AttributesExtractor rpcAttributesExtractor = + RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE); + private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor = + new AwsSdkExperimentalAttributesExtractor(); + static final AwsSdkHttpAttributesGetter httpAttributesGetter = new AwsSdkHttpAttributesGetter(); private static final AwsSdkNetAttributesGetter netAttributesGetter = new AwsSdkNetAttributesGetter(); static final AttributesExtractor httpAttributesExtractor = HttpClientAttributesExtractor.create(httpAttributesGetter, netAttributesGetter); - static final AttributesExtractor rpcAttributesExtractor = - RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE); - private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor = - new AwsSdkExperimentalAttributesExtractor(); + private static final AwsSdkSpanKindExtractor spanKindExtractor = new AwsSdkSpanKindExtractor(); private static final List> - defaultAttributesExtractors = Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor); + defaultAttributesExtractors = Arrays.asList(rpcAttributesExtractor); private static final List> extendedAttributesExtractors = + Arrays.asList(rpcAttributesExtractor, experimentalAttributesExtractor); + + private static final List> + defaultConsumerAttributesExtractors = + Arrays.asList(rpcAttributesExtractor, httpAttributesExtractor); + + private static final List> + extendedConsumerAttributesExtractors = Arrays.asList( - httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor); + rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor); static Instrumenter requestInstrumenter( OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { return createInstrumenter( openTelemetry, - captureExperimentalSpanAttributes, + captureExperimentalSpanAttributes + ? extendedAttributesExtractors + : defaultAttributesExtractors, AwsSdkInstrumenterFactory.spanKindExtractor); } @@ -52,20 +64,21 @@ static Instrumenter consumerInstrumenter( OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { return createInstrumenter( - openTelemetry, captureExperimentalSpanAttributes, SpanKindExtractor.alwaysConsumer()); + openTelemetry, + captureExperimentalSpanAttributes + ? extendedConsumerAttributesExtractors + : defaultConsumerAttributesExtractors, + SpanKindExtractor.alwaysConsumer()); } private static Instrumenter createInstrumenter( OpenTelemetry openTelemetry, - boolean captureExperimentalSpanAttributes, + List> extractors, SpanKindExtractor spanKindExtractor) { return Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, AwsSdkInstrumenterFactory::spanName) - .addAttributesExtractors( - captureExperimentalSpanAttributes - ? extendedAttributesExtractors - : defaultAttributesExtractors) + .addAttributesExtractors(extractors) .buildInstrumenter(spanKindExtractor); } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java index 5ed145a3c1d0..166c8ca36bee 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetry.java @@ -6,7 +6,9 @@ package io.opentelemetry.instrumentation.awssdk.v2_2; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import javax.annotation.Nullable; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; @@ -42,8 +44,15 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { private final Instrumenter requestInstrumenter; private final Instrumenter consumerInstrumenter; private final boolean captureExperimentalSpanAttributes; + @Nullable private final TextMapPropagator messagingPropagator; + private final boolean useXrayPropagator; - AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { + AwsSdkTelemetry( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean useMessagingPropagator, + boolean useXrayPropagator) { + this.useXrayPropagator = useXrayPropagator; this.requestInstrumenter = AwsSdkInstrumenterFactory.requestInstrumenter( openTelemetry, captureExperimentalSpanAttributes); @@ -51,6 +60,8 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { AwsSdkInstrumenterFactory.consumerInstrumenter( openTelemetry, captureExperimentalSpanAttributes); this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.messagingPropagator = + useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null; } /** @@ -59,6 +70,10 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) { */ public ExecutionInterceptor newExecutionInterceptor() { return new TracingExecutionInterceptor( - requestInstrumenter, consumerInstrumenter, captureExperimentalSpanAttributes); + requestInstrumenter, + consumerInstrumenter, + captureExperimentalSpanAttributes, + messagingPropagator, + useXrayPropagator); } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java index 56dfb39b4e9c..0418d950223a 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AwsSdkTelemetryBuilder.java @@ -15,6 +15,10 @@ public final class AwsSdkTelemetryBuilder { private boolean captureExperimentalSpanAttributes; + private boolean useMessagingPropagator; + + private boolean useXrayPropagator = true; + AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; } @@ -31,10 +35,50 @@ public AwsSdkTelemetryBuilder setCaptureExperimentalSpanAttributes( return this; } + /** + * Sets whether the {@link io.opentelemetry.context.propagation.TextMapPropagator} configured in + * the provided {@link OpenTelemetry} should be used to inject into supported messaging attributes + * (currently only SQS; SNS may follow). + * + *

In addition, the X-Ray propagator is always used. + * + *

Using the messaging propagator is needed if your tracing vendor requires special tracestate + * entries or legacy propagation information that cannot be transported via X-Ray headers. It may + * also be useful if you need to directly connect spans over messaging in your tracing backend, + * bypassing any intermediate spans/X-Ray segments that AWS may create in the delivery process. + * + *

This option is off by default. If enabled, on extraction the configured propagator will be + * preferred over X-Ray if it can extract anything. + */ + @CanIgnoreReturnValue + public AwsSdkTelemetryBuilder setUseConfiguredPropagatorForMessaging( + boolean useMessagingPropagator) { + this.useMessagingPropagator = useMessagingPropagator; + return this; + } + + /** + * This setter implemented package-private for testing the messaging propagator, it does not seem + * too useful in general. The option is on by default. + * + *

If this needs to be exposed for non-testing use cases, consider if you need to refine this + * feature so that it disable this only for requests supported by {@link + * #setUseConfiguredPropagatorForMessaging(boolean)} + */ + @CanIgnoreReturnValue + AwsSdkTelemetryBuilder setUseXrayPropagator(boolean useMessagingPropagator) { + this.useXrayPropagator = useMessagingPropagator; + return this; + } + /** * Returns a new {@link AwsSdkTelemetry} with the settings of this {@link AwsSdkTelemetryBuilder}. */ public AwsSdkTelemetry build() { - return new AwsSdkTelemetry(openTelemetry, captureExperimentalSpanAttributes); + return new AwsSdkTelemetry( + openTelemetry, + captureExperimentalSpanAttributes, + useMessagingPropagator, + useXrayPropagator); } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAccess.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAccess.java index 7e6058febc8e..e8925b0a50f8 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAccess.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAccess.java @@ -12,6 +12,7 @@ import java.util.Collections; import java.util.Map; import javax.annotation.Nullable; +import software.amazon.awssdk.core.SdkPojo; /** * Reflective access to aws-sdk-java-sqs class Message. @@ -21,10 +22,18 @@ * prevent the entire instrumentation from loading when the plugin isn't available. We need to * carefully check this class has all reflection errors result in no-op, and in the future we will * hopefully come up with a better pattern. + * + * @see SDK + * Javadoc + * @see Definition + * JSON */ final class SqsMessageAccess { @Nullable private static final MethodHandle GET_ATTRIBUTES; + @Nullable private static final MethodHandle GET_MESSAGE_ATTRIBUTES; static { Class messageClass = null; @@ -43,8 +52,18 @@ final class SqsMessageAccess { // Ignore } GET_ATTRIBUTES = getAttributes; + + MethodHandle getMessageAttributes = null; + try { + getMessageAttributes = + lookup.findVirtual(messageClass, "messageAttributes", methodType(Map.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + GET_MESSAGE_ATTRIBUTES = getMessageAttributes; } else { GET_ATTRIBUTES = null; + GET_MESSAGE_ATTRIBUTES = null; } } @@ -61,4 +80,16 @@ static Map getAttributes(Object message) { } private SqsMessageAccess() {} + + @SuppressWarnings("unchecked") + public static Map getMessageAttributes(Object message) { + if (GET_MESSAGE_ATTRIBUTES == null) { + return Collections.emptyMap(); + } + try { + return (Map) GET_MESSAGE_ATTRIBUTES.invoke(message); + } catch (Throwable t) { + return Collections.emptyMap(); + } + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAttributeValueAccess.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAttributeValueAccess.java new file mode 100644 index 000000000000..9277a5b3e7cf --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageAttributeValueAccess.java @@ -0,0 +1,161 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static java.lang.invoke.MethodType.methodType; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import javax.annotation.Nullable; +import software.amazon.awssdk.core.SdkPojo; +import software.amazon.awssdk.utils.builder.SdkBuilder; + +/** + * Reflective access to aws-sdk-java-sqs class Message. + * + *

We currently don't have a good pattern of instrumenting a core library with various plugins + * that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would + * prevent the entire instrumentation from loading when the plugin isn't available. We need to + * carefully check this class has all reflection errors result in no-op, and in the future we will + * hopefully come up with a better pattern. + * + * @see SDK + * Javadoc + * @see Definition + * JSON + */ +final class SqsMessageAttributeValueAccess { + + @Nullable private static final MethodHandle GET_STRING_VALUE; + @Nullable private static final MethodHandle STRING_VALUE; + @Nullable private static final MethodHandle DATA_TYPE; + + @Nullable private static final MethodHandle BUILDER; + + static { + Class messageAttributeValueClass = null; + try { + messageAttributeValueClass = + Class.forName("software.amazon.awssdk.services.sqs.model.MessageAttributeValue"); + } catch (Throwable t) { + // Ignore. + } + if (messageAttributeValueClass != null) { + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + MethodHandle getStringValue = null; + try { + getStringValue = + lookup.findVirtual(messageAttributeValueClass, "stringValue", methodType(String.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + GET_STRING_VALUE = getStringValue; + } else { + GET_STRING_VALUE = null; + } + + Class builderClass = null; + if (messageAttributeValueClass != null) { + try { + builderClass = + Class.forName( + "software.amazon.awssdk.services.sqs.model.MessageAttributeValue$Builder"); + } catch (Throwable t) { + // Ignore. + } + } + if (builderClass != null) { + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + MethodHandle stringValue = null; + try { + stringValue = + lookup.findVirtual(builderClass, "stringValue", methodType(builderClass, String.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + STRING_VALUE = stringValue; + + MethodHandle dataType = null; + try { + dataType = + lookup.findVirtual(builderClass, "dataType", methodType(builderClass, String.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + DATA_TYPE = dataType; + + MethodHandle builder = null; + try { + builder = + lookup.findStatic(messageAttributeValueClass, "builder", methodType(builderClass)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + BUILDER = builder; + } else { + STRING_VALUE = null; + DATA_TYPE = null; + BUILDER = null; + } + } + + @SuppressWarnings({"rawtypes"}) + static String getStringValue(SdkPojo messageAttributeValue) { + if (GET_STRING_VALUE == null) { + return null; + } + try { + return (String) GET_STRING_VALUE.invoke(messageAttributeValue); + } catch (Throwable t) { + return null; + } + } + + /** + * Note that this does not set the (required) dataType automatically, see {@link + * #dataType(SdkBuilder, String)} * + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + static SdkBuilder stringValue(SdkBuilder builder, String value) { + if (STRING_VALUE == null) { + return null; + } + try { + return (SdkBuilder) STRING_VALUE.invoke(builder, value); + } catch (Throwable t) { + return null; + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + static SdkBuilder dataType(SdkBuilder builder, String dataType) { + if (DATA_TYPE == null) { + return null; + } + try { + return (SdkBuilder) DATA_TYPE.invoke(builder, dataType); + } catch (Throwable t) { + return null; + } + } + + private SqsMessageAttributeValueAccess() {} + + @SuppressWarnings({"rawtypes"}) + public static SdkBuilder builder() { + if (BUILDER == null) { + return null; + } + + try { + return (SdkBuilder) BUILDER.invoke(); + } catch (Throwable e) { + return null; + } + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsParentContext.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsParentContext.java index fcf73cd889a0..9df3443c660e 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsParentContext.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsParentContext.java @@ -7,13 +7,15 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; import java.util.Collections; import java.util.Map; +import software.amazon.awssdk.core.SdkPojo; final class SqsParentContext { - enum MapGetter implements TextMapGetter> { + enum StringMapGetter implements TextMapGetter> { INSTANCE; @Override @@ -27,15 +29,42 @@ public String get(Map map, String s) { } } + enum MessageAttributeValueMapGetter implements TextMapGetter> { + INSTANCE; + + @Override + public Iterable keys(Map map) { + return map.keySet(); + } + + @Override + public String get(Map map, String s) { + if (map == null) { + return null; + } + SdkPojo value = map.get(s); + if (value == null) { + return null; + } + return SqsMessageAttributeValueAccess.getStringValue(value); + } + } + static final String AWS_TRACE_SYSTEM_ATTRIBUTE = "AWSTraceHeader"; + static Context ofMessageAttributes( + Map messageAttributes, TextMapPropagator propagator) { + return propagator.extract( + Context.root(), messageAttributes, MessageAttributeValueMapGetter.INSTANCE); + } + static Context ofSystemAttributes(Map systemAttributes) { String traceHeader = systemAttributes.get(AWS_TRACE_SYSTEM_ATTRIBUTE); return AwsXrayPropagator.getInstance() .extract( Context.root(), Collections.singletonMap("X-Amzn-Trace-Id", traceHeader), - MapGetter.INSTANCE); + StringMapGetter.INSTANCE); } private SqsParentContext() {} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveMessageRequestAccess.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveMessageRequestAccess.java index 5b1be4f93d71..b6ac23ded0fe 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveMessageRequestAccess.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsReceiveMessageRequestAccess.java @@ -10,7 +10,9 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Optional; import javax.annotation.Nullable; import software.amazon.awssdk.core.SdkRequest; @@ -22,10 +24,18 @@ * prevent the entire instrumentation from loading when the plugin isn't available. We need to * carefully check this class has all reflection errors result in no-op, and in the future we will * hopefully come up with a better pattern. + * + * @see SDK + * Javadoc + * @see Definition + * JSON */ final class SqsReceiveMessageRequestAccess { @Nullable private static final MethodHandle ATTRIBUTE_NAMES_WITH_STRINGS; + @Nullable private static final MethodHandle MESSAGE_ATTRIBUTE_NAMES; static { Class receiveMessageRequestClass = null; @@ -48,8 +58,21 @@ final class SqsReceiveMessageRequestAccess { // Ignore } ATTRIBUTE_NAMES_WITH_STRINGS = withAttributeNames; + + MethodHandle messageAttributeNames = null; + try { + messageAttributeNames = + lookup.findVirtual( + receiveMessageRequestClass, + "messageAttributeNames", + methodType(receiveMessageRequestClass, Collection.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + MESSAGE_ATTRIBUTE_NAMES = messageAttributeNames; } else { ATTRIBUTE_NAMES_WITH_STRINGS = null; + MESSAGE_ATTRIBUTE_NAMES = null; } } @@ -71,5 +94,29 @@ static void attributeNamesWithStrings(SdkRequest.Builder builder, List a } } + static void messageAttributeNames( + SdkRequest.Builder builder, List messageAttributeNames) { + if (MESSAGE_ATTRIBUTE_NAMES == null) { + return; + } + try { + MESSAGE_ATTRIBUTE_NAMES.invoke(builder, messageAttributeNames); + } catch (Throwable throwable) { + // Ignore + } + } + private SqsReceiveMessageRequestAccess() {} + + @SuppressWarnings({"rawtypes", "unchecked"}) + static List getAttributeNames(SdkRequest request) { + Optional optional = request.getValueForField("AttributeNames", List.class); + return optional.isPresent() ? (List) optional.get() : Collections.emptyList(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + static List getMessageAttributeNames(SdkRequest request) { + Optional optional = request.getValueForField("MessageAttributeNames", List.class); + return optional.isPresent() ? (List) optional.get() : Collections.emptyList(); + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsSendMessageRequestAccess.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsSendMessageRequestAccess.java new file mode 100644 index 000000000000..3a1142144334 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsSendMessageRequestAccess.java @@ -0,0 +1,91 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static java.lang.invoke.MethodType.methodType; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import software.amazon.awssdk.core.SdkPojo; +import software.amazon.awssdk.core.SdkRequest; + +/** + * Reflective access to aws-sdk-java-sqs class ReceiveMessageRequest. + * + *

We currently don't have a good pattern of instrumenting a core library with various plugins + * that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would + * prevent the entire instrumentation from loading when the plugin isn't available. We need to + * carefully check this class has all reflection errors result in no-op, and in the future we will + * hopefully come up with a better pattern. + * + * @see SDK + * Javadoc + * @see Definition + * JSON + */ +final class SqsSendMessageRequestAccess { + + @Nullable private static final MethodHandle MESSAGE_ATTRIBUTES; + + static { + Class sendMessageRequestClass = null; + try { + sendMessageRequestClass = + Class.forName("software.amazon.awssdk.services.sqs.model.SendMessageRequest$Builder"); + } catch (Throwable t) { + // Ignore. + } + if (sendMessageRequestClass != null) { + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + MethodHandle messageAttributes = null; + try { + messageAttributes = + lookup.findVirtual( + sendMessageRequestClass, + "messageAttributes", + methodType(sendMessageRequestClass, Map.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + // Ignore + } + MESSAGE_ATTRIBUTES = messageAttributes; + } else { + MESSAGE_ATTRIBUTES = null; + } + } + + static boolean isInstance(SdkRequest request) { + return request + .getClass() + .getName() + .equals("software.amazon.awssdk.services.sqs.model.SendMessageRequest"); + } + + static void messageAttributes( + SdkRequest.Builder builder, Map messageAttributes) { + if (MESSAGE_ATTRIBUTES == null) { + return; + } + try { + MESSAGE_ATTRIBUTES.invoke(builder, messageAttributes); + } catch (Throwable throwable) { + // Ignore + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + static Map messageAttributes(SdkRequest request) { + Optional optional = request.getValueForField("AttributeNames", Map.class); + return optional.isPresent() ? (Map) optional.get() : Collections.emptyMap(); + } + + private SqsSendMessageRequestAccess() {} +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java index dc76480a9ba6..8373b1e05df0 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java @@ -7,17 +7,24 @@ import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.DYNAMODB; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import javax.annotation.Nullable; import software.amazon.awssdk.awscore.AwsResponse; import software.amazon.awssdk.core.ClientType; +import software.amazon.awssdk.core.SdkPojo; import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.interceptor.Context; @@ -27,6 +34,7 @@ import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.utils.builder.SdkBuilder; /** AWS request execution interceptor. */ final class TracingExecutionInterceptor implements ExecutionInterceptor { @@ -47,29 +55,38 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor { private final Instrumenter requestInstrumenter; private final Instrumenter consumerInstrumenter; private final boolean captureExperimentalSpanAttributes; + @Nullable private final TextMapPropagator messagingPropagator; + private final boolean useXrayPropagator; private final FieldMapper fieldMapper; TracingExecutionInterceptor( Instrumenter requestInstrumenter, Instrumenter consumerInstrumenter, - boolean captureExperimentalSpanAttributes) { + boolean captureExperimentalSpanAttributes, + TextMapPropagator messagingPropagator, + boolean useXrayPropagator) { this.requestInstrumenter = requestInstrumenter; this.consumerInstrumenter = consumerInstrumenter; this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.messagingPropagator = messagingPropagator; + this.useXrayPropagator = useXrayPropagator; this.fieldMapper = new FieldMapper(); } @Override - public void afterMarshalling( - Context.AfterMarshalling context, ExecutionAttributes executionAttributes) { + public SdkRequest modifyRequest( + Context.ModifyRequest context, ExecutionAttributes executionAttributes) { + + // This is the latest point where we can start the span, since we might need to inject + // it into the request payload. This means that HTTP attributes need to be captured later. io.opentelemetry.context.Context parentOtelContext = io.opentelemetry.context.Context.current(); - executionAttributes.putAttribute(SDK_REQUEST_ATTRIBUTE, context.request()); - SdkHttpRequest httpRequest = context.httpRequest(); - executionAttributes.putAttribute(SDK_HTTP_REQUEST_ATTRIBUTE, httpRequest); + SdkRequest request = context.request(); + executionAttributes.putAttribute(SDK_REQUEST_ATTRIBUTE, request); if (!requestInstrumenter.shouldStart(parentOtelContext, executionAttributes)) { - return; + // NB: We also skip injection in case we don't start. + return request; } io.opentelemetry.context.Context otelContext = @@ -96,38 +113,151 @@ public void afterMarshalling( clearAttributes(executionAttributes); throw throwable; } - } - @Override - public SdkRequest modifyRequest( - Context.ModifyRequest context, ExecutionAttributes executionAttributes) { - SdkRequest request = context.request(); if (SqsReceiveMessageRequestAccess.isInstance(request)) { - List existingAttributeNames = getAttributeNames(request); - if (!existingAttributeNames.contains(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE)) { - List attributeNames = new ArrayList<>(); - attributeNames.addAll(existingAttributeNames); - attributeNames.add(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE); - SdkRequest.Builder builder = request.toBuilder(); - SqsReceiveMessageRequestAccess.attributeNamesWithStrings(builder, attributeNames); - return builder.build(); + return modifySqsReceiveMessageRequest(request); + } else if (messagingPropagator != null) { + if (SqsSendMessageRequestAccess.isInstance(request)) { + return injectIntoSqsSendMessageRequest(request, otelContext); } + // TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry) } return request; } - @SuppressWarnings({"rawtypes", "unchecked"}) - private static List getAttributeNames(SdkRequest request) { - Optional optional = request.getValueForField("AttributeNames", List.class); - return optional.isPresent() ? (List) optional.get() : Collections.emptyList(); + private SdkRequest injectIntoSqsSendMessageRequest( + SdkRequest request, io.opentelemetry.context.Context otelContext) { + Map messageAttributes = + new HashMap<>(SqsSendMessageRequestAccess.messageAttributes(request)); + messagingPropagator.inject( + otelContext, + messageAttributes, + (carrier, k, v) -> { + @SuppressWarnings("rawtypes") + SdkBuilder builder = SqsMessageAttributeValueAccess.builder(); + if (builder == null) { + return; + } + builder = SqsMessageAttributeValueAccess.stringValue(builder, v); + if (builder == null) { + return; + } + builder = SqsMessageAttributeValueAccess.dataType(builder, "String"); + if (builder == null) { + return; + } + carrier.put(k, (SdkPojo) builder.build()); + }); + if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call. + return request; + } + SdkRequest.Builder builder = request.toBuilder(); + SqsSendMessageRequestAccess.messageAttributes(builder, messageAttributes); + return builder.build(); + } + + private SdkRequest modifySqsReceiveMessageRequest(SdkRequest request) { + boolean hasXrayAttribute = true; + List existingAttributeNames = null; + if (useXrayPropagator) { + existingAttributeNames = SqsReceiveMessageRequestAccess.getAttributeNames(request); + hasXrayAttribute = + existingAttributeNames.contains(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE); + } + + boolean hasMessageAttribute = true; + List existingMessageAttributeNames = null; + if (messagingPropagator != null) { + existingMessageAttributeNames = + SqsReceiveMessageRequestAccess.getMessageAttributeNames(request); + hasMessageAttribute = existingMessageAttributeNames.containsAll(messagingPropagator.fields()); + } + + if (hasMessageAttribute && hasXrayAttribute) { + return request; + } + + SdkRequest.Builder builder = request.toBuilder(); + if (!hasXrayAttribute) { + List attributeNames = new ArrayList<>(existingAttributeNames); + attributeNames.add(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE); + SqsReceiveMessageRequestAccess.attributeNamesWithStrings(builder, attributeNames); + } + if (messagingPropagator != null) { + List messageAttributeNames = new ArrayList<>(existingMessageAttributeNames); + for (String field : messagingPropagator.fields()) { + if (!existingMessageAttributeNames.contains(field)) { + messageAttributeNames.add(field); + } + } + SqsReceiveMessageRequestAccess.messageAttributeNames(builder, messageAttributeNames); + } + return builder.build(); + } + + @Override + public void afterMarshalling( + Context.AfterMarshalling context, ExecutionAttributes executionAttributes) { + + // Since we merge the HTTP attributes into an already started span instead of creating a + // full child span, we have to do some dirty work here. + // + // As per HTTP conventions, we should actually only create spans for the "physical" requests but + // not for the encompassing logical request, see + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/semantic_conventions/http.md#http-request-retries-and-redirects + // Specific AWS SDK conventions also don't mention this peculiar hybrid span convention, see + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/semantic_conventions/instrumentation/aws-sdk.md + // + // TODO: Consider removing net+http conventions & relying on lower-level client instrumentation + + io.opentelemetry.context.Context otelContext = getContext(executionAttributes); + if (otelContext == null) { + // No context, no sense in doing anything else (but this is not expected) + return; + } + + SdkHttpRequest httpRequest = context.httpRequest(); + executionAttributes.putAttribute(SDK_HTTP_REQUEST_ATTRIBUTE, httpRequest); + + // We ought to pass the parent of otelContext here, but we didn't store it, and it shouldn't + // make a difference (unless we start supporting the http.resend_count attribute in this + // instrumentation, which, logically, we can't on this level of abstraction) + onHttpRequestAvailable(executionAttributes, otelContext, Span.fromContext(otelContext)); + } + + private static void onHttpResponseAvailable( + ExecutionAttributes executionAttributes, + io.opentelemetry.context.Context otelContext, + Span span, + SdkHttpResponse httpResponse) { + // For the httpAttributesExtractor dance, see afterMarshalling + AttributesBuilder builder = Attributes.builder(); // NB: UnsafeAttributes are package-private + AwsSdkInstrumenterFactory.httpAttributesExtractor.onEnd( + builder, otelContext, executionAttributes, httpResponse, null); + span.setAllAttributes(builder.build()); + } + + private static void onHttpRequestAvailable( + ExecutionAttributes executionAttributes, + io.opentelemetry.context.Context parentContext, + Span span) { + AttributesBuilder builder = Attributes.builder(); // NB: UnsafeAttributes are package-private + AwsSdkInstrumenterFactory.httpAttributesExtractor.onStart( + builder, parentContext, executionAttributes); + span.setAllAttributes(builder.build()); } @Override @SuppressWarnings("deprecation") // deprecated class to be updated once published in new location public SdkHttpRequest modifyHttpRequest( Context.ModifyHttpRequest context, ExecutionAttributes executionAttributes) { + SdkHttpRequest httpRequest = context.httpRequest(); + if (!useXrayPropagator) { + return httpRequest; + } + io.opentelemetry.context.Context otelContext = getContext(executionAttributes); if (otelContext == null) { return httpRequest; @@ -170,7 +300,12 @@ public void afterExecution( Span span = Span.fromContext(otelContext); onUserAgentHeaderAvailable(span, executionAttributes); onSdkResponse(span, context.response(), executionAttributes); - requestInstrumenter.end(otelContext, executionAttributes, context.httpResponse(), null); + + SdkHttpResponse httpResponse = context.httpResponse(); + + onHttpResponseAvailable( + executionAttributes, otelContext, Span.fromContext(otelContext), httpResponse); + requestInstrumenter.end(otelContext, executionAttributes, httpResponse, null); } clearAttributes(executionAttributes); } @@ -184,19 +319,28 @@ private void afterConsumerResponse( } } - @SuppressWarnings({"rawtypes", "unchecked"}) - private static List getMessages(SdkResponse response) { - Optional optional = response.getValueForField("Messages", List.class); - return optional.isPresent() ? optional.get() : Collections.emptyList(); - } - private void createConsumerSpan( Object message, ExecutionAttributes executionAttributes, SdkHttpResponse httpResponse) { - io.opentelemetry.context.Context parentContext = - SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message)); + + io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root(); + + if (messagingPropagator != null) { + parentContext = + SqsParentContext.ofMessageAttributes( + SqsMessageAccess.getMessageAttributes(message), messagingPropagator); + } + + if (useXrayPropagator && parentContext == io.opentelemetry.context.Context.root()) { + parentContext = SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message)); + } if (consumerInstrumenter.shouldStart(parentContext, executionAttributes)) { io.opentelemetry.context.Context context = consumerInstrumenter.start(parentContext, executionAttributes); + + // TODO: Even if we keep HTTP attributes (see afterMarshalling), does it make sense here + // per-message? + // TODO: Should we really create root spans if we can't extract anything, or should we attach + // to the current context? consumerInstrumenter.end(context, executionAttributes, httpResponse, null); } } @@ -250,4 +394,10 @@ private static void clearAttributes(ExecutionAttributes executionAttributes) { static io.opentelemetry.context.Context getContext(ExecutionAttributes attributes) { return attributes.getAttribute(CONTEXT_ATTRIBUTE); } + + @SuppressWarnings({"rawtypes", "unchecked"}) + static List getMessages(SdkResponse response) { + Optional optional = response.getValueForField("Messages", List.class); + return optional.isPresent() ? optional.get() : Collections.emptyList(); + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy new file mode 100644 index 000000000000..4099b0ae91a6 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2 + +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration + +class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest implements LibraryTestTrait { + @Override + ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() { + return ClientOverrideConfiguration.builder() + .addExecutionInterceptor( + AwsSdkTelemetry.builder(getOpenTelemetry()) + .setCaptureExperimentalSpanAttributes(true) + .setUseConfiguredPropagatorForMessaging(true) // Difference to main test + .setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works + .build() + .newExecutionInterceptor()) + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy new file mode 100644 index 000000000000..a4af1e1b56d9 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2 + +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration + +/** We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable W3C. */ +class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2SqsTracingTest implements LibraryTestTrait { + @Override + ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() { + return ClientOverrideConfiguration.builder() + .addExecutionInterceptor( + AwsSdkTelemetry.builder(getOpenTelemetry()) + .setCaptureExperimentalSpanAttributes(true) + .setUseConfiguredPropagatorForMessaging(true) // Difference to main test + .build() + .newExecutionInterceptor()) + } +}