diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/README.md b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/README.md index c200496bbdf0..cb6891804eb5 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/README.md +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/README.md @@ -132,3 +132,27 @@ In order to enable requested propagation for a handler, configure it on the SDK ``` If using the wrappers, set the `OTEL_PROPAGATORS` environment variable as described [here](https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#propagator). + +## Using SqsMessageHandler +This instrumentation takes a collection of SQS messages. +A span wraps the function call doHandle with appropriate span attributes and span links. +Span links are added for each of the messages as if this were a batch of messages. + +1. Setup SqsMessageHandler with your business logic. Pass in your OpenTelemetry and the name of the destination. +2. Call the "handle" method on SqsMessageHandler and pass in your collection of messages. +3. Under the hood it will call the "doHandle" method. + +```java +OpenTelemetry openTelemetry; +Collection messages; + +SqsMessageHandler messageHandler = + new SqsMessageHandler(openTelemetry, "destination") { + @Override + public void doHandle(Collection messages) { + // My business logic + } +}; + +messageHandler.handle(messages); +``` diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts index 02899f461ec1..5612078509a7 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts @@ -34,6 +34,7 @@ dependencies { testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators") testImplementation("com.google.guava:guava") + implementation(project(":instrumentation:message-handler:message-handler-1.0:library")) testImplementation(project(":instrumentation:aws-lambda:aws-lambda-events-2.2:testing")) testImplementation("uk.org.webcompere:system-stubs-jupiter") } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/SqsMessageHandler.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/SqsMessageHandler.java new file mode 100644 index 000000000000..de1c5d409f41 --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/SqsMessageHandler.java @@ -0,0 +1,190 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awslambdaevents.v2_2; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.messagehandler.MessageHandler; +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; + +public abstract class SqsMessageHandler implements MessageHandler { + private static final String AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY = "AWSTraceHeader"; + static final String AWS_TRACE_HEADER_PROPAGATOR_KEY = "x-amzn-trace-id"; + + private final OpenTelemetry openTelemetry; + private final String destination; + private SpanKindExtractor> spanKindExtractor; + private SpanNameExtractor> spanNameExtractor; + + public SqsMessageHandler(OpenTelemetry openTelemetry, String destination) { + this.openTelemetry = openTelemetry; + this.destination = destination; + this.spanKindExtractor = SpanKindExtractor.alwaysConsumer(); + spanNameExtractor = e -> destination + " process"; + } + + public void setSpanNameExtactor(SpanNameExtractor> spanNameExtractor) { + this.spanNameExtractor = spanNameExtractor; + } + + @Override + public Instrumenter, Void> getMessageInstrumenter() { + return Instrumenter., Void>builder( + openTelemetry, "io.opentelemetry.aws-lambda-events-2.2", spanNameExtractor) + .addAttributesExtractor(getMessageOperationAttributeExtractor()) + .addSpanLinksExtractor(getSpanLinksExtractor()) + .buildInstrumenter(spanKindExtractor); + } + + public void setSpanKindExtractor(SpanKindExtractor> spanKindExtractor) { + this.spanKindExtractor = spanKindExtractor; + } + + protected MessagingAttributesGetter, Void> + getMessageingAttributesGetter() { + String destination = this.destination; + + return new MessagingAttributesGetter, Void>() { + @Nullable + @Override + public String getSystem(Collection v) { + return "AmazonSQS"; + } + + @Nullable + @Override + public String getDestinationKind(Collection v) { + return null; + } + + @Nullable + @Override + public String getDestination(Collection v) { + return destination; + } + + @Override + public boolean isTemporaryDestination(Collection v) { + return false; + } + + @Nullable + @Override + public String getConversationId(Collection v) { + return null; + } + + @Nullable + @Override + public Long getMessagePayloadSize(Collection v) { + long total = 0; + + for (SQSEvent.SQSMessage message : v) { + total += getPayloadSize(message); + } + + return total; + } + + @Nullable + @Override + public Long getMessagePayloadCompressedSize(Collection v) { + return null; + } + + @Nullable + @Override + public String getMessageId(Collection request, Void v) { + return null; + } + }; + } + + protected AttributesExtractor, Void> getMessageOperationAttributeExtractor() { + return MessagingAttributesExtractor.create( + getMessageingAttributesGetter(), MessageOperation.PROCESS); + } + + protected SpanLinksExtractor> getSpanLinksExtractor() { + return (spanLinks, parentContext, request) -> { + for (SQSEvent.SQSMessage message : request) { + SpanContext messageSpanCtx = getUpstreamContext(openTelemetry, message); + + if (messageSpanCtx!= null && messageSpanCtx.isValid()) { + spanLinks.addLink(messageSpanCtx); + } + } + }; + } + + public int getPayloadSize(SQSEvent.SQSMessage message) { + return message.getBody().length(); + } + + public SpanContext getUpstreamContext(OpenTelemetry openTelemetry, SQSEvent.SQSMessage message) { + String parentHeader = null; + + if (message.getAttributes() != null) { + parentHeader = message.getAttributes().get(AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY); + } + + if (parentHeader == null && + message.getMessageAttributes() != null) + { + // We need to do a case-insensitive search + for (Map.Entry entry: message.getMessageAttributes().entrySet()) { + if (entry.getKey().equalsIgnoreCase(AWS_TRACE_HEADER_PROPAGATOR_KEY)) { + parentHeader = entry.getValue().getStringValue(); + break; + } + } + } + + if (parentHeader != null) { + Context xrayContext = + AwsXrayPropagator.getInstance() + .extract( + Context.root(), + Collections.singletonMap(AWS_TRACE_HEADER_PROPAGATOR_KEY, parentHeader), + MapGetter.INSTANCE); + + return Span.fromContext(xrayContext).getSpanContext(); + } + + return null; + } + + private enum MapGetter implements TextMapGetter> { + INSTANCE; + + @Override + public Iterable keys(Map map) { + return map.keySet(); + } + + @Override + public String get(Map map, String s) { + return map.get(s.toLowerCase(Locale.ROOT)); + } + } +} diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/SqsMessageHandlerTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/SqsMessageHandlerTest.java new file mode 100644 index 000000000000..c78c6c69af37 --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/SqsMessageHandlerTest.java @@ -0,0 +1,410 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awslambdaevents.v2_2; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.junit.jupiter.api.Test; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SqsMessageHandlerTest extends XrayTestInstrumenter { + private static class SqsMessageHandlerImpl extends SqsMessageHandler { + public final AtomicInteger handleCalls = new AtomicInteger(); + + public SqsMessageHandlerImpl(OpenTelemetry openTelemetry, + String destination) { + super(openTelemetry, destination); + } + + @Override + public void doHandle(Collection request) { + handleCalls.getAndIncrement(); + } + } + + @Test + public void simple() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + List messages = new LinkedList<>(); + SQSEvent.SQSMessage message = newMessage(); + message.setBody("Hello"); + Map attributes = new TreeMap<>(); + attributes.put("AWSTraceHeader", "Root=1-99555555-123456789012345678901234;Parent=9934567890123456;Sampled=1"); + message.setAttributes(attributes); + messages.add(message); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "99555555123456789012345678901234", + "9934567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void simpleUnsampled() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + List messages = new LinkedList<>(); + SQSEvent.SQSMessage message = newMessage(); + message.setBody("Hello"); + Map attributes = new TreeMap<>(); + attributes.put("AWSTraceHeader", "Root=1-99555555-123456789012345678901234;Parent=9934567890123456;Sampled=0"); + message.setAttributes(attributes); + messages.add(message); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "99555555123456789012345678901234", + "9934567890123456", + TraceFlags.getDefault(), + TraceState.getDefault()))) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void simpleUseMessageAttribute() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + List messages = new LinkedList<>(); + SQSEvent.SQSMessage message = newMessage(); + message.setBody("Hello"); + Map attributes = new TreeMap<>(); + SQSEvent.MessageAttribute value = new SQSEvent.MessageAttribute(); + value.setDataType("String"); + value.setStringValue("Root=1-99555555-123456789012345678901234;Parent=9934567890123456;Sampled=1"); + attributes.put("X-Amzn-Trace-Id", value); + message.setMessageAttributes(attributes); + messages.add(message); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "99555555123456789012345678901234", + "9934567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void twoMessages() { + List messages = new LinkedList<>(); + SQSEvent.SQSMessage message = newMessage(); + message.setBody("Hello"); + Map attributes = new TreeMap<>(); + attributes.put("AWSTraceHeader", "Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=1"); + message.setAttributes(attributes); + messages.add(message); + + SQSEvent.SQSMessage message2 = newMessage(); + message2.setBody("Hello World"); + Map attributes2 = new TreeMap<>(); + attributes2.put("AWSTraceHeader", "Root=1-66555555-123456789012345678901234;Parent=6634567890123456;Sampled=1"); + message2.setAttributes(attributes2); + messages.add(message2); + + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + messageHandler.setSpanKindExtractor(SpanKindExtractor.alwaysServer()); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.SERVER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getSampled(), + TraceState.getDefault())), + LinkData.create( + SpanContext.createFromRemoteParent( + "66555555123456789012345678901234", + "6634567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(2) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 16L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void twoRuns() { + List messages1 = new LinkedList<>(); + List messages2 = new LinkedList<>(); + SQSEvent.SQSMessage message = newMessage(); + message.setBody("Hello"); + Map attributes = new TreeMap<>(); + attributes.put("AWSTraceHeader", "Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=1"); + message.setAttributes(attributes); + messages1.add(message); + + SQSEvent.SQSMessage message2 = newMessage(); + message2.setBody("SecondMessage"); + Map attributes2 = new TreeMap<>(); + attributes2.put("AWSTraceHeader", "Root=1-77555555-123456789012345678901234;Parent=7734567890123456;Sampled=1"); + message2.setAttributes(attributes2); + messages2.add(message2); + + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + Span parentSpan1 = getOpenTelemetry().getTracer("test").spanBuilder("test1").startSpan(); + try (Scope scope = parentSpan1.makeCurrent()) { + messageHandler.handle(messages1); + } + parentSpan1.end(); + + Span parentSpan2 = getOpenTelemetry().getTracer("test").spanBuilder("test2").startSpan(); + try (Scope scope = parentSpan2.makeCurrent()) { + messageHandler.handle(messages2); + } + parentSpan2.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(2); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test1").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan1.getSpanContext().getSpanId()) + .hasTraceId(parentSpan1.getSpanContext().getTraceId())), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test2").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "77555555123456789012345678901234", + "7734567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 13L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan2.getSpanContext().getSpanId()) + .hasTraceId(parentSpan2.getSpanContext().getTraceId()))); + } + + @Test + public void noMessages() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(new LinkedList<>()); + } + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasTotalRecordedLinks(0) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 0L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void malformedTraceId() { + List messages = new LinkedList<>(); + SQSEvent.SQSMessage message = newMessage(); + message.setBody("Hello"); + Map attributes = new TreeMap<>(); + attributes.put("AWSTraceHeader", "Root=1-55555555-error;Parent=1234567890123456;Sampled=1"); + message.setAttributes(attributes); + messages.add(message); + + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + assertThrows( + RuntimeException.class, + () -> { + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + }); + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(0); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0))); + } + + private static SQSEvent.SQSMessage newMessage() { + try { + Constructor ctor = SQSEvent.SQSMessage.class.getDeclaredConstructor(); + ctor.setAccessible(true); + return ctor.newInstance(); + } catch (Throwable t) { + throw new AssertionError(t); + } + } +} diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/XrayTestInstrumenter.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/XrayTestInstrumenter.java new file mode 100644 index 000000000000..8718a740ac6e --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/XrayTestInstrumenter.java @@ -0,0 +1,160 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awslambdaevents.v2_2; + +import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; +import static org.awaitility.Awaitility.await; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; +import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import io.opentelemetry.sdk.testing.assertj.TracesAssert; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.AfterEach; + +public class XrayTestInstrumenter { + private static final OpenTelemetrySdk openTelemetry; + private static final InMemorySpanExporter testSpanExporter; + + static { + testSpanExporter = InMemorySpanExporter.create(); + InMemoryMetricExporter testMetricExporter = + InMemoryMetricExporter.create(AggregationTemporality.DELTA); + + MetricReader metricReader = + PeriodicMetricReader.builder(testMetricExporter) + // Set really long interval. We'll call forceFlush when we need the metrics + // instead of collecting them periodically. + .setInterval(Duration.ofNanos(Long.MAX_VALUE)) + .build(); + + Resource resource = Resource.getDefault().merge(Resource.create( + Attributes.of(SERVICE_NAME, "MyServiceName"))); + + openTelemetry = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(new FlushTrackingSpanProcessor()) + .addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)) + .build()) + .setMeterProvider(SdkMeterProvider.builder().registerMetricReader(metricReader).build()) + .setPropagators(ContextPropagators.create(AwsXrayPropagator.getInstance())) + .buildAndRegisterGlobal(); + } + + @AfterEach + public void resetTests() { + testSpanExporter.reset(); + } + + protected OpenTelemetrySdk getOpenTelemetry() { + return openTelemetry; + } + + private static class FlushTrackingSpanProcessor implements SpanProcessor { + @Override + public void onStart(Context parentContext, ReadWriteSpan span) {} + + @Override + public boolean isStartRequired() { + return false; + } + + @Override + public void onEnd(ReadableSpan span) {} + + @Override + public boolean isEndRequired() { + return false; + } + + @Override + public CompletableResultCode forceFlush() { + return CompletableResultCode.ofSuccess(); + } + } + + @SafeVarargs + @SuppressWarnings("varargs") + public final void waitAndAssertTraces(Consumer... assertions) { + waitAndAssertTraces(null, Arrays.asList(assertions), true); + } + + private > void waitAndAssertTraces( + @Nullable Comparator> traceComparator, + Iterable assertions, + boolean verifyScopeVersion) { + List> assertionsList = new ArrayList<>(); + assertions.forEach(assertionsList::add); + + try { + await() + .untilAsserted(() -> doAssertTraces(traceComparator, assertionsList, verifyScopeVersion)); + } catch (ConditionTimeoutException e) { + // Don't throw this failure since the stack is the awaitility thread, causing confusion. + // Instead, just assert one more time on the test thread, which will fail with a better stack + // trace. + // TODO(anuraaga): There is probably a better way to do this. + doAssertTraces(traceComparator, assertionsList, verifyScopeVersion); + } + } + + private void doAssertTraces( + @Nullable Comparator> traceComparator, + List> assertionsList, + boolean verifyScopeVersion) { + List> traces = waitForTraces(assertionsList.size()); + if (verifyScopeVersion) { + TelemetryDataUtil.assertScopeVersion(traces); + } + if (traceComparator != null) { + traces.sort(traceComparator); + } + TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList); + } + + public final List> waitForTraces(int numberOfTraces) { + try { + return TelemetryDataUtil.waitForTraces( + this::getExportedSpans, numberOfTraces, 20, TimeUnit.SECONDS); + } catch (TimeoutException | InterruptedException e) { + throw new AssertionError("Error waiting for " + numberOfTraces + " traces", e); + } + } + + public List getExportedSpans() { + return testSpanExporter.getFinishedSpanItems(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/README.md b/instrumentation/aws-sdk/aws-sdk-2.2/library/README.md index 7e473fee406b..68432d08a552 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/README.md +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/README.md @@ -24,3 +24,27 @@ This format is the only format recognized by AWS managed services, and populatin propagating the trace through them. If this does not fulfill your use case, perhaps because you are using the same SDK with a different non-AWS managed service, let us know so we can provide configuration for this behavior. + +## Using SqsMessageHandler +This instrumentation takes a collection of SQS messages. +A span wraps the function call doHandle with appropriate span attributes and span links. +Span links are added for each of the messages as if this were a batch of messages. + +1. Setup SqsMessageHandler with your business logic. Pass in your OpenTelemetry and the name of the destination. +2. Call the "handle" method on SqsMessageHandler and pass in your collection of messages. +3. Under the hood it will call the "doHandle" method. + +```java +OpenTelemetry openTelemetry; +Collection messages; + +SqsMessageHandler messageHandler = + new SqsMessageHandler(openTelemetry, "destination") { + @Override + public void doHandle(Collection messages) { + // My business logic + } +}; + +messageHandler.handle(messages); +``` diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts b/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts index b84ddd1a4936..3d2ed10ae727 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts @@ -6,7 +6,10 @@ dependencies { implementation("io.opentelemetry.contrib:opentelemetry-aws-xray-propagator") library("software.amazon.awssdk:aws-core:2.2.0") + implementation(project(":instrumentation:message-handler:message-handler-1.0:library")) + library("software.amazon.awssdk:aws-json-protocol:2.2.0") + library("software.amazon.awssdk:sqs:2.20.0") testImplementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing")) diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageHandler.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageHandler.java new file mode 100644 index 000000000000..529bae7b0f5b --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageHandler.java @@ -0,0 +1,155 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.messagehandler.MessageHandler; +import software.amazon.awssdk.core.SdkPojo; +import software.amazon.awssdk.services.sqs.model.Message; +import java.util.Collection; +import java.util.Map; +import javax.annotation.Nullable; + +public abstract class SqsMessageHandler implements MessageHandler { + private final OpenTelemetry openTelemetry; + private final String destination; + private SpanKindExtractor> spanKindExtractor; + private SpanNameExtractor> spanNameExtractor; + + public SqsMessageHandler(OpenTelemetry openTelemetry, String destination) { + this.openTelemetry = openTelemetry; + this.destination = destination; + this.spanKindExtractor = SpanKindExtractor.alwaysConsumer(); + spanNameExtractor = e -> destination + " process"; + } + + public void setSpanNameExtactor(SpanNameExtractor> spanNameExtractor) { + this.spanNameExtractor = spanNameExtractor; + } + + @Override + public Instrumenter, Void> getMessageInstrumenter() { + return Instrumenter., Void>builder( + openTelemetry, "io.opentelemetry.aws-sdk-2.2", spanNameExtractor) + .addAttributesExtractor(getMessageOperationAttributeExtractor()) + .addSpanLinksExtractor(getSpanLinksExtractor()) + .buildInstrumenter(spanKindExtractor); + } + + public void setSpanKindExtractor(SpanKindExtractor> spanKindExtractor) { + this.spanKindExtractor = spanKindExtractor; + } + + protected MessagingAttributesGetter, Void> + getMessageingAttributesGetter() { + String destination = this.destination; + + return new MessagingAttributesGetter, Void>() { + @Nullable + @Override + public String getSystem(Collection v) { + return "AmazonSQS"; + } + + @Nullable + @Override + @SuppressWarnings({"deprecation"}) // Inheriting from interface + public String getDestinationKind(Collection v) { + return null; + } + + @Nullable + @Override + public String getDestination(Collection v) { + return destination; + } + + @Override + public boolean isTemporaryDestination(Collection v) { + return false; + } + + @Nullable + @Override + public String getConversationId(Collection v) { + return null; + } + + @Nullable + @Override + public Long getMessagePayloadSize(Collection v) { + long total = 0; + + for (Message message : v) { + total += getPayloadSize(message); + } + + return total; + } + + @Nullable + @Override + public Long getMessagePayloadCompressedSize(Collection v) { + return null; + } + + @Nullable + @Override + public String getMessageId(Collection request, Void v) { + return null; + } + }; + } + + protected AttributesExtractor, Void> getMessageOperationAttributeExtractor() { + return MessagingAttributesExtractor.create( + getMessageingAttributesGetter(), MessageOperation.PROCESS); + } + + protected SpanLinksExtractor> getSpanLinksExtractor() { + return (spanLinks, parentContext, request) -> { + for (Message message : request) { + SpanContext messageSpanCtx = getUpstreamContext(openTelemetry, message); + + if (messageSpanCtx!= null && messageSpanCtx.isValid()) { + spanLinks.addLink(messageSpanCtx); + } + } + }; + } + + public int getPayloadSize(Message message) { + return message.body().length(); + } + + public SpanContext getUpstreamContext(OpenTelemetry openTelemetry, Message message) { + TextMapPropagator messagingPropagator = openTelemetry.getPropagators() + .getTextMapPropagator(); + + Context context = SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message)); + + if (context == Context.root()) { + Map messageAtributes = SqsMessageAccess.getMessageAttributes(message); + + context = + SqsParentContext.ofMessageAttributes(messageAtributes, messagingPropagator); + } + + return Span.fromContext(context).getSpanContext(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageHandlerTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageHandlerTest.java new file mode 100644 index 000000000000..9b5c1cf280a0 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsMessageHandlerTest.java @@ -0,0 +1,410 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; + +public class SqsMessageHandlerTest extends XrayTestInstrumenter { + private static class SqsMessageHandlerImpl extends SqsMessageHandler { + public final AtomicInteger handleCalls = new AtomicInteger(); + + public SqsMessageHandlerImpl(OpenTelemetry openTelemetry, + String destination) { + super(openTelemetry, destination); + } + + @Override + public void doHandle(Collection request) { + handleCalls.getAndIncrement(); + } + } + + @Test + public void simple() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + messageHandler.setSpanNameExtactor(e-> "MySpan"); + + List messages = new LinkedList<>(); + messages.add(Message.builder() + .body("Hello") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=1")) + .build()); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("MySpan") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void simpleUnsampled() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + messageHandler.setSpanNameExtactor(e-> "MySpan"); + + List messages = new LinkedList<>(); + messages.add(Message.builder() + .body("Hello") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=0")) + .build()); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("MySpan") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getDefault(), + TraceState.getDefault()))) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void simpleUseMessageAttribute() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + List messages = new LinkedList<>(); + messages.add(Message.builder() + .body("Hello") + .messageAttributes(Collections.singletonMap("X-Amzn-Trace-Id", + MessageAttributeValue.builder().dataType("String") + .stringValue("Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=1") + .build())) + .build()); + + Span parentSpan = getOpenTelemetry() + .getTracer("test") + .spanBuilder("test") + .startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void twoMessages() { + List messages = new LinkedList(); + messages.add(Message.builder() + .body("Hello") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=1")) + .build()); + messages.add(Message.builder() + .body("Hello World") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-66555555-123456789012345678901234;Parent=6634567890123456;Sampled=1")) + .build()); + + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + messageHandler.setSpanKindExtractor(SpanKindExtractor.alwaysConsumer()); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getSampled(), + TraceState.getDefault())), + LinkData.create( + SpanContext.createFromRemoteParent( + "66555555123456789012345678901234", + "6634567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(2) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 16L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void twoRuns() { + List messages1 = new LinkedList(); + messages1.add(Message.builder() + .body("Hello") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-55555555-123456789012345678901234;Parent=1234567890123456;Sampled=1")) + .build()); + + List messages2 = new LinkedList(); + messages2.add(Message.builder() + .body("SecondMessage") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-77555555-123456789012345678901234;Parent=7734567890123456;Sampled=1")) + .build()); + + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + Span parentSpan1 = getOpenTelemetry().getTracer("test").spanBuilder("test1").startSpan(); + try (Scope scope = parentSpan1.makeCurrent()) { + messageHandler.handle(messages1); + } + parentSpan1.end(); + + Span parentSpan2 = getOpenTelemetry().getTracer("test").spanBuilder("test2").startSpan(); + try (Scope scope = parentSpan2.makeCurrent()) { + messageHandler.handle(messages2); + } + parentSpan2.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(2); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test1").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "55555555123456789012345678901234", + "1234567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 5L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan1.getSpanContext().getSpanId()) + .hasTraceId(parentSpan1.getSpanContext().getTraceId())), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test2").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasLinks( + LinkData.create( + SpanContext.createFromRemoteParent( + "77555555123456789012345678901234", + "7734567890123456", + TraceFlags.getSampled(), + TraceState.getDefault()))) + .hasTotalRecordedLinks(1) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 13L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan2.getSpanContext().getSpanId()) + .hasTraceId(parentSpan2.getSpanContext().getTraceId()))); + } + + @Test + public void noMessages() { + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(new LinkedList<>()); + } + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(1); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0), + span -> + span.hasName("destination process") + .hasKind(SpanKind.CONSUMER) + .hasTotalRecordedLinks(0) + .hasAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .hasAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS") + .hasAttribute(SemanticAttributes.MESSAGING_DESTINATION_NAME, "destination") + .hasAttribute(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, 0L) + .hasTotalAttributeCount(4) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceId(parentSpan.getSpanContext().getTraceId()))); + } + + @Test + public void malformedTraceId() { + List messages = new LinkedList(); + messages.add(Message.builder() + .body("Hello") + .attributesWithStrings( + Collections.singletonMap( + "AWSTraceHeader", + "Root=1-55555555-error;Parent=1234567890123456;Sampled=1")) + .build()); + + SqsMessageHandlerImpl messageHandler = new SqsMessageHandlerImpl( + getOpenTelemetry(), + "destination"); + + Span parentSpan = getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + + assertThrows( + RuntimeException.class, + () -> { + try (Scope scope = parentSpan.makeCurrent()) { + messageHandler.handle(messages); + } + }); + + parentSpan.end(); + + assertThat(messageHandler.handleCalls.get()).isEqualTo(0); + + waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("test").hasTotalAttributeCount(0).hasTotalRecordedLinks(0))); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/XrayTestInstrumenter.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/XrayTestInstrumenter.java new file mode 100644 index 000000000000..a2af1d01c2eb --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/XrayTestInstrumenter.java @@ -0,0 +1,160 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; +import static org.awaitility.Awaitility.await; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; +import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import io.opentelemetry.sdk.testing.assertj.TracesAssert; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.AfterEach; + +public class XrayTestInstrumenter { + private static final OpenTelemetrySdk openTelemetry; + private static final InMemorySpanExporter testSpanExporter; + + static { + testSpanExporter = InMemorySpanExporter.create(); + InMemoryMetricExporter testMetricExporter = + InMemoryMetricExporter.create(AggregationTemporality.DELTA); + + MetricReader metricReader = + PeriodicMetricReader.builder(testMetricExporter) + // Set really long interval. We'll call forceFlush when we need the metrics + // instead of collecting them periodically. + .setInterval(Duration.ofNanos(Long.MAX_VALUE)) + .build(); + + Resource resource = Resource.getDefault().merge(Resource.create( + Attributes.of(SERVICE_NAME, "MyServiceName"))); + + openTelemetry = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(new FlushTrackingSpanProcessor()) + .addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)) + .build()) + .setMeterProvider(SdkMeterProvider.builder().registerMetricReader(metricReader).build()) + .setPropagators(ContextPropagators.create(AwsXrayPropagator.getInstance())) + .buildAndRegisterGlobal(); + } + + @AfterEach + public void resetTests() { + testSpanExporter.reset(); + } + + protected OpenTelemetrySdk getOpenTelemetry() { + return openTelemetry; + } + + private static class FlushTrackingSpanProcessor implements SpanProcessor { + @Override + public void onStart(Context parentContext, ReadWriteSpan span) {} + + @Override + public boolean isStartRequired() { + return false; + } + + @Override + public void onEnd(ReadableSpan span) {} + + @Override + public boolean isEndRequired() { + return false; + } + + @Override + public CompletableResultCode forceFlush() { + return CompletableResultCode.ofSuccess(); + } + } + + @SafeVarargs + @SuppressWarnings("varargs") + public final void waitAndAssertTraces(Consumer... assertions) { + waitAndAssertTraces(null, Arrays.asList(assertions), true); + } + + private > void waitAndAssertTraces( + @Nullable Comparator> traceComparator, + Iterable assertions, + boolean verifyScopeVersion) { + List> assertionsList = new ArrayList<>(); + assertions.forEach(assertionsList::add); + + try { + await() + .untilAsserted(() -> doAssertTraces(traceComparator, assertionsList, verifyScopeVersion)); + } catch (ConditionTimeoutException e) { + // Don't throw this failure since the stack is the awaitility thread, causing confusion. + // Instead, just assert one more time on the test thread, which will fail with a better stack + // trace. + // TODO(anuraaga): There is probably a better way to do this. + doAssertTraces(traceComparator, assertionsList, verifyScopeVersion); + } + } + + private void doAssertTraces( + @Nullable Comparator> traceComparator, + List> assertionsList, + boolean verifyScopeVersion) { + List> traces = waitForTraces(assertionsList.size()); + if (verifyScopeVersion) { + TelemetryDataUtil.assertScopeVersion(traces); + } + if (traceComparator != null) { + traces.sort(traceComparator); + } + TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList); + } + + public final List> waitForTraces(int numberOfTraces) { + try { + return TelemetryDataUtil.waitForTraces( + this::getExportedSpans, numberOfTraces, 20, TimeUnit.SECONDS); + } catch (TimeoutException | InterruptedException e) { + throw new AssertionError("Error waiting for " + numberOfTraces + " traces", e); + } + } + + public List getExportedSpans() { + return testSpanExporter.getFinishedSpanItems(); + } +} diff --git a/instrumentation/message-handler/message-handler-1.0/library/README.md b/instrumentation/message-handler/message-handler-1.0/library/README.md new file mode 100644 index 000000000000..718815c7bbc0 --- /dev/null +++ b/instrumentation/message-handler/message-handler-1.0/library/README.md @@ -0,0 +1,9 @@ +# Message Handler + +This package contains instrumentation for message systems. + +The instrumentation will process messages and wrap the calls in a span with appropriate span attributes and span links. + +## Available Message Handlers +- `io.opentelemetry.instrumentation.awssdk.v2_2.SqsMessageHandler` - Process SQS messages for the AWS SDK library. This is found in io.opentelemetry.aws-sdk-2.2. +- `io.opentelemetry.instrumentation.awslambdaevents.v2_2.SqsMessageHandler` - Process SQS messages for the Lambda instrumentation. This is found in io.opentelemetry.aws-lambda-events-2.2. diff --git a/instrumentation/message-handler/message-handler-1.0/library/build.gradle.kts b/instrumentation/message-handler/message-handler-1.0/library/build.gradle.kts new file mode 100644 index 000000000000..9ac5eeb86863 --- /dev/null +++ b/instrumentation/message-handler/message-handler-1.0/library/build.gradle.kts @@ -0,0 +1,7 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + implementation("io.opentelemetry:opentelemetry-sdk") +} diff --git a/instrumentation/message-handler/message-handler-1.0/library/src/main/java/io/opentelemetry/instrumentation/messagehandler/MessageHandler.java b/instrumentation/message-handler/message-handler-1.0/library/src/main/java/io/opentelemetry/instrumentation/messagehandler/MessageHandler.java new file mode 100644 index 000000000000..1f7772120b3a --- /dev/null +++ b/instrumentation/message-handler/message-handler-1.0/library/src/main/java/io/opentelemetry/instrumentation/messagehandler/MessageHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.messagehandler; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Collection; + +public interface MessageHandler { + + Instrumenter, Void> getMessageInstrumenter(); + + void doHandle(Collection request); + + default void handle(Collection request) { + Instrumenter, Void> instrumenter = getMessageInstrumenter(); + Throwable error = null; + + Context context = instrumenter.start(Context.current(), request); + + try (Scope scope = context.makeCurrent()) { + doHandle(request); + } catch (Throwable t) { + error = t; + throw t; + } finally { + instrumenter.end(context, request, null, error); + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5975389e0709..b23e6157126d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -356,6 +356,7 @@ hideFromDependabot(":instrumentation:logback:logback-mdc-1.0:javaagent") hideFromDependabot(":instrumentation:logback:logback-mdc-1.0:library") hideFromDependabot(":instrumentation:logback:logback-mdc-1.0:testing") hideFromDependabot(":instrumentation:methods:javaagent") +hideFromDependabot(":instrumentation:message-handler:message-handler-1.0:library") hideFromDependabot(":instrumentation:micrometer:micrometer-1.5:javaagent") hideFromDependabot(":instrumentation:micrometer:micrometer-1.5:library") hideFromDependabot(":instrumentation:micrometer:micrometer-1.5:testing")