Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http client response attributes to aws sqs process spans #10074

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
Expand Down Expand Up @@ -190,14 +189,15 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://") }
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
Expand Down Expand Up @@ -523,14 +523,15 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://") }
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
Expand Down Expand Up @@ -179,14 +178,15 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"rpc.service" "AmazonSQS"
"rpc.method" "ReceiveMessage"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" String
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;

final class AwsSdkInstrumenterFactory {
Expand Down Expand Up @@ -98,23 +97,23 @@ Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter() {
openTelemetry,
MessagingSpanNameExtractor.create(getter, operation),
SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(attributesExtractors(), Function.identity()),
toSqsRequestExtractors(attributesExtractors()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled);
}

Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter() {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsProcessRequest, Void> messagingAttributeExtractor =
AttributesExtractor<SqsProcessRequest, Response<?>> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);

InstrumenterBuilder<SqsProcessRequest, Void> builder =
Instrumenter.<SqsProcessRequest, Void>builder(
InstrumenterBuilder<SqsProcessRequest, Response<?>> builder =
Instrumenter.<SqsProcessRequest, Response<?>>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(toSqsRequestExtractors(attributesExtractors(), unused -> null))
.addAttributesExtractors(toSqsRequestExtractors(attributesExtractors()))
.addAttributesExtractor(messagingAttributeExtractor);

if (messagingReceiveInstrumentationEnabled) {
Expand All @@ -128,14 +127,12 @@ Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static <RESPONSE>
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors,
Function<RESPONSE, Response<?>> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
private static List<AttributesExtractor<AbstractSqsRequest, Response<?>>> toSqsRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors) {
List<AttributesExtractor<AbstractSqsRequest, Response<?>>> result = new ArrayList<>();
for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) {
result.add(
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
new AttributesExtractor<AbstractSqsRequest, Response<?>>() {
@Override
public void onStart(
AttributesBuilder attributes,
Expand All @@ -149,14 +146,9 @@ public void onEnd(
AttributesBuilder attributes,
Context context,
AbstractSqsRequest sqsRequest,
@Nullable RESPONSE response,
@Nullable Response<?> response,
@Nullable Throwable error) {
extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
extractor.onEnd(attributes, context, sqsRequest.getRequest(), response, error);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {

private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;

AwsSdkTelemetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private static void afterConsumerResponse(

Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter =
requestHandler.getConsumerReceiveInstrumenter();
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter =
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter =
requestHandler.getConsumerProcessInstrumenter();

Context receiveContext = null;
Expand All @@ -75,7 +75,8 @@ private static void afterConsumerResponse(
timer.now());
}

addTracing(receiveMessageResult, request, consumerProcessInstrumenter, receiveContext);
addTracing(
receiveMessageResult, request, response, consumerProcessInstrumenter, receiveContext);
}

private static final Field messagesField = getMessagesField();
Expand All @@ -93,7 +94,8 @@ private static Field getMessagesField() {
private static void addTracing(
ReceiveMessageResult receiveMessageResult,
Request<?> request,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Response<?> response,
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter,
Context receiveContext) {
if (messagesField == null) {
return;
Expand All @@ -107,6 +109,7 @@ private static void addTracing(
receiveMessageResult.getMessages(),
consumerProcessInstrumenter,
request,
response,
receiveContext));
} catch (IllegalAccessException ignored) {
// should not happen, we call setAccessible on the field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

enum SqsProcessRequestAttributesGetter
implements MessagingAttributesGetter<SqsProcessRequest, Void> {
implements MessagingAttributesGetter<SqsProcessRequest, Response<?>> {
INSTANCE;

@Override
Expand Down Expand Up @@ -52,7 +53,7 @@ public Long getMessagePayloadCompressedSize(SqsProcessRequest request) {

@Override
@Nullable
public String getMessageId(SqsProcessRequest request, @Nullable Void response) {
public String getMessageId(SqsProcessRequest request, @Nullable Response<?> response) {
return request.getMessage().getMessageId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.services.sqs.model.Message;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
Expand All @@ -16,8 +17,9 @@
class TracingIterator implements Iterator<Message> {

private final Iterator<Message> delegateIterator;
private final Instrumenter<SqsProcessRequest, Void> instrumenter;
private final Instrumenter<SqsProcessRequest, Response<?>> instrumenter;
private final Request<?> request;
private final Response<?> response;
private final Context receiveContext;

/*
Expand All @@ -30,21 +32,24 @@ class TracingIterator implements Iterator<Message> {

private TracingIterator(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
this.delegateIterator = delegateIterator;
this.instrumenter = instrumenter;
this.request = request;
this.response = response;
this.receiveContext = receiveContext;
}

public static Iterator<Message> wrap(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
return new TracingIterator(delegateIterator, instrumenter, request, receiveContext);
return new TracingIterator(delegateIterator, instrumenter, request, response, receiveContext);
}

@Override
Expand Down Expand Up @@ -80,7 +85,7 @@ public Message next() {
private void closeScopeAndEndSpan() {
if (currentScope != null) {
currentScope.close();
instrumenter.end(currentContext, currentRequest, null, null);
instrumenter.end(currentContext, currentRequest, response, null);
currentScope = null;
currentRequest = null;
currentContext = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.internal.SdkInternalList;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
Expand All @@ -18,28 +19,32 @@
class TracingList extends SdkInternalList<Message> {
private static final long serialVersionUID = 1L;

private final transient Instrumenter<SqsProcessRequest, Void> instrumenter;
private final transient Instrumenter<SqsProcessRequest, Response<?>> instrumenter;
private final transient Request<?> request;
private final transient Response<?> response;
private final transient Context receiveContext;
private boolean firstIterator = true;

private TracingList(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
super(list);
this.instrumenter = instrumenter;
this.request = request;
this.response = response;
this.receiveContext = receiveContext;
}

public static SdkInternalList<Message> wrap(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
return new TracingList(list, instrumenter, request, receiveContext);
return new TracingList(list, instrumenter, request, response, receiveContext);
}

@Override
Expand All @@ -49,7 +54,7 @@ public Iterator<Message> iterator() {
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// List is performed in the same thread that called receiveMessage()
if (firstIterator && !inAwsClient()) {
it = TracingIterator.wrap(super.iterator(), instrumenter, request, receiveContext);
it = TracingIterator.wrap(super.iterator(), instrumenter, request, response, receiveContext);
firstIterator = false;
} else {
it = super.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ final class TracingRequestHandler extends RequestHandler2 {

private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;

TracingRequestHandler(
Instrumenter<Request<?>, Response<?>> requestInstrumenter,
Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter,
Instrumenter<Request<?>, Response<?>> producerInstrumenter) {
this.requestInstrumenter = requestInstrumenter;
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
Expand Down Expand Up @@ -103,7 +103,7 @@ Instrumenter<SqsReceiveRequest, Response<?>> getConsumerReceiveInstrumenter() {
return consumerReceiveInstrumenter;
}

Instrumenter<SqsProcessRequest, Void> getConsumerProcessInstrumenter() {
Instrumenter<SqsProcessRequest, Response<?>> getConsumerProcessInstrumenter() {
return consumerProcessInstrumenter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.SemanticAttributes
Expand Down Expand Up @@ -121,14 +120,15 @@ abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecif
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
Expand Down Expand Up @@ -213,14 +213,15 @@ abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecif
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
Expand Down
Loading
Loading