diff --git a/src/HouseofCat.Dataflows/BaseDataflow.cs b/src/HouseofCat.Dataflows/BaseDataflow.cs index 74a8ba4..e1576fb 100644 --- a/src/HouseofCat.Dataflows/BaseDataflow.cs +++ b/src/HouseofCat.Dataflows/BaseDataflow.cs @@ -76,8 +76,8 @@ TState WrapAction(TState state) } catch (Exception ex) { - childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); childSpan?.RecordException(ex); + childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); state.IsFaulted = true; state.EDI = ExceptionDispatchInfo.Capture(ex); return state; @@ -101,8 +101,8 @@ async Task WrapActionAsync(TState state) } catch (Exception ex) { - childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); childSpan?.RecordException(ex); + childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); state.IsFaulted = true; state.EDI = ExceptionDispatchInfo.Capture(ex); return state; @@ -126,12 +126,12 @@ void WrapAction(TState state) } catch (Exception ex) { - childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); childSpan?.RecordException(ex); + childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); } childSpan?.End(); - state.EndRootSpan(); + state.EndRootSpan(true); } return new ActionBlock(WrapAction, options); @@ -151,12 +151,12 @@ void WrapAction(TState state) } catch (Exception ex) { - childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); childSpan?.RecordException(ex); + childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); } childSpan?.End(); - state.EndRootSpan(); + state.EndRootSpan(true); } return new ActionBlock(WrapAction, options); @@ -176,12 +176,12 @@ async Task WrapActionAsync(TState state) } catch (Exception ex) { - childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); childSpan?.RecordException(ex); + childSpan?.SetStatus(Status.Error.WithDescription(ex.Message)); } childSpan?.End(); - state.EndRootSpan(); + state.EndRootSpan(true); } return new ActionBlock(WrapActionAsync, options); diff --git a/src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs b/src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs index 1494bd8..3cae7d5 100644 --- a/src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs +++ b/src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs @@ -162,6 +162,11 @@ public static TelemetrySpan CreateActiveChildSpan( attributes: attributes); } + public static void AddEvent(this IWorkState state, string name, string description) + { + state.AddEvent(name, description); + } + public static void EndRootSpan( this IWorkState state, bool includeErrorWhenFaulted = false) diff --git a/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs b/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs index e1f6712..384b8b1 100644 --- a/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs +++ b/src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs @@ -191,6 +191,19 @@ public ConsumerDataflow SetEncryptionProvider(IEncryptionProvider provid #region Step Adders + protected static readonly string _defaultSpanNameFormat = "{0}.{1}"; + protected static readonly string _defaultStepSpanNameFormat = "{0}.{1}.{2}"; + + protected string GetSpanName(string stepName) + { + return string.Format(_defaultSpanNameFormat, WorkflowName, stepName); + } + + protected string GetStepSpanName(string stepName) + { + return string.Format(_defaultStepSpanNameFormat, WorkflowName, _suppliedTransforms.Count, stepName); + } + protected virtual ITargetBlock CreateTargetBlock( int boundedCapacity, TaskScheduler taskScheduler = null) => new BufferBlock( @@ -212,7 +225,7 @@ public ConsumerDataflow WithErrorHandling( { _errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler); var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); - _errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.ErrorHandler"); + _errorAction = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("error_handler")); } return this; } @@ -229,7 +242,7 @@ public ConsumerDataflow WithErrorHandling( { _errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler); var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler); - _errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.ErrorHandler"); + _errorAction = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("error_handler")); } return this; } @@ -240,19 +253,6 @@ public ConsumerDataflow WithReadyToProcessBuffer(int boundedCapacity, Ta return this; } - protected static readonly string _defaultSpanNameFormat = "{0}.{1}"; - protected static readonly string _defaultStepSpanNameFormat = "{0}.{1}.{2}"; - - protected string GetSpanName(string stepName) - { - return string.Format(_defaultSpanNameFormat, WorkflowName, stepName); - } - - protected string GetStepSpanName(string stepName) - { - return string.Format(_defaultStepSpanNameFormat, WorkflowName, _suppliedTransforms.Count, stepName); - } - public ConsumerDataflow AddStep( Func suppliedStep, string stepName, diff --git a/src/HouseofCat.Utilities/Extensions/ServiceCollectionExtensions.cs b/src/HouseofCat.Utilities/Extensions/ServiceCollectionExtensions.cs index f543e38..d16cbcd 100644 --- a/src/HouseofCat.Utilities/Extensions/ServiceCollectionExtensions.cs +++ b/src/HouseofCat.Utilities/Extensions/ServiceCollectionExtensions.cs @@ -5,12 +5,15 @@ using OpenTelemetry.Resources; using OpenTelemetry.Trace; using System; +using System.Collections.Generic; using System.Reflection; namespace HouseofCat.Utilities.Extensions; public static class ServiceCollectionExtensions { + public static string DeploymentEnvironmentKey { get; set; } = "deployment.environment"; + public static void AddOpenTelemetryExporter( this IServiceCollection services, IConfiguration config) @@ -22,6 +25,7 @@ public static void AddOpenTelemetryExporter( bool.TryParse(config["OpenTelemetry:Enabled"] ?? "false", out var enabled); if (!enabled) return; + var environment = config["OpenTelemetry:Environment"] ?? "Dev"; var otlpServiceName = config["OpenTelemetry:ServiceName"] ?? sourceName; var otlpServiceNamespace = config["OpenTelemetry:ServiceNamespace"] ?? "hoc"; var otlpServiceVersion = config["OpenTelemetry:ServiceVersion"] ?? sourceVersion; @@ -39,7 +43,12 @@ public static void AddOpenTelemetryExporter( resource => resource.AddService( serviceName: otlpServiceName, serviceNamespace: otlpServiceNamespace, - serviceVersion: otlpServiceVersion)); + serviceVersion: otlpServiceVersion) + .AddAttributes( + new[] + { + new KeyValuePair(DeploymentEnvironmentKey, environment) + })); otlpBuilder .WithTracing( diff --git a/tests/RabbitMQ.Console.Tests/Program.cs b/tests/RabbitMQ.Console.Tests/Program.cs index 7af4aeb..ca642c6 100644 --- a/tests/RabbitMQ.Console.Tests/Program.cs +++ b/tests/RabbitMQ.Console.Tests/Program.cs @@ -1,29 +1,46 @@ -using RabbitMQ.ConsoleTests; -using Microsoft.Extensions.Logging; +using HouseofCat.Utilities.Extensions; using HouseofCat.Utilities.Helpers; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using RabbitMQ.ConsoleTests; var loggerFactory = LogHelpers.CreateConsoleLoggerFactory(LogLevel.Information); LogHelpers.LoggerFactory = loggerFactory; var logger = loggerFactory.CreateLogger(); -// Basic Tests -//await BasicGetTests.RunBasicGetAsync(logger, "./RabbitMQ.BasicGetTests.json"); +var builder = WebApplication.CreateBuilder(args); +var configuration = new ConfigurationBuilder() + .SetBasePath(AppContext.BaseDirectory) + .AddJsonFile("appsettings.json") + .Build(); + +builder.Services.AddOpenTelemetryExporter(configuration); + +using var app = builder.Build(); + +logger.LogInformation("Tests complete! Press CTRL+C to gracefully exit...."); -// Publisher Tests -//await PublisherTests.RunSlowPublisherTestAsync(logger, "./RabbitMQ.PublisherTests.json"); -//await PublisherTests.RunAutoPublisherStandaloneAsync(); +app.Lifetime.ApplicationStarted.Register( + async () => + { + // Basic Tests + //await BasicGetTests.RunBasicGetAsync(logger, "./RabbitMQ.BasicGetTests.json"); -// Consumer Tests -//await ConsumerTests.RunConsumerTestAsync(logger, "./RabbitMQ.ConsumerTests.json"); + // Publisher Tests + //await PublisherTests.RunSlowPublisherTestAsync(logger, "./RabbitMQ.PublisherTests.json"); + //await PublisherTests.RunAutoPublisherStandaloneAsync(); -// PubSub Tests -//await PubSubTests.RunPubSubTestAsync(logger, "./RabbitMQ.PubSubTests.json"); -//await PubSubTests.RunPubSubCheckForDuplicateTestAsync(logger, "./RabbitMQ.PubSubTests.json"); + // Consumer Tests + //await ConsumerTests.RunConsumerTestAsync(logger, "./RabbitMQ.ConsumerTests.json"); -// RabbitService Tests -await RabbitServiceTests.RunRabbitServicePingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json"); -//await RabbitServiceTests.RunRabbitServiceAltPingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json"); + // PubSub Tests + //await PubSubTests.RunPubSubTestAsync(logger, "./RabbitMQ.PubSubTests.json"); + //await PubSubTests.RunPubSubCheckForDuplicateTestAsync(logger, "./RabbitMQ.PubSubTests.json"); -logger.LogInformation("Tests complete! Press return to exit...."); + // RabbitService Tests + await RabbitServiceTests.RunRabbitServicePingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json"); + //await RabbitServiceTests.RunRabbitServiceAltPingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json"); + }); -Console.ReadLine(); +await app.RunAsync(); diff --git a/tests/RabbitMQ.Console.Tests/RabbitMQ.Console.Tests.csproj b/tests/RabbitMQ.Console.Tests/RabbitMQ.Console.Tests.csproj index de53682..cf12ea4 100644 --- a/tests/RabbitMQ.Console.Tests/RabbitMQ.Console.Tests.csproj +++ b/tests/RabbitMQ.Console.Tests/RabbitMQ.Console.Tests.csproj @@ -13,6 +13,9 @@ + + Always + Always diff --git a/tests/RabbitMQ.Console.Tests/appsettings.json b/tests/RabbitMQ.Console.Tests/appsettings.json new file mode 100644 index 0000000..59492b1 --- /dev/null +++ b/tests/RabbitMQ.Console.Tests/appsettings.json @@ -0,0 +1,11 @@ +{ + "OpenTelemetry": { + "Enabled": false, + "ServiceVersion": "v1.0.0", + "Environment": "production", + "EndpointUrl": "https://ingest.us.signoz.cloud:443", + "HeaderFormat": "{0}={1}", + "HeaderKey": "signoz-access-token", + "ApiKey": "*" + } +} diff --git a/tests/RabbitMQ.ConsumerDataflows.Tests/Program.cs b/tests/RabbitMQ.ConsumerDataflows.Tests/Program.cs index 12597cb..91e17df 100644 --- a/tests/RabbitMQ.ConsumerDataflows.Tests/Program.cs +++ b/tests/RabbitMQ.ConsumerDataflows.Tests/Program.cs @@ -29,7 +29,12 @@ "write_message_to_console", (state) => { - Console.WriteLine(Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span)); + var message = Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span); + if (message == "throw") + { + throw new Exception("Throwing an exception!"); + } + Console.WriteLine(message); return state; }); @@ -75,7 +80,6 @@ (state) => { logger.LogError(state?.EDI?.SourceException, "Error Step!"); - state?.ReceivedMessage?.NackMessage(requeue: true); state?.ReceivedMessage?.Complete(); });