From cf7719cd19344da4e4b52d2a9965af69560cbe7f Mon Sep 17 00:00:00 2001 From: Stefan Ossendorf Date: Fri, 22 Nov 2024 22:18:30 +0100 Subject: [PATCH] Updated RabbitMQ library to v7 Fixes #4303 --- .../Csla.Channels.RabbitMq.csproj | 2 +- .../Csla.Channels.RabbitMq/ProxyListener.cs | 50 +++++++------- .../Csla.Channels.RabbitMq/RabbitMqPortal.cs | 66 ++++++++++--------- .../Csla.Channels.RabbitMq/RabbitMqProxy.cs | 36 +++++----- 4 files changed, 84 insertions(+), 70 deletions(-) diff --git a/Source/Csla.Channels.RabbitMq/Csla.Channels.RabbitMq.csproj b/Source/Csla.Channels.RabbitMq/Csla.Channels.RabbitMq.csproj index 5c3a1484cb..11099136a6 100644 --- a/Source/Csla.Channels.RabbitMq/Csla.Channels.RabbitMq.csproj +++ b/Source/Csla.Channels.RabbitMq/Csla.Channels.RabbitMq.csproj @@ -15,7 +15,7 @@ - + diff --git a/Source/Csla.Channels.RabbitMq/ProxyListener.cs b/Source/Csla.Channels.RabbitMq/ProxyListener.cs index 5674fb2a22..66f5903496 100644 --- a/Source/Csla.Channels.RabbitMq/ProxyListener.cs +++ b/Source/Csla.Channels.RabbitMq/ProxyListener.cs @@ -6,6 +6,7 @@ // Handles replies from data portal server //----------------------------------------------------------------------- +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -25,7 +26,7 @@ internal class ProxyListener : IDisposable /// /// Gets or sets the channel (model) for RabbitMQ. /// - protected IModel? Channel { get; set; } + protected IChannel? Channel { get; set; } /// /// Gets or sets the queue for inbound messages @@ -73,7 +74,7 @@ public static ProxyListener GetListener(Uri queueUri) #if NET8_0_OR_GREATER [MemberNotNull(nameof(Connection), nameof(Channel), nameof(ReplyQueue))] #endif - private void InitializeRabbitMQ() + private async Task InitializeRabbitMQ() { var factory = new ConnectionFactory { HostName = _queueUri.Host }; if (_queueUri.Port < 0) @@ -83,8 +84,10 @@ private void InitializeRabbitMQ() factory.UserName = userInfo[0]; if (userInfo.Length > 1) factory.Password = userInfo[1]; - Connection = factory.CreateConnection(); - Channel = Connection.CreateModel(); +#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await + Connection = await factory.CreateConnectionAsync(); + Channel = await Connection.CreateChannelAsync(); +#pragma warning restore CS8774 string[] query; if (string.IsNullOrWhiteSpace(_queueUri.Query)) query = []; @@ -93,25 +96,24 @@ private void InitializeRabbitMQ() if (query.Length == 0 || !query[0].StartsWith("reply=")) { IsNamedReplyQueue = false; - ReplyQueue = Channel.QueueDeclare(); +#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await + ReplyQueue = await Channel.QueueDeclareAsync(); +#pragma warning restore CS8774 } else { IsNamedReplyQueue = true; var split = query[0].Split('='); - ReplyQueue = Channel.QueueDeclare( - queue: split[1], - durable: false, - exclusive: false, - autoDelete: false, - arguments: null); +#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await + ReplyQueue = await Channel.QueueDeclareAsync(queue: split[1], durable: false, exclusive: false, autoDelete: false, arguments: null); +#pragma warning restore CS8774 } } private volatile bool IsListening; private readonly Lock ListeningLock = LockFactory.Create(); - public void StartListening() + public async Task StartListening() { if (IsListening) return; lock (ListeningLock) @@ -120,11 +122,17 @@ public void StartListening() IsListening = true; } - InitializeRabbitMQ(); + await InitializeRabbitMQ(); + Debug.Assert(Channel != null); - var consumer = new EventingBasicConsumer(Channel); - consumer.Received += (_, ea) => + var consumer = new AsyncEventingBasicConsumer(Channel); + consumer.ReceivedAsync += async (_, ea) => { + if (ea.BasicProperties.CorrelationId is null) + { + throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.CorrelationId)} == null"); + } + if (Wip.WorkInProgress.TryRemove(ea.BasicProperties.CorrelationId, out WipItem? item)) { item.Response = ea.Body.ToArray(); @@ -137,16 +145,14 @@ public void StartListening() // listeners; if so requeue the message up to 9 times if (IsNamedReplyQueue && ea.BasicProperties.Priority < 9) { - ea.BasicProperties.Priority++; - Channel.BasicPublish( - exchange: "", - routingKey: ReplyQueue?.QueueName, - basicProperties: ea.BasicProperties, - body: ea.Body); + var updatedBasicProperties = new BasicProperties(ea.BasicProperties); + ++updatedBasicProperties.Priority; + + await Channel.BasicPublishAsync(exchange: "", routingKey: ReplyQueue!.QueueName, mandatory: true, basicProperties: updatedBasicProperties, body: ea.Body.ToArray()); } } }; - Channel.BasicConsume(queue: ReplyQueue?.QueueName, autoAck: true, consumer: consumer); + await Channel.BasicConsumeAsync(queue: ReplyQueue!.QueueName, autoAck: true, consumer: consumer); } public void Dispose() diff --git a/Source/Csla.Channels.RabbitMq/RabbitMqPortal.cs b/Source/Csla.Channels.RabbitMq/RabbitMqPortal.cs index 2dd5b99809..a89b755886 100644 --- a/Source/Csla.Channels.RabbitMq/RabbitMqPortal.cs +++ b/Source/Csla.Channels.RabbitMq/RabbitMqPortal.cs @@ -6,6 +6,7 @@ // Exposes server-side DataPortal functionality through RabbitMQ //----------------------------------------------------------------------- +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Security.Principal; using Csla.Core; @@ -34,7 +35,7 @@ internal RabbitMqPortal(ApplicationContext applicationContext, IDataPortalServer private readonly ApplicationContext _applicationContext; private IConnection? Connection; - private IModel? Channel; + private IChannel? Channel; private string? DataPortalQueueName; private Uri DataPortalUri { get; set; } @@ -42,7 +43,7 @@ internal RabbitMqPortal(ApplicationContext applicationContext, IDataPortalServer #if NET8_0_OR_GREATER [MemberNotNull(nameof(DataPortalUri), nameof(DataPortalQueueName), nameof(Connection), nameof(Channel))] #endif - private void InitializeRabbitMQ() + private async Task InitializeRabbitMQ() { if (Connection == null || DataPortalUri == null || Channel == null || DataPortalQueueName == null) { @@ -58,39 +59,42 @@ private void InitializeRabbitMQ() factory.UserName = userInfo[0]; if (userInfo.Length > 1) factory.Password = userInfo[1]; - Connection = factory.CreateConnection(); - Channel = Connection.CreateModel(); +#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await + Connection = await factory.CreateConnectionAsync(); + Channel = await Connection.CreateChannelAsync(); +#pragma warning restore CS8774 } } /// /// Start processing inbound messages. /// - public void StartListening() + public async Task StartListening() { - InitializeRabbitMQ(); - Channel?.QueueDeclare( - queue: DataPortalQueueName, - durable: false, - exclusive: false, - autoDelete: false, - arguments: null); - - var consumer = new EventingBasicConsumer(Channel); - consumer.Received += (_, ea) => - { - InvokePortal(ea, ea.Body.ToArray()); - }; - Channel.BasicConsume(queue: DataPortalQueueName, autoAck: true, consumer: consumer); + await InitializeRabbitMQ(); + await Channel!.QueueDeclareAsync(queue: DataPortalQueueName!, durable: false, exclusive: false, autoDelete: false); + + var consumer = new AsyncEventingBasicConsumer(Channel); + consumer.ReceivedAsync += async (_, ea) => await InvokePortal(ea, ea.Body.ToArray()); + await Channel.BasicConsumeAsync(queue: DataPortalQueueName!, autoAck: true, consumer: consumer); } - private async void InvokePortal(BasicDeliverEventArgs ea, byte[] requestData) + private async Task InvokePortal(BasicDeliverEventArgs ea, byte[] requestData) { + if (ea.BasicProperties.ReplyTo is null) + { + throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.ReplyTo)} == null"); + } + if (ea.BasicProperties.CorrelationId is null) + { + throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.CorrelationId)} == null"); + } + var result = _applicationContext.CreateInstanceDI(); try { var request = _applicationContext.GetRequiredService().Deserialize(requestData); - result = await CallPortal(ea.BasicProperties.Type, request); + result = await CallPortal(ea.BasicProperties.Type ?? throw new InvalidOperationException($"{nameof(BasicDeliverEventArgs.BasicProperties)}.{nameof(IReadOnlyBasicProperties.Type)} == null"), request); } catch (Exception ex) { @@ -100,29 +104,27 @@ private async void InvokePortal(BasicDeliverEventArgs ea, byte[] requestData) try { var response = _applicationContext.GetRequiredService().Serialize(result); - SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response); + await SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response); } catch (Exception ex) { result = _applicationContext.CreateInstanceDI(); result.ErrorData = _applicationContext.CreateInstanceDI(ex); var response = _applicationContext.GetRequiredService().Serialize(result); - SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response); + await SendMessage(ea.BasicProperties.ReplyTo, ea.BasicProperties.CorrelationId, response); } } - private void SendMessage(string target, string correlationId, byte[] request) + private async Task SendMessage(string target, string correlationId, byte[] request) { - InitializeRabbitMQ(); + await InitializeRabbitMQ(); if (Channel is null) throw new InvalidOperationException($"{nameof(Channel)} == null"); - var props = Channel.CreateBasicProperties(); - props.CorrelationId = correlationId; - Channel.BasicPublish( - exchange: "", - routingKey: target, - basicProperties: props, - body: request); + var props = new BasicProperties + { + CorrelationId = correlationId + }; + await Channel.BasicPublishAsync(exchange: "", routingKey: target, mandatory: true, basicProperties: props, body: request); } private async Task CallPortal(string operation, object request) diff --git a/Source/Csla.Channels.RabbitMq/RabbitMqProxy.cs b/Source/Csla.Channels.RabbitMq/RabbitMqProxy.cs index 2750878337..851296ac2b 100644 --- a/Source/Csla.Channels.RabbitMq/RabbitMqProxy.cs +++ b/Source/Csla.Channels.RabbitMq/RabbitMqProxy.cs @@ -44,7 +44,7 @@ public RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions /// /// Gets or sets the channel (model) for RabbitMQ. /// - protected IModel? Channel { get; set; } + protected IChannel? Channel { get; set; } /// /// Gets or sets the name of the data portal @@ -71,7 +71,7 @@ public RabbitMqProxy(ApplicationContext applicationContext, RabbitMqProxyOptions #if NET8_0_OR_GREATER [MemberNotNull(nameof(Connection), nameof(Channel), nameof(QueueListener))] #endif - protected virtual void InitializeRabbitMQ() + protected virtual async Task InitializeRabbitMQ() { if (Connection == null || Channel == null || QueueListener == null) { @@ -91,12 +91,14 @@ protected virtual void InitializeRabbitMQ() factory.UserName = userInfo[0]; if (userInfo.Length > 1) factory.Password = userInfo[1]; - Connection = factory.CreateConnection(); - Channel = Connection.CreateModel(); +#pragma warning disable CS8774 // 11/22/2024, Nullable analysis can't track nullability with async/await + Connection = await factory.CreateConnectionAsync(); + Channel = await Connection.CreateChannelAsync(); +#pragma warning restore CS8774 if (QueueListener == null) { QueueListener = ProxyListener.GetListener(url); - QueueListener.StartListening(); + await QueueListener.StartListening(); } } } @@ -125,7 +127,7 @@ public override async Task Create(Type objectType, object crit try { - InitializeRabbitMQ(); + await InitializeRabbitMQ(); return await base.Create(objectType, criteria, context, isSync); } finally @@ -151,7 +153,7 @@ public override async Task Fetch(Type objectType, object crite try { - InitializeRabbitMQ(); + await InitializeRabbitMQ(); return await base.Fetch(objectType, criteria, context, isSync); } finally @@ -176,7 +178,7 @@ public override async Task Update(object obj, DataPortalContex try { - InitializeRabbitMQ(); + await InitializeRabbitMQ(); return await base.Update(obj, context, isSync); } finally @@ -202,7 +204,7 @@ public override async Task Delete(Type objectType, object crit try { - InitializeRabbitMQ(); + await InitializeRabbitMQ(); return await base.Delete(objectType, criteria, context, isSync); } finally @@ -226,7 +228,7 @@ protected override async Task CallDataPortalServer(byte[] serialized, st var resetEvent = new Threading.AsyncManualResetEvent(); var wip = Wip.WorkInProgress.GetOrAdd(correlationId, new WipItem(resetEvent)); - SendMessage(QueueListener!.ReplyQueue!.QueueName, correlationId, operation, serialized); + await SendMessage(QueueListener!.ReplyQueue!.QueueName, correlationId, operation, serialized); var timeout = Task.Delay(Options.Timeout); if (await Task.WhenAny(wip.ResetEvent.WaitAsync(), timeout) == timeout) @@ -235,16 +237,20 @@ protected override async Task CallDataPortalServer(byte[] serialized, st return wip.Response!; } - private void SendMessage(string sender, string correlationId, string operation, byte[] request) + private async Task SendMessage(string sender, string correlationId, string operation, byte[] request) { - var props = Channel!.CreateBasicProperties(); + var props = new BasicProperties + { + CorrelationId = correlationId, + Type = operation + }; if (!string.IsNullOrWhiteSpace(sender)) props.ReplyTo = sender; - props.CorrelationId = correlationId; - props.Type = operation; - Channel.BasicPublish( + + await Channel!.BasicPublishAsync( exchange: "", routingKey: DataPortalQueueName, + mandatory: true, basicProperties: props, body: request); }