Skip to content

Commit

Permalink
RabbitService Comcrypt and Decomcrypt refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 23, 2024
1 parent d7c9790 commit 96ae993
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface IConsumer<TFromQueue>
Task StopConsumerAsync(bool immediate = false);

Task<IEnumerable<TFromQueue>> ReadUntilEmptyAsync(CancellationToken token = default);
Task<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default);
ValueTask<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default);
}

public class Consumer : IConsumer<IReceivedMessage>, IDisposable
Expand Down Expand Up @@ -489,7 +489,7 @@ public async Task<IEnumerable<IReceivedMessage>> ReadUntilEmptyAsync(Cancellatio
return list;
}

public async Task<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default)
public async ValueTask<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default)
{
if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);

Expand Down
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ/Options/PoolOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ public sealed class PoolOptions
/// Number of channels to keep in each of the channel pool. Used in round-robin to perform actions.
/// <para>Default value is 0.</para>
/// </summary>
public ushort Channels { get; set; } = 1;
public ushort Channels { get; set; } = 2;

/// <summary>
/// Number of ackable channels to keep in each of the channel pool. Used in round-robin to perform actions.
/// <para>Default value is 10.</para>
/// </summary>
public ushort AckableChannels { get; set; } = 1;
public ushort AckableChannels { get; set; } = 2;

/// <summary>
/// The time to sleep (in ms) when an error occurs on Channel or Connection creation. It's best not to be hyper aggressive with this value.
Expand Down
93 changes: 43 additions & 50 deletions src/HouseofCat.RabbitMQ/Services/RabbitService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,93 +229,86 @@ public IConsumer<IReceivedMessage> GetConsumer(string consumerName)
return value;
}

public async Task DecomcryptAsync(IMessage message)
public async Task ComcryptAsync(IMessage message)
{
if (message is null) return;

var decrypted = Decrypt(message);
if (message is null || message.Body.Length == 0) return;

if (decrypted)
{
await DecompressAsync(message).ConfigureAwait(false);
}
await CompressAsync(message).ConfigureAwait(false);
Encrypt(message);
}

public async Task ComcryptAsync(IMessage message)
public async Task DecomcryptAsync(IMessage message)
{
if (message is null) return;
if (message is null || message.Body.Length == 0) return;

await CompressAsync(message).ConfigureAwait(false);

Encrypt(message);
Decrypt(message);
await DecompressAsync(message).ConfigureAwait(false);
}

public bool Encrypt(IMessage message)
{
if (!message.Metadata.Encrypted())
if (EncryptionProvider is null || message.Metadata.Encrypted())
{
message.Body = EncryptionProvider.Encrypt(message.Body);
message.Metadata.Fields[Constants.HeaderForEncrypted] = true;
message.Metadata.Fields[Constants.HeaderForEncryption] = EncryptionProvider.Type;
message.Metadata.Fields[Constants.HeaderForEncryptDate] = TimeHelpers.GetDateTimeNow(TimeFormat);

return true;
return false;
}

return false;
message.Body = EncryptionProvider.Encrypt(message.Body);
message.Metadata.Fields[Constants.HeaderForEncrypted] = true;
message.Metadata.Fields[Constants.HeaderForEncryption] = EncryptionProvider.Type;
message.Metadata.Fields[Constants.HeaderForEncryptDate] = TimeHelpers.GetDateTimeNow(TimeFormat);
return true;
}

public bool Decrypt(IMessage message)
{
if (message.Metadata.Encrypted())
if (EncryptionProvider is null || !message.Metadata.Encrypted())
{
message.Body = EncryptionProvider.Decrypt(message.Body);
message.Metadata.Fields[Constants.HeaderForEncrypted] = false;

message.Metadata.Fields.Remove(Constants.HeaderForEncryption);
message.Metadata.Fields.Remove(Constants.HeaderForEncryptDate);

return true;
return false;
}

return false;
message.Body = EncryptionProvider.Decrypt(message.Body);
message.Metadata.Fields[Constants.HeaderForEncrypted] = false;
message.Metadata.Fields.Remove(Constants.HeaderForEncryption);
message.Metadata.Fields.Remove(Constants.HeaderForEncryptDate);
return true;
}

public async Task<bool> CompressAsync(IMessage message)
{
if (message.Metadata.Encrypted())
{ return false; } // Don't compress after encryption.
if (CompressionProvider is null
|| message.Metadata.Encrypted()
|| message.Metadata.Compressed())
{
return false;
}

if (!message.Metadata.Compressed())
try
{
message.Body = (await CompressionProvider.CompressAsync(message.Body).ConfigureAwait(false)).ToArray();
message.Body = await CompressionProvider.CompressAsync(message.Body).ConfigureAwait(false);
message.Metadata.Fields[Constants.HeaderForCompressed] = true;
message.Metadata.Fields[Constants.HeaderForCompression] = CompressionProvider.Type;

return true;
}

return true;
catch { return false; }
}

