Skip to content

Commit

Permalink
[Instrumentation.AWS] add additioanl aws pipeline handler that activa…
Browse files Browse the repository at this point in the history
…tes after RequestContext.Request has been built so that the Propagator can inject request headers.
  • Loading branch information
ppittle committed Feb 12, 2024
1 parent 4bd642f commit 87085d1
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Uses <see cref="AWSXRayPropagator"/> to inject the current Activity Context and
/// Baggage into the outgoing AWS SDK Request.
/// <para />
/// Must execute after the AWS SDK has marshalled (ie serialized)
/// the outgoing request object so that it can work with the <see cref="IRequest"/>'s
/// <see cref="IRequest.Headers"/>.
/// </summary>
internal class AWSPropagatorPipelineHandler : PipelineHandler
{
private static readonly AWSXRayPropagator AwsPropagator = new();

private static readonly Action<IDictionary<string, string>, string, string> Setter = (carrier, name, value) =>
{
carrier[name] = value;
};

/// <summary>
/// Rely on the the <see cref="AWSTracingPipelineHandler.Activity"/> for retrieving the current
/// context.
/// </summary>
private readonly AWSTracingPipelineHandler tracingPipelineHandler;

public AWSPropagatorPipelineHandler(AWSTracingPipelineHandler tracingPipelineHandler)
{
this.tracingPipelineHandler = tracingPipelineHandler;
}

public override void InvokeSync(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);

base.InvokeSync(executionContext);
}

public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
this.ProcessBeginRequest(executionContext);

return await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}

private void ProcessBeginRequest(IExecutionContext executionContext)
{
if (this.tracingPipelineHandler.Activity == null)
{
return;
}

AwsPropagator.Inject(
new PropagationContext(this.tracingPipelineHandler.Activity.Context, Baggage.Current),
executionContext.RequestContext.Request.Headers,
Setter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wires <see cref="AWSTracingPipelineHandler"/> and <see cref="AWSPropagatorPipelineHandler"/>
/// into the AWS <see cref="RuntimePipeline"/> so they can inject trace headers and wrap sdk calls in spans.
/// </summary>
internal class AWSTracingPipelineCustomizer : IRuntimePipelineCustomizer
{
private readonly AWSClientInstrumentationOptions options;
Expand All @@ -31,6 +35,15 @@ public void Customize(Type serviceClientType, RuntimePipeline pipeline)
return;
}

pipeline.AddHandlerBefore<Marshaller>(new AWSTracingPipelineHandler(this.options));
var tracingPipelineHandler = new AWSTracingPipelineHandler(this.options);
var propagatingPipelineHandler = new AWSPropagatorPipelineHandler(tracingPipelineHandler);

// AWSTracingPipelineHandler must execute early in the AWS SDK pipeline
// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
pipeline.AddHandlerBefore<Marshaller>(tracingPipelineHandler);

// AWSPropagatorPipelineHandler executes after the AWS SDK has marshalled (ie serialized)
// the outgoing request object so that it can work with the request's Headers
pipeline.AddHandlerBefore<RetryHandler>(propagatingPipelineHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.Util;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Extensions.AWS.Trace;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Instrumentation.AWS.Implementation;

/// <summary>
/// Wraps the outgoing AWS SDK Request in a Span and adds additional AWS specific Tags.
/// Depending on the target AWS Service, additional request specific information may be injected as well.
/// <para />
/// This <see cref="PipelineHandler"/> must execute early in the AWS SDK pipeline
/// in order to manipulate outgoing requests objects before they are marshalled (ie serialized).
/// </summary>
internal sealed class AWSTracingPipelineHandler : PipelineHandler
{
internal const string ActivitySourceName = "Amazon.AWS.AWSClientInstrumentation";

private static readonly AWSXRayPropagator AwsPropagator = new();
private static readonly Action<IDictionary<string, string>, string, string> Setter = (carrier, name, value) =>
{
carrier[name] = value;
};

private static readonly ActivitySource AWSSDKActivitySource = new(ActivitySourceName);

private readonly AWSClientInstrumentationOptions options;
Expand All @@ -33,27 +32,29 @@ public AWSTracingPipelineHandler(AWSClientInstrumentationOptions options)
this.options = options;
}

public Activity? Activity { get; private set; }

public override void InvokeSync(IExecutionContext executionContext)
{
var activity = this.ProcessBeginRequest(executionContext);
this.Activity = this.ProcessBeginRequest(executionContext);
try
{
base.InvokeSync(executionContext);
}
catch (Exception ex)
{
if (activity != null)
if (this.Activity != null)
{
ProcessException(activity, ex);
ProcessException(this.Activity, ex);
}

throw;
}
finally
{
if (activity != null)
if (this.Activity != null)
{
ProcessEndRequest(executionContext, activity);
ProcessEndRequest(executionContext, this.Activity);
}
}
}
Expand All @@ -62,25 +63,25 @@ public override async Task<T> InvokeAsync<T>(IExecutionContext executionContext)
{
T? ret = null;

var activity = this.ProcessBeginRequest(executionContext);
this.Activity = this.ProcessBeginRequest(executionContext);
try
{
ret = await base.InvokeAsync<T>(executionContext).ConfigureAwait(false);
}
catch (Exception ex)
{
if (activity != null)
if (this.Activity != null)
{
ProcessException(activity, ex);
ProcessException(this.Activity, ex);
}

throw;
}
finally
{
if (activity != null)
if (this.Activity != null)
{
ProcessEndRequest(executionContext, activity);
ProcessEndRequest(executionContext, this.Activity);
}
}

Expand Down Expand Up @@ -241,8 +242,6 @@ private static string FetchRequestId(IRequestContext requestContext, IResponseCo
AddRequestSpecificInformation(activity, requestContext, service);
}

AwsPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestContext.Request.Headers, Setter);

return activity;
}
}

0 comments on commit 87085d1

Please sign in to comment.