Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 21, 2024
1 parent ab34a79 commit b43777c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 176 deletions.
181 changes: 6 additions & 175 deletions src/HouseofCat.RabbitMQ/Messages/PipeReceivedMessage.cs
Original file line number Diff line number Diff line change
@@ -1,177 +1,32 @@
using HouseofCat.Utilities.Helpers;
using OpenTelemetry.Trace;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;

namespace HouseofCat.RabbitMQ;

public interface IPipeReceivedMessage
public interface ITaskComplete
{
void Complete();
Task<bool> Completion { get; }
}

public sealed class PipeReceivedMessage : IReceivedMessage, IPipeReceivedMessage, IDisposable
public sealed class PipeReceivedMessage : ReceivedMessage, ITaskComplete, IDisposable
{
public IMessage Message { get; set; }
public ReadOnlyMemory<byte> Body { get; set; }
public IBasicProperties Properties { get; }

public IModel Channel { get; private set; }

public bool Ackable { get; }

public string ObjectType { get; private set; }

public bool Encrypted { get; private set; }
public string EncryptionType { get; private set; }
public DateTime EncryptedDateTime { get; private set; }

public bool Compressed { get; private set; }
public string CompressionType { get; private set; }

public string TraceParentHeader { get; private set; }
public SpanContext? ParentSpanContext { get; set; }

public string ConsumerTag { get; }
public ulong DeliveryTag { get; }

public bool FailedToDeserialize { get; set; }

private readonly TaskCompletionSource<bool> _completionSource = new TaskCompletionSource<bool>();
public Task<bool> Completion => _completionSource.Task;

private bool _disposedValue;

public PipeReceivedMessage(
IModel channel,
BasicGetResult result,
bool ackable)
{
Ackable = ackable;
Channel = channel;
DeliveryTag = result.DeliveryTag;
Properties = result.BasicProperties;
Body = result.Body;

ReadHeaders();
}
bool ackable) : base(channel, result, ackable)
{ }

public PipeReceivedMessage(
IModel channel,
BasicDeliverEventArgs args,
bool ackable)
{
Ackable = ackable;
Channel = channel;
ConsumerTag = args.ConsumerTag;
DeliveryTag = args.DeliveryTag;
Properties = args.BasicProperties;
Body = args.Body;

ReadHeaders();
}

private void ReadHeaders()
{
if (Properties?.Headers is null) return;

if (Properties.Headers.TryGetValue(Constants.HeaderForObjectType, out object objectType))
{
ObjectType = Encoding.UTF8.GetString((byte[])objectType);

if (Properties.Headers.TryGetValue(Constants.HeaderForEncrypted, out object encryptedValue))
{ Encrypted = (bool)encryptedValue; }

if (Properties.Headers.TryGetValue(Constants.HeaderForEncryption, out object encryptedType))
{ EncryptionType = Encoding.UTF8.GetString((byte[])encryptedType); }

if (Properties.Headers.TryGetValue(Constants.HeaderForEncryptDate, out object encryptedDate))
{ EncryptedDateTime = DateTime.Parse(Encoding.UTF8.GetString((byte[])encryptedDate)); }

if (Properties.Headers.TryGetValue(Constants.HeaderForCompressed, out object compressedValue))
{ Compressed = (bool)compressedValue; }

if (Properties.Headers.TryGetValue(Constants.HeaderForCompression, out object compressedType))
{ CompressionType = Encoding.UTF8.GetString((byte[])compressedType); }
}
else
{
ObjectType = Constants.HeaderValueForUnknownObjectType;
}

if (Properties.Headers.TryGetValue(Constants.HeaderForTraceParent, out object traceParentHeader))
{
TraceParentHeader = Encoding.UTF8.GetString((byte[])traceParentHeader);
if (!string.IsNullOrEmpty(TraceParentHeader))
{
ParentSpanContext = OpenTelemetryHelpers.ExtractSpanContextFromTraceHeader(TraceParentHeader);
}
}
}

/// <summary>
/// Acknowledges the message server side.
/// </summary>
public bool AckMessage()
{
var success = true;

if (Ackable)
{
try
{
Channel?.BasicAck(DeliveryTag, false);
Channel = null;
}
catch { success = false; }
}

return success;
}

/// <summary>
/// Negative Acknowledges the message server side with option to requeue.
/// </summary>
public bool NackMessage(bool requeue)
{
var success = true;

if (Ackable)
{
try
{
Channel?.BasicNack(DeliveryTag, false, requeue);
Channel = null;
}
catch { success = false; }
}

return success;
}

/// <summary>
/// Reject Message server side with option to requeue.
/// </summary>
public bool RejectMessage(bool requeue)
{
var success = true;

if (Ackable)
{
try
{
Channel?.BasicReject(DeliveryTag, requeue);
Channel = null;
}
catch { success = false; }
}

return success;
}
bool ackable) : base(channel, args, ackable)
{ }

/// <summary>
/// A way to indicate this message is fully finished with.
Expand All @@ -183,28 +38,4 @@ public void Complete()
_completionSource.SetResult(true);
}
}

private void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
Complete();
_completionSource.Task.Dispose();
}

if (Channel != null) { Channel = null; }
if (Message != null) { Message = null; }

_disposedValue = true;
}
}

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
2 changes: 1 addition & 1 deletion src/HouseofCat.RabbitMQ/Messages/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface IReceivedMessage
bool RejectMessage(bool requeue);
}

public sealed class ReceivedMessage : IReceivedMessage, IDisposable
public class ReceivedMessage : IReceivedMessage, IDisposable
{
public IMessage Message { get; set; }
public ReadOnlyMemory<byte> Body { get; set; }
Expand Down

0 comments on commit b43777c

Please sign in to comment.