Skip to content

Commit

Permalink
bugfix: AWS SQS MD5 Hash Mismatch (#1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
ppittle authored Feb 22, 2024
1 parent 2f3a694 commit bfe42e2
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 120 deletions.
3 changes: 3 additions & 0 deletions src/OpenTelemetry.Instrumentation.AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* BREAKING: Switched AWSServiceName tag to use ServiceId instead of ServiceName
([#1572](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1572))

## 1.1.0-beta.3

Released 2024-Jan-26
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Uses <see cref="AWSXRayPropagator"/> to inject the current Activity Context and
/// Baggage into the outgoing AWS SDK Request.
/// <para />
/// Must execute after the AWS SDK has marshalled (ie serialized)
/// the outgoing request object so that it can work with the <see cref="IRequest"/>'s
/// <see cref="IRequest.Headers"/>.
/// </summary>
internal class AWSPropagatorPipelineHandler : PipelineHandler
{
private static readonly AWSXRayPropagator AwsPropagator = new();

private static readonly Action<IDictionary<string, string>, string, string> Setter = (carrier, name, value) =>
{
carrier[name] = value;
};

/// <summary>
/// Rely on the the <see cref="AWSTracingPipelineHandler.Activity"/> for retrieving the current
/// context.
/// </summary>
private readonly AWSTracingPipelineHandler tracingPipelineHandler;

public AWSPropagatorPipelineHandler(AWSTracingPipelineHandler tracingPipelineHandler)
{
this.tracingPipelineHandler = tracingPipelineHandler;
}

public override void InvokeSync(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);

base.InvokeSync(executionContext);
}

public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);

return await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}

private void ProcessBeginRequest(IExecutionContext executionContext)
{
if (this.tracingPipelineHandler.Activity == null)
{
return;
}

AwsPropagator.Inject(
new PropagationContext(this.tracingPipelineHandler.Activity.Context, Baggage.Current),
executionContext.RequestContext.Request.Headers,
Setter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal class AWSServiceHelper
};

internal static string GetAWSServiceName(IRequestContext requestContext)
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.Request.ServiceName);
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.ServiceMetaData.ServiceId);

internal static string GetAWSOperationName(IRequestContext requestContext)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace OpenTelemetry.Instrumentation.AWS.Implementation;

internal class AWSServiceType
{
internal const string DynamoDbService = "DynamoDBv2";
internal const string DynamoDbService = "DynamoDB";
internal const string SQSService = "SQS";
internal const string SNSService = "SimpleNotificationService"; // SNS
internal const string SNSService = "SNS";

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wires <see cref="AWSTracingPipelineHandler"/> and <see cref="AWSPropagatorPipelineHandler"/>
/// into the AWS <see cref="RuntimePipeline"/> so they can inject trace headers and wrap sdk calls in spans.
/// </summary>
internal class AWSTracingPipelineCustomizer : IRuntimePipelineCustomizer
{
private readonly AWSClientInstrumentationOptions options;
Expand All @@ -31,6 +35,15 @@ public void Customize(Type serviceClientType, RuntimePipeline pipeline)
return;
}

pipeline.AddHandlerBefore<RetryHandler>(new AWSTracingPipelineHandler(this.options));
var tracingPipelineHandler = new AWSTracingPipelineHandler(this.options);
var propagatingPipelineHandler = new AWSPropagatorPipelineHandler(tracingPipelineHandler);

// AWSTracingPipelineHandler must execute early in the AWS SDK pipeline
// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
pipeline.AddHandlerBefore<Marshaller>(tracingPipelineHandler);

// AWSPropagatorPipelineHandler executes after the AWS SDK has marshalled (ie serialized)
// the outgoing request object so that it can work with the request's Headers
pipeline.AddHandlerBefore<RetryHandler>(propagatingPipelineHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.Util;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wraps the outgoing AWS SDK Request in a Span and adds additional AWS specific Tags.
/// Depending on the target AWS Service, additional request specific information may be injected as well.
/// <para />
/// This <see cref="PipelineHandler"/> must execute early in the AWS SDK pipeline
/// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
/// </summary>
internal sealed class AWSTracingPipelineHandler : PipelineHandler
{
internal const string ActivitySourceName = "Amazon.AWS.AWSClientInstrumentation";

private static readonly AWSXRayPropagator AwsPropagator = new();
private static readonly Action<IDictionary<string, string>, string, string> Setter = (carrier, name, value) =>
{
carrier[name] = value;
};

private static readonly ActivitySource AWSSDKActivitySource = new(ActivitySourceName);

private readonly AWSClientInstrumentationOptions options;
Expand All @@ -33,27 +32,29 @@ public AWSTracingPipelineHandler(AWSClientInstrumentationOptions options)
this.options = options;
}

public Activity? Activity { get; private set; }

public override void InvokeSync(IExecutionContext executionContext)
{
var activity = this.ProcessBeginRequest(executionContext);
this.Activity = this.ProcessBeginRequest(executionContext);
try
{
base.InvokeSync(executionContext);
}
catch (Exception ex)
{
if (activity != null)
if (this.Activity != null)
{
ProcessException(activity, ex);
ProcessException(this.Activity, ex);
}

throw;
}
finally
{
if (activity != null)
if (this.Activity != null)
{
ProcessEndRequest(executionContext, activity);
ProcessEndRequest(executionContext, this.Activity);
}
}
}
Expand All @@ -62,25 +63,25 @@ public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
T? ret = null;

var activity = this.ProcessBeginRequest(executionContext);
this.Activity = this.ProcessBeginRequest(executionContext);
try
{
ret = await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}
catch (Exception ex)
{
if (activity != null)
if (this.Activity != null)
{
ProcessException(activity, ex);
ProcessException(this.Activity, ex);
}

throw;
}
finally
{
if (activity != null)
if (this.Activity != null)
{
ProcessEndRequest(executionContext, activity);
ProcessEndRequest(executionContext, this.Activity);
}
}

Expand Down Expand Up @@ -241,8 +242,6 @@ private static string FetchRequestId(IRequestContext requestContext, IResponseCo
AddRequestSpecificInformation(activity, requestContext, service);
}

AwsPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestContext.Request.Headers, Setter);

return activity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService.Model;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;
Expand All @@ -18,9 +17,8 @@ internal class SnsRequestContextHelper

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as PublishRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
if (originalRequest?.MessageAttributes == null)
{
return;
}
Expand All @@ -38,23 +36,9 @@ internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
originalRequest.MessageAttributes[param.Key] = new MessageAttributeValue { DataType = "String", StringValue = param.Value };
}
}

private static void AddAttribute(ParameterCollection parameters, PublishRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttributes.entry." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SQS.Model;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;
Expand All @@ -18,9 +17,8 @@ internal class SqsRequestContextHelper

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as SendMessageRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
if (originalRequest?.MessageAttributes == null)
{
return;
}
Expand All @@ -38,23 +36,9 @@ internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
originalRequest.MessageAttributes[param.Key] = new MessageAttributeValue { DataType = "String", StringValue = param.Value };
}
}

private static void AddAttribute(ParameterCollection parameters, SendMessageRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttribute." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static TracerProviderBuilder AddAWSInstrumentation(
configure?.Invoke(awsClientOptions);

_ = new AWSClientsInstrumentation(awsClientOptions);
builder.AddSource("Amazon.AWS.AWSClientInstrumentation");
builder.AddSource(AWSTracingPipelineHandler.ActivitySourceName);
return builder;
}
}
Loading

0 comments on commit bfe42e2

Please sign in to comment.