Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: AWS SQS MD5 Hash Mismatch #1572

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* BREAKING: Switched AWSServiceName tag to use ServiceId instead of ServiceName
([#1572](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1572))

## 1.1.0-beta.3

Released 2024-Jan-26
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Uses <see cref="AWSXRayPropagator"/> to inject the current Activity Context and
/// Baggage into the outgoing AWS SDK Request.
/// <para />
/// Must execute after the AWS SDK has marshalled (ie serialized)
/// the outgoing request object so that it can work with the <see cref="IRequest"/>'s
/// <see cref="IRequest.Headers"/>.
/// </summary>
internal class AWSPropagatorPipelineHandler : PipelineHandler
{
private static readonly AWSXRayPropagator AwsPropagator = new();

private static readonly Action<IDictionary<string, string>, string, string> Setter = (carrier, name, value) =>
{
carrier[name] = value;
};

/// <summary>
/// Rely on the the <see cref="AWSTracingPipelineHandler.Activity"/> for retrieving the current
/// context.
/// </summary>
private readonly AWSTracingPipelineHandler tracingPipelineHandler;

public AWSPropagatorPipelineHandler(AWSTracingPipelineHandler tracingPipelineHandler)
{
this.tracingPipelineHandler = tracingPipelineHandler;
}

public override void InvokeSync(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);

base.InvokeSync(executionContext);
}

public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);

return await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}

private void ProcessBeginRequest(IExecutionContext executionContext)
{
if (this.tracingPipelineHandler.Activity == null)
{
return;
}

AwsPropagator.Inject(
new PropagationContext(this.tracingPipelineHandler.Activity.Context, Baggage.Current),
executionContext.RequestContext.Request.Headers,
Setter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal class AWSServiceHelper
};

internal static string GetAWSServiceName(IRequestContext requestContext)
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.Request.ServiceName);
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.ServiceMetaData.ServiceId);
ppittle marked this conversation as resolved.
Show resolved Hide resolved

internal static string GetAWSOperationName(IRequestContext requestContext)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace OpenTelemetry.Instrumentation.AWS.Implementation;

internal class AWSServiceType
{
internal const string DynamoDbService = "DynamoDBv2";
internal const string DynamoDbService = "DynamoDB";
ppittle marked this conversation as resolved.
Show resolved Hide resolved
internal const string SQSService = "SQS";
internal const string SNSService = "SimpleNotificationService"; // SNS
internal const string SNSService = "SNS";

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wires <see cref="AWSTracingPipelineHandler"/> and <see cref="AWSPropagatorPipelineHandler"/>
/// into the AWS <see cref="RuntimePipeline"/> so they can inject trace headers and wrap sdk calls in spans.
/// </summary>
internal class AWSTracingPipelineCustomizer : IRuntimePipelineCustomizer
{
private readonly AWSClientInstrumentationOptions options;
Expand All @@ -31,6 +35,15 @@ public void Customize(Type serviceClientType, RuntimePipeline pipeline)
return;
}

pipeline.AddHandlerBefore<RetryHandler>(new AWSTracingPipelineHandler(this.options));
var tracingPipelineHandler = new AWSTracingPipelineHandler(this.options);
var propagatingPipelineHandler = new AWSPropagatorPipelineHandler(tracingPipelineHandler);

// AWSTracingPipelineHandler must execute early in the AWS SDK pipeline
// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
pipeline.AddHandlerBefore<Marshaller>(tracingPipelineHandler);

// AWSPropagatorPipelineHandler executes after the AWS SDK has marshalled (ie serialized)
// the outgoing request object so that it can work with the request's Headers
pipeline.AddHandlerBefore<RetryHandler>(propagatingPipelineHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.Util;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wraps the outgoing AWS SDK Request in a Span and adds additional AWS specific Tags.
/// Depending on the target AWS Service, additional request specific information may be injected as well.
/// <para />
/// This <see cref="PipelineHandler"/> must execute early in the AWS SDK pipeline
/// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
/// </summary>
internal sealed class AWSTracingPipelineHandler : PipelineHandler
{
internal const string ActivitySourceName = "Amazon.AWS.AWSClientInstrumentation";

private static readonly AWSXRayPropagator AwsPropagator = new();
private static readonly Action<IDictionary<string, string>, string, string> Setter = (carrier, name, value) =>
{
carrier[name] = value;
};

private static readonly ActivitySource AWSSDKActivitySource = new(ActivitySourceName);

private readonly AWSClientInstrumentationOptions options;
Expand All @@ -33,27 +32,29 @@ public AWSTracingPipelineHandler(AWSClientInstrumentationOptions options)
this.options = options;
}

public Activity? Activity { get; private set; }

public override void InvokeSync(IExecutionContext executionContext)
{
var activity = this.ProcessBeginRequest(executionContext);
this.Activity = this.ProcessBeginRequest(executionContext);
try
{
base.InvokeSync(executionContext);
}
catch (Exception ex)
{
if (activity != null)
if (this.Activity != null)
{
ProcessException(activity, ex);
ProcessException(this.Activity, ex);
}

throw;
}
finally
{
if (activity != null)
if (this.Activity != null)
{
ProcessEndRequest(executionContext, activity);
ProcessEndRequest(executionContext, this.Activity);
}
}
}
Expand All @@ -62,25 +63,25 @@ public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
T? ret = null;

var activity = this.ProcessBeginRequest(executionContext);
this.Activity = this.ProcessBeginRequest(executionContext);
try
{
ret = await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}
catch (Exception ex)
{
if (activity != null)
if (this.Activity != null)
{
ProcessException(activity, ex);
ProcessException(this.Activity, ex);
}

throw;
}
finally
{
if (activity != null)
if (this.Activity != null)
{
ProcessEndRequest(executionContext, activity);
ProcessEndRequest(executionContext, this.Activity);
}
}

Expand Down Expand Up @@ -241,8 +242,6 @@ private static string FetchRequestId(IRequestContext requestContext, IResponseCo
AddRequestSpecificInformation(activity, requestContext, service);
}

AwsPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestContext.Request.Headers, Setter);

return activity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService.Model;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;
Expand All @@ -18,9 +17,8 @@ internal class SnsRequestContextHelper

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as PublishRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
if (originalRequest?.MessageAttributes == null)
{
return;
}
Expand All @@ -38,23 +36,9 @@ internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
originalRequest.MessageAttributes[param.Key] = new MessageAttributeValue { DataType = "String", StringValue = param.Value };
}
}

private static void AddAttribute(ParameterCollection parameters, PublishRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttributes.entry." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SQS.Model;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;
Expand All @@ -18,9 +17,8 @@ internal class SqsRequestContextHelper

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as SendMessageRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
if (originalRequest?.MessageAttributes == null)
{
return;
}
Expand All @@ -38,23 +36,9 @@ internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
originalRequest.MessageAttributes[param.Key] = new MessageAttributeValue { DataType = "String", StringValue = param.Value };
}
}

private static void AddAttribute(ParameterCollection parameters, SendMessageRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttribute." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static TracerProviderBuilder AddAWSInstrumentation(
configure?.Invoke(awsClientOptions);

_ = new AWSClientsInstrumentation(awsClientOptions);
builder.AddSource("Amazon.AWS.AWSClientInstrumentation");
builder.AddSource(AWSTracingPipelineHandler.ActivitySourceName);
return builder;
}
}
Loading
Loading