Skip to content

Commit

Permalink
Switching DataFlows to OpenTelemetry vs. IMetricsProvider.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Mar 30, 2024
1 parent 1941856 commit ea789aa
Show file tree
Hide file tree
Showing 17 changed files with 596 additions and 126 deletions.
7 changes: 7 additions & 0 deletions RabbitMQ.Dataflows.sln
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "rabbitmq", "rabbitmq", "{5C
guides\rabbitmq\Serialization.md = guides\rabbitmq\Serialization.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.Console.Tests", "tests\OpenTelemetry.Console.Tests\OpenTelemetry.Console.Tests.csproj", "{077E07C3-9A35-42A0-8228-E9778F02DFCE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -136,6 +138,10 @@ Global
{528C015E-58FC-44B4-BAC5-05BCFCD506C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{528C015E-58FC-44B4-BAC5-05BCFCD506C9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{528C015E-58FC-44B4-BAC5-05BCFCD506C9}.Release|Any CPU.Build.0 = Release|Any CPU
{077E07C3-9A35-42A0-8228-E9778F02DFCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{077E07C3-9A35-42A0-8228-E9778F02DFCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{077E07C3-9A35-42A0-8228-E9778F02DFCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{077E07C3-9A35-42A0-8228-E9778F02DFCE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -160,6 +166,7 @@ Global
{0930ACF5-372E-49AF-B53F-A06C3A5FA997} = {D01D59E9-75CC-4C74-A887-22B0A55C994B}
{528C015E-58FC-44B4-BAC5-05BCFCD506C9} = {D01D59E9-75CC-4C74-A887-22B0A55C994B}
{5C94F432-8229-4FFA-8DBF-AC1DEDE66265} = {1AB5E832-AD36-40AF-A7E9-A105A778116C}
{077E07C3-9A35-42A0-8228-E9778F02DFCE} = {D01D59E9-75CC-4C74-A887-22B0A55C994B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D7099845-267C-4232-8764-5DF25D6B2C79}
Expand Down
65 changes: 32 additions & 33 deletions src/HouseofCat.Dataflows/BaseDataflow.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using HouseofCat.Compression;
using HouseofCat.Dataflows.Extensions;
using HouseofCat.Encryption;
using HouseofCat.Metrics;
using HouseofCat.Serialization;
using OpenTelemetry.Trace;
using System;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;
Expand All @@ -17,7 +18,6 @@ namespace HouseofCat.Dataflows;
protected ISerializationProvider _serializationProvider;
protected IEncryptionProvider _encryptionProvider;
protected ICompressionProvider _compressProvider;
protected IMetricsProvider _metricsProvider;

protected ISourceBlock<TState> _currentBlock;
public Task Completion { get; protected set; }
Expand Down Expand Up @@ -60,20 +60,20 @@ public TransformBlock<TState, TState> GetTransformBlock(
public TransformBlock<TState, TState> GetWrappedTransformBlock(
Func<TState, TState> action,
ExecutionDataflowBlockOptions options,
string metricIdentifier,
bool metricMicroScale = false,
string metricUnit = null,
string metricDescription = null)
string metricIdentifier)
{
TState WrapAction(TState state)
{
using var childSpan = state.CreateActiveSpan(metricIdentifier, SpanKind.Consumer);
try
{
using var multiDispose = _metricsProvider.TrackAndDuration(metricIdentifier, metricMicroScale, metricUnit, metricDescription, state.MetricTags);

return action(state);
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
state.IsFaulted = true;
state.EDI = ExceptionDispatchInfo.Capture(ex);
return state;
Expand All @@ -86,20 +86,19 @@ TState WrapAction(TState state)
public TransformBlock<TState, TState> GetWrappedTransformBlock(
Func<TState, Task<TState>> action,
ExecutionDataflowBlockOptions options,
string metricIdentifier,
bool metricMicroScale = false,
string metricUnit = null,
string metricDescription = null)
string metricIdentifier)
{
async Task<TState> WrapActionAsync(TState state)
{
using var childSpan = state.CreateActiveSpan(metricIdentifier, SpanKind.Consumer);
try
{
using var multiDispose = _metricsProvider.TrackAndDuration(metricIdentifier, metricMicroScale, metricUnit, metricDescription, state.MetricTags);
return await action(state).ConfigureAwait(false);
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
state.IsFaulted = true;
state.EDI = ExceptionDispatchInfo.Capture(ex);
return state;
Expand All @@ -112,20 +111,20 @@ async Task<TState> WrapActionAsync(TState state)
public ActionBlock<TState> GetWrappedActionBlock(
Action<TState> action,
ExecutionDataflowBlockOptions options,
string metricIdentifier,
bool metricMicroScale = false,
string metricUnit = null,
string metricDescription = null)
string spanName)
{
void WrapAction(TState state)
{
using var childSpan = state.CreateActiveSpan(spanName, SpanKind.Consumer);
try
{
using var multiDispose = _metricsProvider.TrackAndDuration(metricIdentifier, metricMicroScale, metricUnit, metricDescription, state.MetricTags);
action(state);
}
catch
{ /* Actions are terminating block, so swallow (maybe log) */ }
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
}
}

return new ActionBlock<TState>(WrapAction, options);
Expand All @@ -134,20 +133,20 @@ void WrapAction(TState state)
public ActionBlock<TState> GetWrappedActionBlock(
Func<TState, TState> action,
ExecutionDataflowBlockOptions options,
string metricIdentifier,
bool metricMicroScale = false,
string metricUnit = null,
string metricDescription = null)
string spanName)
{
void WrapAction(TState state)
{
using var childSpan = state.CreateActiveSpan(spanName, SpanKind.Consumer);
try
{
using var multiDispose = _metricsProvider.TrackAndDuration(metricIdentifier, metricMicroScale, metricUnit, metricDescription, state.MetricTags);
action(state);
}
catch
{ /* Actions are terminating block, so swallow (maybe log) */ }
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
}
}

return new ActionBlock<TState>(WrapAction, options);
Expand All @@ -156,20 +155,20 @@ void WrapAction(TState state)
public ActionBlock<TState> GetWrappedActionBlock(
Func<TState, Task> action,
ExecutionDataflowBlockOptions options,
string metricIdentifier,
bool metricMicroScale = false,
string metricUnit = null,
string metricDescription = null)
string spanName)
{
async Task WrapActionAsync(TState state)
{
using var childSpan = state.CreateActiveSpan(spanName, SpanKind.Consumer);
try
{
using var multiDispose = _metricsProvider.TrackAndDuration(metricIdentifier, metricMicroScale, metricUnit, metricDescription, state.MetricTags);
await action(state).ConfigureAwait(false);
}
catch
{ /* Actions are terminating block, so swallow (maybe log) */ }
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
}
}

return new ActionBlock<TState>(WrapActionAsync, options);
Expand Down
137 changes: 137 additions & 0 deletions src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using OpenTelemetry.Trace;
using System.Diagnostics;

namespace HouseofCat.Dataflows.Extensions;

public static class WorkStateExtensions
{
public static void SetOpenTelemetryError(this IWorkState state, string message = null)
{
if (state is null) return;
state.SetCurrentActivityAsError(message);

if (state.RootSpan is null) return;
state.SetCurrentSpanAsError(message);
}

public static void SetCurrentActivityAsError(this IWorkState state, string message = null)
{
var activity = Activity.Current;
if (activity is null) return;

activity.SetStatus(ActivityStatusCode.Error, message ?? state.EDI.SourceException?.Message);
if (state?.EDI is not null)
{
if (state.EDI.SourceException is not null)
{
activity.RecordException(state.EDI.SourceException);
}
}
else
{
activity.SetStatus(ActivityStatusCode.Error, message);
}
}

public static void SetCurrentSpanAsError(this IWorkState state, string message = null)
{
var span = Tracer.CurrentSpan;
if (span is null) return;

span.SetStatus(Status.Error);
span.SetAttribute("Error", message);
if (state?.EDI is not null)
{
if (state.EDI.SourceException is not null)
{
span.RecordException(state.EDI.SourceException);
}
}
}

public static string OpenTelemetryDefaultProviderTraceSourceNameKey { get; set; } = "OpenTelemetryTraceSourceName";
public static string OpenTelemetryDefaultProviderTracerServiceVersionKey { get; set; } = "OpenTelemetryTraceSourceVersion";

private static readonly string _defaultProviderTracerSourceName = "HouseofCat.Dataflows";

public static void SetWorkflowNameAsOpenTelemetrySourceName(
this IWorkState state,
string workflowName,
string version = null)
{
state.Data[OpenTelemetryDefaultProviderTraceSourceNameKey] = workflowName ?? _defaultProviderTracerSourceName;
state.Data[OpenTelemetryDefaultProviderTracerServiceVersionKey] = version;
}

public static void StartRootSpan(
this IWorkState state,
string spanName,
SpanKind spanKind = SpanKind.Internal,
SpanAttributes spanAttributes = null)
{
if (state.Data.TryGetValue(OpenTelemetryDefaultProviderTraceSourceNameKey, out var workflowName))
{
state.Data.TryGetValue(OpenTelemetryDefaultProviderTracerServiceVersionKey, out var version);

state.RootSpan = TracerProvider
.Default
?.GetTracer(
workflowName?.ToString() ?? _defaultProviderTracerSourceName,
version?.ToString())
?.StartRootSpan(spanName, spanKind, initialAttributes: spanAttributes);
}
else
{
state.RootSpan = TracerProvider
.Default
?.GetTracer(_defaultProviderTracerSourceName, null)
?.StartRootSpan(spanName, spanKind, initialAttributes: spanAttributes);
}
}

public static TelemetrySpan CreateActiveSpan(
this IWorkState state,
string spanName,
SpanKind spanKind = SpanKind.Internal,
SpanAttributes spanAttributes = null)
{
if (state.Data.TryGetValue(OpenTelemetryDefaultProviderTraceSourceNameKey, out var workflowName))
{
state.Data.TryGetValue(OpenTelemetryDefaultProviderTracerServiceVersionKey, out var version);

return TracerProvider
.Default
?.GetTracer(
workflowName?.ToString() ?? _defaultProviderTracerSourceName,
version?.ToString())
?.StartActiveSpan(
spanName,
spanKind,
parentContext: state.RootSpan?.Context ?? default,
initialAttributes: spanAttributes);
}
else
{
return TracerProvider
.Default
?.GetTracer(_defaultProviderTracerSourceName, null)
?.StartActiveSpan(
spanName,
spanKind,
parentContext: state.RootSpan?.Context ?? default,
initialAttributes: spanAttributes);
}
}

public static void EndRootSpan(
this IWorkState state,
bool includeErrorWhenFaulted = false)
{
if (includeErrorWhenFaulted && state.IsFaulted)
{
state.SetOpenTelemetryError();
}
state?.RootSpan?.End();
state?.RootSpan?.Dispose();
}
}
7 changes: 4 additions & 3 deletions src/HouseofCat.Dataflows/IWorkState.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using OpenTelemetry.Trace;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

namespace HouseofCat.Dataflows;
Expand All @@ -18,6 +19,6 @@ public interface IWorkState
// Outbound
byte[] SendData { get; set; }

// Metrics
IDictionary<string, string> MetricTags { get; set; }
// RootSpan
TelemetrySpan RootSpan { get; set; }
}
1 change: 1 addition & 0 deletions src/HouseofCat.Metrics/HouseofCat.Metrics.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props" />
<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" Version="1.7.0" />
<PackageReference Include="prometheus-net" Version="8.2.1" />
<PackageReference Include="prometheus-net.DotNetRuntime" Version="4.4.0" />
</ItemGroup>
Expand Down
Loading

0 comments on commit ea789aa

Please sign in to comment.