Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Instrumentation.AWS] Move adding request and response info to AWSTracingPipelineHandler #2137

2 changes: 2 additions & 0 deletions src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

* Move adding request and response info to AWSTracingPipelineHandler
([#2137](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2137))
* Drop support for .NET 6 as this target is no longer supported and add .NET 8 target.
([#2139](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2139))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Amazon.Runtime.Telemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

Expand All @@ -28,191 +27,22 @@ internal class AWSPropagatorPipelineHandler : PipelineHandler
carrier[name] = value;
};

private readonly AWSClientInstrumentationOptions options;

public AWSPropagatorPipelineHandler(AWSClientInstrumentationOptions options)
{
this.options = options;
}

public override void InvokeSync(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);
ProcessBeginRequest(executionContext);

base.InvokeSync(executionContext);

ProcessEndRequest(executionContext);
}

public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
T? ret = null;

this.ProcessBeginRequest(executionContext);

ret = await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);

ProcessEndRequest(executionContext);

return ret;
}

#if NET
[System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage(
"Trimming",
"IL2075",
Justification = "The reflected properties were already used by the AWS SDK's marshallers so the properties could not have been trimmed.")]
#endif
private static void AddRequestSpecificInformation(Activity activity, IRequestContext requestContext)
{
var service = requestContext.ServiceMetaData.ServiceId;

if (AWSServiceHelper.ServiceRequestParameterMap.TryGetValue(service, out var parameters))
{
AmazonWebServiceRequest request = requestContext.OriginalRequest;

foreach (var parameter in parameters)
{
try
{
// for bedrock agent, we only extract one attribute based on the operation.
if (AWSServiceType.IsBedrockAgentService(service))
{
if (AWSServiceHelper.OperationNameToResourceMap()[AWSServiceHelper.GetAWSOperationName(requestContext)] != parameter)
{
continue;
}
}

var property = request.GetType().GetProperty(parameter);
if (property != null)
{
if (AWSServiceHelper.ParameterAttributeMap.TryGetValue(parameter, out var attribute))
{
activity.SetTag(attribute, property.GetValue(request));
}
}
}
catch (Exception)
{
// Guard against any reflection-related exceptions when running in AoT.
// See https://github.com/open-telemetry/opentelemetry-dotnet-contrib/issues/1543#issuecomment-1907667722.
}
}
}
ProcessBeginRequest(executionContext);

if (AWSServiceType.IsDynamoDbService(service))
{
activity.SetTag(SemanticConventions.AttributeDbSystem, AWSSemanticConventions.AttributeValueDynamoDb);
}
else if (AWSServiceType.IsSqsService(service))
{
SqsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsSnsService(service))
{
SnsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsBedrockRuntimeService(service))
{
activity.SetTag(AWSSemanticConventions.AttributeGenAiSystem, "aws_bedrock");
}
return await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}

#if NET
[System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage(
"Trimming",
"IL2075",
Justification = "The reflected properties were already used by the AWS SDK's marshallers so the properties could not have been trimmed.")]
#endif
private static void AddResponseSpecificInformation(Activity activity, IExecutionContext executionContext)
private static void ProcessBeginRequest(IExecutionContext executionContext)
{
var service = executionContext.RequestContext.ServiceMetaData.ServiceId;
var responseContext = executionContext.ResponseContext;

if (AWSServiceHelper.ServiceResponseParameterMap.TryGetValue(service, out var parameters))
{
AmazonWebServiceResponse response = responseContext.Response;

foreach (var parameter in parameters)
{
try
{
// for bedrock agent, extract attribute from object in response.
if (AWSServiceType.IsBedrockAgentService(service))
{
var operationName = Utils.RemoveSuffix(response.GetType().Name, "Response");
if (AWSServiceHelper.OperationNameToResourceMap()[operationName] == parameter)
{
AddBedrockAgentResponseAttribute(activity, response, parameter);
}
}

var property = response.GetType().GetProperty(parameter);
if (property != null)
{
if (AWSServiceHelper.ParameterAttributeMap.TryGetValue(parameter, out var attribute))
{
activity.SetTag(attribute, property.GetValue(response));
}
}
}
catch (Exception)
{
// Guard against any reflection-related exceptions when running in AoT.
// See https://github.com/open-telemetry/opentelemetry-dotnet-contrib/issues/1543#issuecomment-1907667722.
}
}
}
}

