Skip to content

Commit

Permalink
Initial RabbitMQ logging
Browse files Browse the repository at this point in the history
Fix #564
  • Loading branch information
eerhardt committed Nov 1, 2023
1 parent 79a87b7 commit 1ace2fc
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 5 deletions.
6 changes: 2 additions & 4 deletions samples/eShopLite/AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
var builder = DistributedApplication.CreateBuilder(args);

builder.AddAzureProvisioning();

var catalogDb = builder.AddPostgresContainer("postgres").AddDatabase("catalogdb");

var basketCache = builder.AddRedisContainer("basketcache");

var catalogService = builder.AddProject<Projects.CatalogService>("catalogservice")
.WithReference(catalogDb)
.WithReplicas(2);
.WithReference(catalogDb);
//.WithReplicas(2);

var messaging = builder.AddRabbitMQContainer("messaging");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ private static void AddRabbitMQ(

IConnectionFactory CreateConnectionFactory(IServiceProvider sp)
{
var connectionString = settings.ConnectionString;
// ensure the log forwarder is initialized
sp.GetRequiredService<RabbitMQEventSourceLogForwarder>().Start();

var factory = new ConnectionFactory();

var configurationOptionsSection = configSection.GetSection("ConnectionFactory");
configurationOptionsSection.Bind(factory);

// the connection string from settings should win over the one from the ConnectionFactory section
var connectionString = settings.ConnectionString;
if (!string.IsNullOrEmpty(connectionString))
{
factory.Uri = new(connectionString);
Expand All @@ -105,6 +107,8 @@ IConnectionFactory CreateConnectionFactory(IServiceProvider sp)
builder.Services.AddKeyedSingleton<IConnection>(serviceKey, (sp, key) => CreateConnection(sp.GetRequiredKeyedService<IConnectionFactory>(key), settings.MaxConnectRetryCount));
}

builder.Services.AddSingleton<RabbitMQEventSourceLogForwarder>();

if (settings.Tracing)
{
// Note that RabbitMQ.Client v6.6 doesn't have built-in support for tracing. See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Text;
using Microsoft.Extensions.Logging;

// implementation adapted from https://github.com/Azure/azure-sdk-for-net/blob/827d6b193d0bab75ea98c553c48e5c48e2a41f84/sdk/extensions/Microsoft.Extensions.Azure/src/Internal/AzureEventSourceLogForwarder.cs#L18

internal sealed class RabbitMQEventSourceLogForwarder : IDisposable
{
private readonly ILoggerFactory _loggerFactory;
private readonly ConcurrentDictionary<string, ILogger> _loggers = new ConcurrentDictionary<string, ILogger>();
private RabbitMQEventSourceListener? _listener;

public RabbitMQEventSourceLogForwarder(ILoggerFactory loggerFactory)
{
_loggerFactory = loggerFactory;
}

/// <summary>
/// Initiates the log forwarding from the RabbitMQ event sources to a provided <see cref="ILoggerFactory"/>, call <see cref="Dispose"/> to stop forwarding.
/// </summary>
public void Start()
{
_listener ??= new RabbitMQEventSourceListener(LogEvent, EventLevel.Verbose);
}

private void LogEvent(EventWrittenEventArgs eventData)
{
if (_loggerFactory == null)
{
return;
}

var logger = _loggers.GetOrAdd(eventData.EventSource.Name, name => _loggerFactory.CreateLogger(ToLoggerName(name)));
logger.Log(MapLevel(eventData.Level), new EventId(eventData.EventId, eventData.EventName), new EventSourceEvent(eventData), null, FormatMessage);
}

private static string ToLoggerName(string name)
{
return name.Replace('-', '.');
}

public void Dispose() => _listener?.Dispose();

private static LogLevel MapLevel(EventLevel level)
{
switch (level)
{
case EventLevel.Critical:
return LogLevel.Critical;
case EventLevel.Error:
return LogLevel.Error;
case EventLevel.Informational:
return LogLevel.Information;
case EventLevel.Verbose:
return LogLevel.Debug;
case EventLevel.Warning:
return LogLevel.Warning;
case EventLevel.LogAlways:
return LogLevel.Information;
default:
throw new ArgumentOutOfRangeException(nameof(level), level, null);
}
}

private static string FormatMessage(EventSourceEvent eventSourceEvent, Exception? exception)
{
return EventSourceEventFormatting.Format(eventSourceEvent.EventData);
}

private readonly struct EventSourceEvent : IReadOnlyList<KeyValuePair<string, object?>>
{
public EventWrittenEventArgs EventData { get; }

public EventSourceEvent(EventWrittenEventArgs eventData)
{
EventData = eventData;
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return new KeyValuePair<string, object?>(EventData.PayloadNames![i], EventData.Payload![i]);
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public int Count => EventData.PayloadNames?.Count ?? 0;

public KeyValuePair<string, object?> this[int index] => new(EventData.PayloadNames![index], EventData.Payload![index]);
}

/// <summary>
/// Implementation of <see cref="EventListener"/> that listens to events produced by the RabbitMQ.Client library.
/// </summary>
private sealed class RabbitMQEventSourceListener : EventListener
{
private readonly List<EventSource> _eventSources = new List<EventSource>();

private readonly Action<EventWrittenEventArgs> _log;
private readonly EventLevel _level;

public RabbitMQEventSourceListener(Action<EventWrittenEventArgs> log, EventLevel level)
{
_log = log;
_level = level;

foreach (EventSource eventSource in _eventSources)
{
OnEventSourceCreated(eventSource);
}

_eventSources.Clear();
}

protected sealed override void OnEventSourceCreated(EventSource eventSource)
{
base.OnEventSourceCreated(eventSource);

if (_log == null)
{
_eventSources.Add(eventSource);
}

if (eventSource.Name == "rabbitmq-dotnet-client" || eventSource.Name == "rabbitmq-client")
{
EnableEvents(eventSource, _level);
}
}

protected sealed override void OnEventWritten(EventWrittenEventArgs eventData)
{
// Workaround https://github.com/dotnet/corefx/issues/42600
if (eventData.EventId == -1)
{
return;
}

// There is a very tight race during the listener creation where EnableEvents was called
// and the thread producing events not observing the `_log` field assignment
_log?.Invoke(eventData);
}
}

private static class EventSourceEventFormatting
{
public static string Format(EventWrittenEventArgs eventData)
{
var stringBuilder = new StringBuilder();
if (eventData.EventId == 3)
{
FormatErrorEvent(eventData, stringBuilder);
}
else
{
stringBuilder.Append(eventData.EventName);

if (!string.IsNullOrWhiteSpace(eventData.Message))
{
stringBuilder.AppendLine();
stringBuilder.Append(nameof(eventData.Message)).Append(" = ").Append(eventData.Message);
}

if (eventData.PayloadNames != null)
{
for (var i = 0; i < eventData.PayloadNames.Count; i++)
{
stringBuilder.AppendLine();
stringBuilder.Append(eventData.PayloadNames[i]).Append(" = ").Append(eventData.Payload?[i]);
}
}
}

return stringBuilder.ToString();
}

private static void FormatErrorEvent(EventWrittenEventArgs eventData, StringBuilder stringBuilder)
{
Debug.Assert(eventData.PayloadNames?.Count == 2 && eventData.Payload?.Count == 2);

if (eventData.PayloadNames?.Count == 2 && eventData.Payload?.Count == 2)
{
Debug.Assert(eventData.PayloadNames[0] == "message");
Debug.Assert(eventData.PayloadNames[1] == "ex");

stringBuilder.Append(eventData.Payload[0]);

if (eventData.Payload[1] is IDictionary<string, object?> exPayload)
{
stringBuilder.AppendLine();
foreach (var kvp in exPayload)
{
stringBuilder.Append(kvp.Key).Append(" = ").Append(kvp.Value).AppendLine();
}
}
}
}
}
}

0 comments on commit 1ace2fc

Please sign in to comment.