public async Task<bool> DecompressAsync(IMessage message)
{
if (message.Metadata.Encrypted())
{ return false; } // Don't decompress before decryption.

if (message.Metadata.Compressed())
if (CompressionProvider is null
|| message.Metadata.Encrypted()
|| !message.Metadata.Compressed())
{
try
{
message.Body = (await CompressionProvider.DecompressAsync(message.Body).ConfigureAwait(false)).ToArray();
message.Metadata.Fields[Constants.HeaderForCompressed] = false;

message.Metadata.Fields.Remove(Constants.HeaderForCompression);
}
catch { return false; }
return false;
}

return true;
try
{
message.Body = await CompressionProvider.DecompressAsync(message.Body).ConfigureAwait(false);
message.Metadata.Fields[Constants.HeaderForCompressed] = false;
message.Metadata.Fields.Remove(Constants.HeaderForCompression);
return true;
}
catch { return false; }
}

public async Task<ReadOnlyMemory<byte>> GetAsync(string queueName)
Expand Down
6 changes: 3 additions & 3 deletions tests/RabbitMQ.Console.Tests/RabbitMQ.RabbitServiceTests.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"TestConsumer": {
"Enabled": true,
"ConsumerName": "TestConsumer",
"BatchSize": 5,
"BatchSize": 10,
"BehaviorWhenFull": 0,
"UseTransientChannels": true,
"AutoAck": false,
Expand All @@ -45,9 +45,9 @@
"BuildQueueExclusive": false,
"BuildQueueAutoDelete": false,
"WorkflowName": "TestConsumerWorkflow",
"WorkflowMaxDegreesOfParallelism": 1,
"WorkflowMaxDegreesOfParallelism": 8,
"WorkflowConsumerCount": 1,
"WorkflowBatchSize": 5,
"WorkflowBatchSize": 10,
"WorkflowEnsureOrdered": false,
"WorkflowWaitForCompletion": false
}
Expand Down
60 changes: 60 additions & 0 deletions tests/UnitTests/RabbitMQ/RabbitServiceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using HouseofCat.Compression;
using HouseofCat.Encryption;
using HouseofCat.Hashing;
using HouseofCat.RabbitMQ;
using HouseofCat.RabbitMQ.Services;
using HouseofCat.Serialization;
using System.Text;

namespace RabbitMQ;

public class RabbitServiceTests
{
private readonly IRabbitService _rabbitService;

public RabbitServiceTests()
{
var options = new RabbitOptions();
var hashingProvider = new ArgonHashingProvider();

var hashKey = hashingProvider.GetHashKey("Sega", "Nintendo", 32);

_rabbitService = new RabbitService(
options,
new JsonProvider(),
new AesGcmEncryptionProvider(hashKey),
new GzipProvider());
}

[Fact]
public async Task ComcryptTestAsync()
{
var message = new Message("", "TestQueue", Encoding.UTF8.GetBytes("Hello World"));

await _rabbitService.ComcryptAsync(message);

Assert.True(message.Metadata.Encrypted());
Assert.True(message.Metadata.Compressed());
}

[Fact]
public async Task ComcryptDecomcryptTestAsync()
{
var messageAsString = "Hello World";
var message = new Message("", "TestQueue", Encoding.UTF8.GetBytes(messageAsString));

await _rabbitService.ComcryptAsync(message);

Assert.True(message.Metadata.Encrypted());
Assert.True(message.Metadata.Compressed());

await _rabbitService.DecomcryptAsync(message);

Assert.False(message.Metadata.Encrypted());
Assert.False(message.Metadata.Compressed());

var bodyAsString = Encoding.UTF8.GetString(message.Body.Span);

Assert.Equal(messageAsString, bodyAsString);
}
}
1 change: 1 addition & 0 deletions tests/UnitTests/UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<ProjectReference Include="..\..\src\HouseofCat.Data\HouseofCat.Data.csproj" />
<ProjectReference Include="..\..\src\HouseofCat.Encryption\HouseofCat.Encryption.csproj" />
<ProjectReference Include="..\..\src\HouseofCat.Hashing\HouseofCat.Hashing.csproj" />
<ProjectReference Include="..\..\src\HouseofCat.RabbitMQ\HouseofCat.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\HouseofCat.Serialization\HouseofCat.Serialization.csproj" />
</ItemGroup>

Expand Down

0 comments on commit 96ae993

Please sign in to comment.