Skip to content

Commit

Permalink
Add basic telemetry support in core message bus implementation
Browse files Browse the repository at this point in the history
Adds core Execute/Notify and CreateMapper telemetry.
Also provides telemetry when notifying subjects.

The messaging "protocol" is specified as the assembly for the message type, and the protocol version, the assembly version. This will allow querying/tracking contracts being used.

Operations for Execute methods are `process` and for notify/onnext, `send` (since they are fire&forget notifications).

A new console app sample that showscases the features is also provided.

Closes #73
  • Loading branch information
kzu committed Nov 19, 2022
1 parent 0e082f6 commit 15e6083
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 15 deletions.
7 changes: 7 additions & 0 deletions Merq.sln
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Merq.AutoMapper", "src\Merq
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Merq.Dynamically", "src\Samples\Merq.Dynamically\Merq.Dynamically.csproj", "{2CA2D2DD-20EC-4BB8-B641-196BDBFB8D6E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleApp", "src\Samples\ConsoleApp\ConsoleApp.csproj", "{9D458376-9F9A-4F71-B659-0DA2C6B9A9B7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -106,6 +108,10 @@ Global
{2CA2D2DD-20EC-4BB8-B641-196BDBFB8D6E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CA2D2DD-20EC-4BB8-B641-196BDBFB8D6E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2CA2D2DD-20EC-4BB8-B641-196BDBFB8D6E}.Release|Any CPU.Build.0 = Release|Any CPU
{9D458376-9F9A-4F71-B659-0DA2C6B9A9B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D458376-9F9A-4F71-B659-0DA2C6B9A9B7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D458376-9F9A-4F71-B659-0DA2C6B9A9B7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9D458376-9F9A-4F71-B659-0DA2C6B9A9B7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -115,6 +121,7 @@ Global
{E72D5F5F-4A8B-4C00-9111-B7CCC9A33C09} = {9856158D-721E-4B4C-84CA-1FB3BB19B532}
{1D273A2C-E197-485B-82DC-82755D94C1D4} = {9856158D-721E-4B4C-84CA-1FB3BB19B532}
{2CA2D2DD-20EC-4BB8-B641-196BDBFB8D6E} = {9856158D-721E-4B4C-84CA-1FB3BB19B532}
{9D458376-9F9A-4F71-B659-0DA2C6B9A9B7} = {9856158D-721E-4B4C-84CA-1FB3BB19B532}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {5F401F53-1497-4C3F-BA2B-915F7560AF17}
Expand Down
7 changes: 6 additions & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
<PackageVersion Include="Microsoft.TestPlatform.ObjectModel" Version="17.3.2" />
<PackageVersion Include="NuGetizer" Version="0.9.1" />
<PackageVersion Include="IFluentInterface" Version="2.1.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.3.1" />
<PackageVersion Include="OpenTelemetry.Exporter.Zipkin" Version="1.3.1" />
<PackageVersion Include="Spectre.Console" Version="0.45.0" />
<PackageVersion Include="System.Composition.AttributedModel" Version="6.0.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
<PackageVersion Include="ThisAssembly.Project" Version="1.0.10" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.2.0" />
Expand All @@ -20,7 +25,7 @@
<PackageVersion Include="SharpYaml" Version="2.1.0" />
<PackageVersion Include="Scriban" Version="5.5.0" />
<PackageVersion Include="Superpower" Version="3.0.0" />
<PackageVersion Include="Devlooped.Extensions.DependencyInjection.Attributed" Version="1.2.1" />
<PackageVersion Include="Devlooped.Extensions.DependencyInjection.Attributed" Version="1.2.2" />
<PackageVersion Include="RxFree" Version="1.1.2" />
</ItemGroup>
<ItemGroup Label="Tests">
Expand Down
11 changes: 11 additions & 0 deletions src/Merq.AutoMapper/AutoMapperMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using AutoMapper;
#if NET6_0_OR_GREATER
using TypePair = AutoMapper.Internal.TypePair;
Expand All @@ -16,6 +18,8 @@ namespace Merq;
/// </summary>
public class AutoMapperMessageBus : MessageBus
{
static readonly ActivitySource tracer = new(ThisAssembly.Project.AssemblyName, ThisAssembly.Project.Version);

readonly ConcurrentDictionary<TypePair, bool> mappedTypes = new();
readonly object sync = new();
IMapper? mapper;
Expand All @@ -39,6 +43,7 @@ IMapper CreateMapper(TypePair pair)
{
lock (sync)
{
using var activity = StartActivity();
return new MapperConfiguration(cfg =>
{
foreach (var key in mappedTypes.Keys)
Expand Down Expand Up @@ -87,4 +92,10 @@ IMapper CreateMapper(TypePair pair)
}
};
}

static Activity? StartActivity([CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
=> tracer.StartActivity(ActivityKind.Internal, name: member ?? "")
?.SetTag("code.function", member)
?.SetTag("code.filepath", file)
?.SetTag("code.lineno", line);
}
5 changes: 5 additions & 0 deletions src/Merq.AutoMapper/Merq.AutoMapper.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
<ProjectReference Include="..\Merq.Core\Merq.Core.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
<PackageReference Include="ThisAssembly.Project" PrivateAssets="all" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net472'">
<PackageReference Include="AutoMapper" VersionOverride="10.1.1" />
</ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions src/Merq.Core/Merq.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
<PackageReference Include="ThisAssembly.Project" PrivateAssets="all" />
<PackageReference Include="RxFree" PrivateAssets="all" />
</ItemGroup>
Expand Down
13 changes: 12 additions & 1 deletion src/Merq.Core/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using static Merq.Telemetry;

namespace Merq;

Expand Down Expand Up @@ -186,6 +187,8 @@ public bool CanHandle(IExecutable command) => canHandleMap.GetOrAdd(GetCommandTy
public void Execute(ICommand command)
{
var type = GetCommandType(command);
using var activity = StartActivity(type);

if (type.IsPublic)
// For public types, we can use the faster dynamic dispatch approach
ExecuteCore((dynamic)command);
Expand All @@ -206,6 +209,8 @@ public void Execute(ICommand command)
public TResult Execute<TResult>(ICommand<TResult> command)
{
var type = GetCommandType(command);
using var activity = StartActivity(type);

if (type.IsPublic)
// For public types, we can use the faster dynamic dispatch approach
return WithResult<TResult>().Execute((dynamic)command);
Expand All @@ -225,6 +230,8 @@ public TResult Execute<TResult>(ICommand<TResult> command)
public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation = default)
{
var type = GetCommandType(command);
using var activity = StartActivity(type);

if (type.IsPublic)
// For public types, we can use the faster dynamic dispatch approach
return ExecuteAsyncCore((dynamic)command, cancellation);
Expand All @@ -246,6 +253,8 @@ public Task ExecuteAsync(IAsyncCommand command, CancellationToken cancellation =
public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, CancellationToken cancellation = default)
{
var type = GetCommandType(command);
using var activity = StartActivity(type);

if (type.IsPublic)
// For public types, we can use the faster dynamic dispatch approach
return WithResult<TResult>().ExecuteAsync((dynamic)command, cancellation);
Expand All @@ -263,6 +272,7 @@ public Task<TResult> ExecuteAsync<TResult>(IAsyncCommand<TResult> command, Cance
public void Notify<TEvent>(TEvent e)
{
var type = (e ?? throw new ArgumentNullException(nameof(e))).GetType();
using var activity = StartActivity(type, "send");

// TODO: if we prevent Notify for externally produced events, we won't be
// able to notify base event subscribers when those events are produced.
Expand Down Expand Up @@ -294,6 +304,7 @@ public void Notify<TEvent>(TEvent e)
public IObservable<TEvent> Observe<TEvent>()
{
var eventType = typeof(TEvent);
using var activity = StartActivity(eventType, "observe");

// NOTE: in order for the base event subscription to work properly for external
// producers, they must register the service for each T in the TEvent hierarchy.
Expand Down Expand Up @@ -380,7 +391,7 @@ static Type GetCommandType(IExecutable command)
Func<dynamic, object>? FindCommandMapper(Type sourceType, out Type? targetType)
{
targetType = null;
if (GetMapper() is null || collection is null)
if (collection is null || GetMapper() is null)
return null;

var map = mappedCommands.GetOrAdd(sourceType, type =>
Expand Down
3 changes: 3 additions & 0 deletions src/Merq.Core/Subject.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using static Merq.Telemetry;

namespace System.Reactive.Subjects;

Expand All @@ -22,13 +23,15 @@ public override void OnNext(object value)
{
if (mapper == null)
{
using var activity = StartActivity(typeof(T), "send");
OnNext((T)value);
}
else if (maps.GetOrAdd(value.GetType(),
type => mapper(type, typeof(T)) is Func<object, object> map ?
obj => (T)map(value) : null)
is Func<object, T> map)
{
using var activity = StartActivity(typeof(T), "send");
OnNext(map(value));
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/Merq.Core/Telemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;

namespace Merq;

static class Telemetry
{
static readonly ActivitySource tracer = new(ThisAssembly.Project.AssemblyName, ThisAssembly.Project.Version);

public static Activity? StartActivity(Type type, string operation = "process", [CallerMemberName] string? member = default, [CallerFilePath] string? file = default, [CallerLineNumber] int? line = default)
=> tracer.StartActivity(ActivityKind.Producer, name: $"{member}/{type.FullName}")
?.SetTag("code.function", member)
?.SetTag("code.filepath", file)
?.SetTag("code.lineno", line)
?.SetTag("messaging.system", "merq")
?.SetTag("messaging.destination", type.FullName)
?.SetTag("messaging.destination_kind", "topic")
?.SetTag("messaging.operation", operation)
?.SetTag("messaging.protocol", type.Assembly.GetName().Name)
?.SetTag("messaging.protocol_version", type.Assembly.GetName().Version?.ToString() ?? "unknown");
}
1 change: 0 additions & 1 deletion src/Merq.DependencyInjection/MerqServicesExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ static partial class MerqServicesExtension
/// <param name="enableAutoMapping">Enables duck-typing behavior for events and commands, where
/// instances of disparate assemblies can observe and execute each other's events and commands
/// as long as their full type name matches.</param>
/// </param>
/// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns>
public static IServiceCollection AddMessageBus(this IServiceCollection services, bool addDiscoveredServices = true, bool enableAutoMapping = false)
{
Expand Down
4 changes: 3 additions & 1 deletion src/Merq.VisualStudio.Tests/Merq.VisualStudio.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net472</TargetFramework>
Expand All @@ -21,6 +21,8 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Samples\Library1\Library1.csproj" Aliases="Library1" />
<ProjectReference Include="..\Samples\Library2\Library2.csproj" Aliases="Library2" />
<ProjectReference Include="..\Merq.Tests\Merq.Tests.csproj" />
<ProjectReference Include="..\Merq.VisualStudio\Merq.VisualStudio.csproj" />
</ItemGroup>
Expand Down
40 changes: 40 additions & 0 deletions src/Samples/ConsoleApp/ConsoleApp.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>true</ImplicitUsings>
<!-- Allow inspection of generated code under obj -->
<EmitCompilerGeneratedFiles>true</EmitCompilerGeneratedFiles>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="OpenTelemetry.Exporter.Console" />
<PackageReference Include="OpenTelemetry.Exporter.Zipkin" />
<PackageReference Include="RxFree">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Spectre.Console" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Merq.AutoMapper\Merq.AutoMapper.csproj" />
<ProjectReference Include="..\Library1\Library1.csproj" Aliases="Library1" />
<ProjectReference Include="..\Library2\Library2.csproj" Aliases="Library2" />
</ItemGroup>

<!-- Item fixes to support project references vs package references -->
<ItemGroup Label="All items in this group aren't needed when referencing the nuget packages instead of project references">
<!-- Analyzers and code fixes otherwise automatically added by Merq package -->
<ProjectReference Include="..\..\Merq.CodeAnalysis\Merq.CodeAnalysis.csproj" ReferenceOutputAssembly="false" OutputItemType="Analyzer" />
<ProjectReference Include="..\..\Merq.CodeFixes\Merq.CodeFixes.csproj" ReferenceOutputAssembly="false" OutputItemType="Analyzer" />
<!-- Source included in Merq.DependencyInjection package to register message bus. -->
<Compile Include="..\..\Merq.DependencyInjection\MerqServicesExtension.cs" Link="MerqServicesExtension.cs" Visible="false" />
<!-- Dependency otherwise brought-in by Merq.DependencyInjection for automated service discovery and registration at compile-time. -->
<PackageReference Include="Devlooped.Extensions.DependencyInjection.Attributed" />
</ItemGroup>

</Project>
53 changes: 53 additions & 0 deletions src/Samples/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
extern alias Library1;
extern alias Library2;
using Merq;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using static Spectre.Console.AnsiConsole;

// Initialize services
var collection = new ServiceCollection();
// Library1 contains [Service]-annotated classes, which will be automatically registered here.
collection.AddMessageBus(addDiscoveredServices: true, enableAutoMapping: true);

var services = collection.BuildServiceProvider();
var bus = services.GetRequiredService<IMessageBus>();

// Setup OpenTelemetry: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/distributed-tracing-instrumentation-walkthroughs
using var tracer = Sdk
.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ConsoleApp"))
.AddSource("Merq.Core")
.AddSource("Merq.AutoMapper")
.AddConsoleExporter()
.AddZipkinExporter()
.Build();

MarkupLine("[yellow]Executing with command from same assembly[/]");

// NOTE: we subscribe to an event in Library2, which is (duck)compatible with the one
// notified by the Library1.Echo command handler!
bus.Observe<Library2::Library.OnDidSay>()
.Subscribe(e => MarkupLine($"[red]Received Library2:{e.GetType().Name}.Message={e.Message}[/]"));

// Also observe the original message, for comparison
bus.Observe<Library1::Library.OnDidSay>()
.Subscribe(e => MarkupLine($"[lime]Received Library1:{e.GetType().Name}.Message={e.Message}[/]"));

// We can execute passing an object of the same type/assembly as the EchoHandler expects
var message = bus.Execute(new Library1::Library.Echo("Hello World"));

WriteLine(message);

MarkupLine("[yellow]Executing with command from different assembly[/]");

// But we can also execute passing an object from an entirely different assembly
message = bus.Execute(new Library2::Library.Echo("Hello World"));

WriteLine(message);

// Test rapid fire messages
//Parallel.For(0, 10, i
// => bus.Execute(new Library2::Library.Echo($"Hello World ({i})")));
Loading

0 comments on commit 15e6083

Please sign in to comment.