diff --git a/src/HouseofCat.RabbitMQ/Messages/PipeReceivedMessage.cs b/src/HouseofCat.RabbitMQ/Messages/PipeReceivedMessage.cs index d2a76ef..ff2fc16 100644 --- a/src/HouseofCat.RabbitMQ/Messages/PipeReceivedMessage.cs +++ b/src/HouseofCat.RabbitMQ/Messages/PipeReceivedMessage.cs @@ -1,177 +1,32 @@ -using HouseofCat.Utilities.Helpers; -using OpenTelemetry.Trace; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; -using System.Text; using System.Threading.Tasks; namespace HouseofCat.RabbitMQ; -public interface IPipeReceivedMessage +public interface ITaskComplete { void Complete(); Task Completion { get; } } -public sealed class PipeReceivedMessage : IReceivedMessage, IPipeReceivedMessage, IDisposable +public sealed class PipeReceivedMessage : ReceivedMessage, ITaskComplete, IDisposable { - public IMessage Message { get; set; } - public ReadOnlyMemory Body { get; set; } - public IBasicProperties Properties { get; } - - public IModel Channel { get; private set; } - - public bool Ackable { get; } - - public string ObjectType { get; private set; } - - public bool Encrypted { get; private set; } - public string EncryptionType { get; private set; } - public DateTime EncryptedDateTime { get; private set; } - - public bool Compressed { get; private set; } - public string CompressionType { get; private set; } - - public string TraceParentHeader { get; private set; } - public SpanContext? ParentSpanContext { get; set; } - - public string ConsumerTag { get; } - public ulong DeliveryTag { get; } - - public bool FailedToDeserialize { get; set; } - private readonly TaskCompletionSource _completionSource = new TaskCompletionSource(); public Task Completion => _completionSource.Task; - private bool _disposedValue; - public PipeReceivedMessage( IModel channel, BasicGetResult result, - bool ackable) - { - Ackable = ackable; - Channel = channel; - DeliveryTag = result.DeliveryTag; - Properties = result.BasicProperties; - Body = result.Body; - - ReadHeaders(); - } + bool ackable) : base(channel, result, ackable) + { } public PipeReceivedMessage( IModel channel, BasicDeliverEventArgs args, - bool ackable) - { - Ackable = ackable; - Channel = channel; - ConsumerTag = args.ConsumerTag; - DeliveryTag = args.DeliveryTag; - Properties = args.BasicProperties; - Body = args.Body; - - ReadHeaders(); - } - - private void ReadHeaders() - { - if (Properties?.Headers is null) return; - - if (Properties.Headers.TryGetValue(Constants.HeaderForObjectType, out object objectType)) - { - ObjectType = Encoding.UTF8.GetString((byte[])objectType); - - if (Properties.Headers.TryGetValue(Constants.HeaderForEncrypted, out object encryptedValue)) - { Encrypted = (bool)encryptedValue; } - - if (Properties.Headers.TryGetValue(Constants.HeaderForEncryption, out object encryptedType)) - { EncryptionType = Encoding.UTF8.GetString((byte[])encryptedType); } - - if (Properties.Headers.TryGetValue(Constants.HeaderForEncryptDate, out object encryptedDate)) - { EncryptedDateTime = DateTime.Parse(Encoding.UTF8.GetString((byte[])encryptedDate)); } - - if (Properties.Headers.TryGetValue(Constants.HeaderForCompressed, out object compressedValue)) - { Compressed = (bool)compressedValue; } - - if (Properties.Headers.TryGetValue(Constants.HeaderForCompression, out object compressedType)) - { CompressionType = Encoding.UTF8.GetString((byte[])compressedType); } - } - else - { - ObjectType = Constants.HeaderValueForUnknownObjectType; - } - - if (Properties.Headers.TryGetValue(Constants.HeaderForTraceParent, out object traceParentHeader)) - { - TraceParentHeader = Encoding.UTF8.GetString((byte[])traceParentHeader); - if (!string.IsNullOrEmpty(TraceParentHeader)) - { - ParentSpanContext = OpenTelemetryHelpers.ExtractSpanContextFromTraceHeader(TraceParentHeader); - } - } - } - - /// - /// Acknowledges the message server side. - /// - public bool AckMessage() - { - var success = true; - - if (Ackable) - { - try - { - Channel?.BasicAck(DeliveryTag, false); - Channel = null; - } - catch { success = false; } - } - - return success; - } - - /// - /// Negative Acknowledges the message server side with option to requeue. - /// - public bool NackMessage(bool requeue) - { - var success = true; - - if (Ackable) - { - try - { - Channel?.BasicNack(DeliveryTag, false, requeue); - Channel = null; - } - catch { success = false; } - } - - return success; - } - - /// - /// Reject Message server side with option to requeue. - /// - public bool RejectMessage(bool requeue) - { - var success = true; - - if (Ackable) - { - try - { - Channel?.BasicReject(DeliveryTag, requeue); - Channel = null; - } - catch { success = false; } - } - - return success; - } + bool ackable) : base(channel, args, ackable) + { } /// /// A way to indicate this message is fully finished with. @@ -183,28 +38,4 @@ public void Complete() _completionSource.SetResult(true); } } - - private void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - Complete(); - _completionSource.Task.Dispose(); - } - - if (Channel != null) { Channel = null; } - if (Message != null) { Message = null; } - - _disposedValue = true; - } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } } diff --git a/src/HouseofCat.RabbitMQ/Messages/ReceivedMessage.cs b/src/HouseofCat.RabbitMQ/Messages/ReceivedMessage.cs index ee1f353..e7cef7c 100644 --- a/src/HouseofCat.RabbitMQ/Messages/ReceivedMessage.cs +++ b/src/HouseofCat.RabbitMQ/Messages/ReceivedMessage.cs @@ -40,7 +40,7 @@ public interface IReceivedMessage bool RejectMessage(bool requeue); } -public sealed class ReceivedMessage : IReceivedMessage, IDisposable +public class ReceivedMessage : IReceivedMessage, IDisposable { public IMessage Message { get; set; } public ReadOnlyMemory Body { get; set; }