#if NET
[System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage(
"Trimming",
"IL2075",
Justification = "The reflected properties were already used by the AWS SDK's marshallers so the properties could not have been trimmed.")]
#endif
private static void AddBedrockAgentResponseAttribute(Activity activity, AmazonWebServiceResponse response, string parameter)
{
var responseObject = response.GetType().GetProperty(Utils.RemoveSuffix(parameter, "Id"));
if (responseObject != null)
{
var attributeObject = responseObject.GetValue(response);
if (attributeObject != null)
{
var property = attributeObject.GetType().GetProperty(parameter);
if (property != null)
{
if (AWSServiceHelper.ParameterAttributeMap.TryGetValue(parameter, out var attribute))
{
activity.SetTag(attribute, property.GetValue(attributeObject));
}
}
}
}
}

private static void ProcessEndRequest(IExecutionContext executionContext)
{
var currentActivity = Activity.Current;

if (currentActivity == null || !currentActivity.Source.Name.StartsWith(TelemetryConstants.TelemetryScopePrefix, StringComparison.Ordinal))
{
return;
}

AddResponseSpecificInformation(currentActivity, executionContext);
}

private void ProcessBeginRequest(IExecutionContext executionContext)
{
if (this.options.SuppressDownstreamInstrumentation)
{
SuppressInstrumentationScope.Enter();
}

var currentActivity = Activity.Current;

// Propagate the current activity if it was created by the AWS SDK
Expand All @@ -221,7 +51,6 @@ private void ProcessBeginRequest(IExecutionContext executionContext)
return;
}

AddRequestSpecificInformation(currentActivity, executionContext.RequestContext);
AwsPropagator.Inject(
new PropagationContext(currentActivity.Context, Baggage.Current),
executionContext.RequestContext.Request.Headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal static class AWSSemanticConventions
public const string AttributeAWSBedrockDataSourceId = "aws.bedrock.data_source.id";
public const string AttributeAWSBedrockGuardrailId = "aws.bedrock.guardrail.id";
public const string AttributeAWSBedrockKnowledgeBaseId = "aws.bedrock.knowledge_base.id";
public const string AttributeAWSBedrock = "aws_bedrock";

// should be global convention for Gen AI attributes
public const string AttributeGenAiModelId = "gen_ai.request.model";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wires <see cref="AWSPropagatorPipelineHandler"/> into the AWS <see cref="RuntimePipeline"/>
/// so it can inject trace headers and add request information to the tags.
/// Wires <see cref="AWSTracingPipelineHandler"/> and <see cref="AWSPropagatorPipelineHandler"/> into the AWS
/// <see cref="RuntimePipeline"/> so they can inject trace headers and add request information to the tags.
/// </summary>
internal class AWSTracingPipelineCustomizer : IRuntimePipelineCustomizer
{
Expand All @@ -34,7 +34,12 @@ public void Customize(Type serviceClientType, RuntimePipeline pipeline)
return;
}

var propagatingPipelineHandler = new AWSPropagatorPipelineHandler(this.options);
var tracingPipelineHandler = new AWSTracingPipelineHandler(this.options);
var propagatingPipelineHandler = new AWSPropagatorPipelineHandler();

// AWSTracingPipelineHandler must execute early in the AWS SDK pipeline
// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
pipeline.AddHandlerBefore<Marshaller>(tracingPipelineHandler);

// AWSPropagatorPipelineHandler executes after the AWS SDK has marshalled (ie serialized)
// the outgoing request object so that it can work with the request's Headers
Expand Down
Loading
Loading