diff --git a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSPropagatorPipelineHandler.cs b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSPropagatorPipelineHandler.cs new file mode 100644 index 0000000000..3f6ade4fd1 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSPropagatorPipelineHandler.cs @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Amazon.Runtime; +using Amazon.Runtime.Internal; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Extensions.AWS.Trace; + +namespace OpenTelemetry.Instrumentation.AWS.Implementation; + +/// +/// Uses to inject the current Activity Context and +/// Baggage into the outgoing AWS SDK Request. +/// +/// Must execute after the AWS SDK has marshalled (ie serialized) +/// the outgoing request object so that it can work with the 's +/// . +/// +internal class AWSPropagatorPipelineHandler : PipelineHandler +{ + private static readonly AWSXRayPropagator AwsPropagator = new(); + + private static readonly Action, string, string> Setter = (carrier, name, value) => + { + carrier[name] = value; + }; + + /// + /// Rely on the the for retrieving the current + /// context. + /// + private readonly AWSTracingPipelineHandler tracingPipelineHandler; + + public AWSPropagatorPipelineHandler(AWSTracingPipelineHandler tracingPipelineHandler) + { + this.tracingPipelineHandler = tracingPipelineHandler; + } + + public override void InvokeSync(IExecutionContext executionContext) + { + this.ProcessBeginRequest(executionContext); + + base.InvokeSync(executionContext); + } + + public override async Task InvokeAsync(IExecutionContext executionContext) + { + this.ProcessBeginRequest(executionContext); + + return await base.InvokeAsync(executionContext).ConfigureAwait(false); + } + + private void ProcessBeginRequest(IExecutionContext executionContext) + { + if (this.tracingPipelineHandler.Activity == null) + { + return; + } + + AwsPropagator.Inject( + new PropagationContext(this.tracingPipelineHandler.Activity.Context, Baggage.Current), + executionContext.RequestContext.Request.Headers, + Setter); + } +} diff --git a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineCustomizer.cs b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineCustomizer.cs index ac03e10452..2a9538bb6c 100644 --- a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineCustomizer.cs +++ b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineCustomizer.cs @@ -7,6 +7,10 @@ namespace OpenTelemetry.Instrumentation.AWS.Implementation; +/// +/// Wires and +/// into the AWS so they can inject trace headers and wrap sdk calls in spans. +/// internal class AWSTracingPipelineCustomizer : IRuntimePipelineCustomizer { private readonly AWSClientInstrumentationOptions options; @@ -31,6 +35,15 @@ public void Customize(Type serviceClientType, RuntimePipeline pipeline) return; } - pipeline.AddHandlerBefore(new AWSTracingPipelineHandler(this.options)); + var tracingPipelineHandler = new AWSTracingPipelineHandler(this.options); + var propagatingPipelineHandler = new AWSPropagatorPipelineHandler(tracingPipelineHandler); + + // AWSTracingPipelineHandler must execute early in the AWS SDK pipeline + // in order to manipulate outgoing requests objects before they are marshalled (ie serialized). + pipeline.AddHandlerBefore(tracingPipelineHandler); + + // AWSPropagatorPipelineHandler executes after the AWS SDK has marshalled (ie serialized) + // the outgoing request object so that it can work with the request's Headers + pipeline.AddHandlerBefore(propagatingPipelineHandler); } } diff --git a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs index 2383c9f8ba..fe4dfc3636 100644 --- a/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs +++ b/src/OpenTelemetry.Instrumentation.AWS/Implementation/AWSTracingPipelineHandler.cs @@ -2,28 +2,27 @@ // SPDX-License-Identifier: Apache-2.0 using System; -using System.Collections.Generic; using System.Diagnostics; using System.Threading.Tasks; using Amazon.Runtime; using Amazon.Runtime.Internal; using Amazon.Util; using OpenTelemetry.Context.Propagation; -using OpenTelemetry.Extensions.AWS.Trace; using OpenTelemetry.Trace; namespace OpenTelemetry.Instrumentation.AWS.Implementation; +/// +/// Wraps the outgoing AWS SDK Request in a Span and adds additional AWS specific Tags. +/// Depending on the target AWS Service, additional request specific information may be injected as well. +/// +/// This must execute early in the AWS SDK pipeline +/// in order to manipulate outgoing requests objects before they are marshalled (ie serialized). +/// internal sealed class AWSTracingPipelineHandler : PipelineHandler { internal const string ActivitySourceName = "Amazon.AWS.AWSClientInstrumentation"; - private static readonly AWSXRayPropagator AwsPropagator = new(); - private static readonly Action, string, string> Setter = (carrier, name, value) => - { - carrier[name] = value; - }; - private static readonly ActivitySource AWSSDKActivitySource = new(ActivitySourceName); private readonly AWSClientInstrumentationOptions options; @@ -33,27 +32,29 @@ public AWSTracingPipelineHandler(AWSClientInstrumentationOptions options) this.options = options; } + public Activity? Activity { get; private set; } + public override void InvokeSync(IExecutionContext executionContext) { - var activity = this.ProcessBeginRequest(executionContext); + this.Activity = this.ProcessBeginRequest(executionContext); try { base.InvokeSync(executionContext); } catch (Exception ex) { - if (activity != null) + if (this.Activity != null) { - ProcessException(activity, ex); + ProcessException(this.Activity, ex); } throw; } finally { - if (activity != null) + if (this.Activity != null) { - ProcessEndRequest(executionContext, activity); + ProcessEndRequest(executionContext, this.Activity); } } } @@ -62,25 +63,25 @@ public override async Task InvokeAsync(IExecutionContext executionContext) { T? ret = null; - var activity = this.ProcessBeginRequest(executionContext); + this.Activity = this.ProcessBeginRequest(executionContext); try { ret = await base.InvokeAsync(executionContext).ConfigureAwait(false); } catch (Exception ex) { - if (activity != null) + if (this.Activity != null) { - ProcessException(activity, ex); + ProcessException(this.Activity, ex); } throw; } finally { - if (activity != null) + if (this.Activity != null) { - ProcessEndRequest(executionContext, activity); + ProcessEndRequest(executionContext, this.Activity); } } @@ -241,8 +242,6 @@ private static string FetchRequestId(IRequestContext requestContext, IResponseCo AddRequestSpecificInformation(activity, requestContext, service); } - AwsPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), requestContext.Request.Headers, Setter); - return activity; } }