Skip to content

Commit

Permalink
Updated MassTransit instrumentation to OpenTelemetry 1.0 (#85)
Browse files Browse the repository at this point in the history
* Updated MassTransit instrumentation

* fixed unit tests

* Assert only sampled traces

* Update CODEOWNERS
  • Loading branch information
alexvaluyskiy authored Mar 15, 2021
1 parent 8323c83 commit 3bac8e7
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 84 deletions.
4 changes: 2 additions & 2 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ src/OpenTelemetry.Contrib.Exporter.Stackdriver/ @ope
src/OpenTelemetry.Contrib.Extensions.AWSXRay/ @open-telemetry/dotnet-contrib-approvers @srprash @lupengamzn
src/OpenTelemetry.Contrib.Instrumentation.Elasticsearch/ @open-telemetry/dotnet-contrib-approvers
src/OpenTelemetry.Contrib.Instrumentation.EntityFrameworkCore/ @open-telemetry/dotnet-contrib-approvers
src/OpenTelemetry.Contrib.Instrumentation.MassTransit/ @open-telemetry/dotnet-contrib-approvers
src/OpenTelemetry.Contrib.Instrumentation.MassTransit/ @open-telemetry/dotnet-contrib-approvers @alexvaluyskiy
src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/ @open-telemetry/dotnet-contrib-approvers @pcwiese

test/OpenTelemetry.Contrib.Exporter.Stackdriver.Tests/ @open-telemetry/dotnet-contrib-approvers
test/OpenTelemetry.Contrib.Extensions.AWSXRay.Tests/ @open-telemetry/dotnet-contrib-approvers @srprash @lupengamzn
test/OpenTelemetry.Contrib.Instrumentation.Elasticsearch.Tests/ @open-telemetry/dotnet-contrib-approvers
test/OpenTelemetry.Contrib.Instrumentation.EntityFrameworkCoreTests/ @open-telemetry/dotnet-contrib-approvers
test/OpenTelemetry.Contrib.Instrumentation.MassTransit.Tests/ @open-telemetry/dotnet-contrib-approvers
test/OpenTelemetry.Contrib.Instrumentation.MassTransit.Tests/ @open-telemetry/dotnet-contrib-approvers @alexvaluyskiy
test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/ @open-telemetry/dotnet-contrib-approvers @pcwiese
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,62 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using OpenTelemetry.Instrumentation;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Contrib.Instrumentation.MassTransit.Implementation
{
internal class MassTransitDiagnosticListener : ListenerHandler
{
private readonly ActivitySourceAdapter activitySource;
internal static readonly AssemblyName AssemblyName = typeof(MassTransitDiagnosticListener).Assembly.GetName();
internal static readonly string ActivitySourceName = AssemblyName.Name;
internal static readonly Version Version = AssemblyName.Version;
internal static readonly ActivitySource ActivitySource = new ActivitySource(ActivitySourceName, Version.ToString());

private readonly MassTransitInstrumentationOptions options;

public MassTransitDiagnosticListener(ActivitySourceAdapter activitySource, MassTransitInstrumentationOptions options)
: base("MassTransit")
public MassTransitDiagnosticListener(string name, MassTransitInstrumentationOptions options)
: base(name)
{
this.activitySource = activitySource;
this.options = options;
}

public override void OnStartActivity(Activity activity, object payload)
{
if (this.options.TracedOperations != null && !this.options.TracedOperations.Contains(activity.OperationName))
{
MassTransitInstrumentationEventSource.Log.RequestIsFilteredOut(activity.OperationName);
activity.IsAllDataRequested = false;
return;
}

activity.DisplayName = this.GetDisplayName(activity);

this.activitySource.Start(activity, this.GetActivityKind(activity));
ActivityInstrumentationHelper.SetActivitySourceProperty(activity, ActivitySource);
ActivityInstrumentationHelper.SetKindProperty(activity, this.GetActivityKind(activity));
}

public override void OnStopActivity(Activity activity, object payload)
{
if (this.options.TracedOperations != null && !this.options.TracedOperations.Contains(activity.OperationName))
{
MassTransitInstrumentationEventSource.Log.RequestIsFilteredOut(activity.OperationName);
activity.IsAllDataRequested = false;
return;
}

if (activity.IsAllDataRequested)
{
this.TransformMassTransitTags(activity);
try
{
this.TransformMassTransitTags(activity);
}
catch (Exception ex)
{
MassTransitInstrumentationEventSource.Log.EnrichmentException(ex);
}
}

this.activitySource.Stop(activity);
}

private string GetDisplayName(Activity activity)
Expand Down Expand Up @@ -94,8 +108,8 @@ private void TransformMassTransitTags(Activity activity)

this.RenameTag(activity, TagName.MessageId, SemanticConventions.AttributeMessagingMessageId);
this.RenameTag(activity, TagName.ConversationId, SemanticConventions.AttributeMessagingConversationId);
this.RenameTag(activity, TagName.InitiatorId, SemanticConventions.AttributeMessagingMassTransitInitiatorId);
this.RenameTag(activity, TagName.CorrelationId, SemanticConventions.AttributeMessagingMassTransitCorrelationId);
this.RenameTag(activity, TagName.InitiatorId, MassTransitSemanticConventions.AttributeMessagingMassTransitInitiatorId);
this.RenameTag(activity, TagName.CorrelationId, MassTransitSemanticConventions.AttributeMessagingMassTransitCorrelationId);

activity.SetTag(TagName.SourceAddress, null);
}
Expand All @@ -105,8 +119,8 @@ private void TransformMassTransitTags(Activity activity)

this.RenameTag(activity, TagName.MessageId, SemanticConventions.AttributeMessagingMessageId);
this.RenameTag(activity, TagName.ConversationId, SemanticConventions.AttributeMessagingConversationId);
this.RenameTag(activity, TagName.InitiatorId, SemanticConventions.AttributeMessagingMassTransitInitiatorId);
this.RenameTag(activity, TagName.CorrelationId, SemanticConventions.AttributeMessagingMassTransitCorrelationId);
this.RenameTag(activity, TagName.InitiatorId, MassTransitSemanticConventions.AttributeMessagingMassTransitInitiatorId);
this.RenameTag(activity, TagName.CorrelationId, MassTransitSemanticConventions.AttributeMessagingMassTransitCorrelationId);

activity.SetTag(TagName.MessageId, null);

Expand All @@ -116,7 +130,7 @@ private void TransformMassTransitTags(Activity activity)
}
else if (activity.OperationName == OperationName.Consumer.Consume)
{
this.RenameTag(activity, TagName.ConsumerType, SemanticConventions.AttributeMessagingMassTransitConsumerType);
this.RenameTag(activity, TagName.ConsumerType, MassTransitSemanticConventions.AttributeMessagingMassTransitConsumerType);
}
else if (activity.OperationName == OperationName.Consumer.Handle)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// <copyright file="MassTransitInstrumentationEventSource.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics.Tracing;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Contrib.Instrumentation.MassTransit.Implementation
{
/// <summary>
/// EventSource events emitted from the project.
/// </summary>
[EventSource(Name = "OpenTelemetry-Instrumentation-MassTransit")]
internal class MassTransitInstrumentationEventSource : EventSource
{
public static MassTransitInstrumentationEventSource Log = new MassTransitInstrumentationEventSource();

[Event(1, Message = "Request is filtered out.", Level = EventLevel.Verbose)]
public void RequestIsFilteredOut(string eventName)
{
this.WriteEvent(1, eventName);
}

[NonEvent]
public void EnrichmentException(Exception ex)
{
if (this.IsEnabled(EventLevel.Error, (EventKeywords)(-1)))
{
this.EnrichmentException(ex.ToInvariantString());
}
}

[Event(2, Message = "Enrich threw exception. Exception {0}.", Level = EventLevel.Error)]
public void EnrichmentException(string exception)
{
this.WriteEvent(2, exception);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// <copyright file="MassTransitSemanticConventions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

namespace OpenTelemetry.Contrib.Instrumentation.MassTransit.Implementation
{
internal class MassTransitSemanticConventions
{
public const string AttributeMessagingMassTransitInitiatorId = "messaging.masstransit.initiator_id";
public const string AttributeMessagingMassTransitCorrelationId = "messaging.masstransit.correlation_id";
public const string AttributeMessagingMassTransitConsumerType = "messaging.masstransit.consumer_type";
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,25 @@
using System;
using OpenTelemetry.Contrib.Instrumentation.MassTransit.Implementation;
using OpenTelemetry.Instrumentation;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Contrib.Instrumentation.MassTransit
{
internal class MassTransitInstrumentation : IDisposable
{
private readonly DiagnosticSourceSubscriber diagnosticSourceSubscriber;
internal const string MassTransitDiagnosticListenerName = "MassTransit";

/// <summary>
/// Initializes a new instance of the <see cref="MassTransitInstrumentation"/> class.
/// </summary>
/// <param name="activitySource">ActivitySource adapter instance.</param>
public MassTransitInstrumentation(ActivitySourceAdapter activitySource)
: this(activitySource, new MassTransitInstrumentationOptions())
{
}
private readonly DiagnosticSourceSubscriber diagnosticSourceSubscriber;

/// <summary>
/// Initializes a new instance of the <see cref="MassTransitInstrumentation"/> class.
/// </summary>
/// <param name="activitySource">ActivitySource adapter instance.</param>
/// <param name="options">Instrumentation options.</param>
public MassTransitInstrumentation(ActivitySourceAdapter activitySource, MassTransitInstrumentationOptions options)
public MassTransitInstrumentation(MassTransitInstrumentationOptions options)
{
var diagnosticListener = new MassTransitDiagnosticListener(activitySource, options);
this.diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(diagnosticListener, null);
this.diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(
name => new MassTransitDiagnosticListener(name, options),
listener => listener.Name == MassTransitDiagnosticListenerName,
null);
this.diagnosticSourceSubscriber.Subscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
<Description>OpenTelemetry instrumentation for MassTransit</Description>
<PackageTags>$(PackageTags);distributed-tracing;MassTransit</PackageTags>
<MinVerTagPrefix>Instrumentation.MassTransit-</MinVerTagPrefix>
<IncludeSharedInstrumentationSource>true</IncludeSharedInstrumentationSource>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="OpenTelemetry" Version="0.7.0-beta.1" />
<PackageReference Include="OpenTelemetry" Version="1.0.2-alpha.0.29" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using OpenTelemetry.Contrib.Instrumentation.MassTransit;
using OpenTelemetry.Contrib.Instrumentation.MassTransit.Implementation;

namespace OpenTelemetry.Trace
{
Expand All @@ -42,7 +43,15 @@ public static TracerProviderBuilder AddMassTransitInstrumentation(
var options = new MassTransitInstrumentationOptions();
configureMassTransitInstrumentationOptions?.Invoke(options);

return builder.AddInstrumentation(activitySource => new MassTransitInstrumentation(activitySource, options));
builder.AddInstrumentation(() => new MassTransitInstrumentation(options));
builder.AddSource(MassTransitDiagnosticListener.ActivitySourceName);

builder.AddLegacySource(OperationName.Consumer.Consume);
builder.AddLegacySource(OperationName.Consumer.Handle);
builder.AddLegacySource(OperationName.Transport.Send);
builder.AddLegacySource(OperationName.Transport.Receive);

return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public async Task ShouldMapMassTransitTagsForConsumeMessageToOpenTelemetrySpecif
Assert.NotNull(expectedMessageContext);
Assert.Equal("OpenTelemetry.Contrib.Instrumentation.MassTransit.Tests.TestConsumer process", actualActivity.DisplayName);
Assert.Equal(ActivityKind.Internal, actualActivity.Kind);
Assert.Equal("OpenTelemetry.Contrib.Instrumentation.MassTransit.Tests.TestConsumer", actualActivity.GetTagValue(SemanticConventions.AttributeMessagingMassTransitConsumerType)?.ToString());
Assert.Equal("OpenTelemetry.Contrib.Instrumentation.MassTransit.Tests.TestConsumer", actualActivity.GetTagValue(MassTransitSemanticConventions.AttributeMessagingMassTransitConsumerType)?.ToString());

Assert.Null(actualActivity.GetTagValue(TagName.SpanKind));
Assert.Null(actualActivity.GetTagValue(TagName.PeerService));
Expand Down Expand Up @@ -223,8 +223,12 @@ public async Task ShouldMapMassTransitTagsForHandleMessageToOpenTelemetrySpecifi
}
}

[Fact]
public async Task MassTransitInstrumentationTestOptions()
[Theory]
[InlineData(OperationName.Consumer.Consume)]
[InlineData(OperationName.Consumer.Handle)]
[InlineData(OperationName.Transport.Send)]
[InlineData(OperationName.Transport.Receive)]
public async Task MassTransitInstrumentationTestOptions(string operationName)
{
using Activity activity = new Activity("Parent");
activity.SetParentId(
Expand All @@ -237,7 +241,7 @@ public async Task MassTransitInstrumentationTestOptions()
using (Sdk.CreateTracerProviderBuilder()
.AddProcessor(activityProcessor.Object)
.AddMassTransitInstrumentation(o =>
o.TracedOperations = new HashSet<string>(new[] { OperationName.Consumer.Consume }))
o.TracedOperations = new HashSet<string>(new[] { operationName }))
.Build())
{
var harness = new InMemoryTestHarness();
Expand All @@ -261,11 +265,11 @@ await harness.InputQueueSendEndpoint.Send<TestMessage>(new
}
}

Assert.Equal(4, activityProcessor.Invocations.Count);
Assert.Equal(8, activityProcessor.Invocations.Count);

var consumes = this.GetActivitiesFromInvocationsByOperationName(activityProcessor.Invocations, OperationName.Consumer.Consume);
var consumes = this.GetActivitiesFromInvocationsByOperationName(activityProcessor.Invocations, operationName);

Assert.Equal(2, consumes.Count());
Assert.Single(consumes);
}

private IEnumerable<Activity> GetActivitiesFromInvocationsByOperationName(IEnumerable<IInvocation> invocations, string operationName)
Expand All @@ -275,6 +279,7 @@ private IEnumerable<Activity> GetActivitiesFromInvocationsByOperationName(IEnume
.Where(i =>
i.Arguments.OfType<Activity>()
.Any(a => a.OperationName == operationName))
.Where(i => i.Method.Name == "OnEnd")
.Select(i => i.Arguments.OfType<Activity>().Single());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<Description>Unit test project for OpenTelemetry MassTransit instrumentation</Description>
<TargetFrameworks>netcoreapp2.1;netcoreapp3.1</TargetFrameworks>
<TargetFrameworks Condition="$(OS) == 'Windows_NT'">$(TargetFrameworks);net461</TargetFrameworks>
<IncludeSharedTestSource>true</IncludeSharedTestSource>
</PropertyGroup>

<ItemGroup>
Expand Down

0 comments on commit 3bac8e7

Please sign in to comment.