Skip to content

Commit

Permalink
Returning back to IReceivedMessage usage.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Mar 31, 2024
1 parent 8b085be commit fbc19e2
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 447 deletions.
4 changes: 3 additions & 1 deletion src/HouseofCat.RabbitMQ/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public static class Constants

// Consumer
public static string HeaderForContentType { get; set; } = "ContentType";
public static string HeaderValueForContentTypeApplicationJson { get; set; } = "application/json;";
public const string HeaderValueForContentTypeBinary = "application/octet-stream";
public const string HeaderValueForContentTypePlainText = "text/plain";
public const string HeaderValueForContentTypeJson = "application/json";
public static string HeaderForObjectType { get; set; } = "X-RD-OBJECTTYPE";
public static string HeaderValueForMessageObjectType { get; set; } = "IMESSAGE";

Expand Down
239 changes: 9 additions & 230 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using HouseofCat.Dataflows;
using HouseofCat.RabbitMQ.Pools;
using HouseofCat.Utilities.Errors;
using HouseofCat.Utilities.Helpers;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
Expand All @@ -10,7 +10,6 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using static HouseofCat.RabbitMQ.LogMessages;
using HouseofCat.Utilities.Helpers;

namespace HouseofCat.RabbitMQ;

Expand All @@ -21,57 +20,22 @@ public interface IConsumer<TFromQueue>
ConsumerOptions ConsumerOptions { get; }
bool Started { get; }

Task DataflowExecutionEngineAsync(
Func<TFromQueue, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
int boundedCapacity = 1000,
TaskScheduler taskScheduler = null,
CancellationToken token = default);

Task DataflowExecutionEngineAsync(
Func<TFromQueue, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
Func<TFromQueue, Task<TFromQueue>> preWorkBodyAsync = null,
Func<bool, Task> postWorkBodyAsync = null,
int boundedCapacity = 1000,
TaskScheduler taskScheduler = null,
CancellationToken token = default);

Task ChannelExecutionEngineAsync(
Func<ReceivedMessage, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
Func<bool, Task> postWorkBodyAsync = null,
int boundedCapacity = 1000,
TaskScheduler taskScheduler = null,
CancellationToken token = default);

Task DirectChannelExecutionEngineAsync(
Func<ReceivedMessage, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
TaskScheduler taskScheduler = null,
CancellationToken token = default);

ChannelReader<TFromQueue> GetConsumerBuffer();
ValueTask<TFromQueue> ReadAsync();
Task<IEnumerable<TFromQueue>> ReadUntilEmptyAsync();
Task StartConsumerAsync();
Task StopConsumerAsync(bool immediate = false);
IAsyncEnumerable<TFromQueue> StreamUntilConsumerStopAsync();
IAsyncEnumerable<TFromQueue> StreamUntilQueueEmptyAsync();
}

public class Consumer : IConsumer<ReceivedMessage>, IDisposable
public class Consumer : IConsumer<IReceivedMessage>, IDisposable
{
private readonly ILogger<Consumer> _logger;
private readonly SemaphoreSlim _conLock = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _executionLock = new SemaphoreSlim(1, 1);
private IChannelHost _chanHost;
private bool _disposedValue;
private Channel<ReceivedMessage> _consumerChannel;
private Channel<IReceivedMessage> _consumerChannel;

public string ConsumerTag { get; private set; }
private bool _shutdown;
Expand Down Expand Up @@ -116,7 +80,7 @@ public async Task StartConsumerAsync()
{
await SetChannelHostAsync().ConfigureAwait(false);
_shutdown = false;
_consumerChannel = Channel.CreateBounded<ReceivedMessage>(
_consumerChannel = Channel.CreateBounded<IReceivedMessage>(
new BoundedChannelOptions(ConsumerOptions.BatchSize!.Value)
{
FullMode = ConsumerOptions.BehaviorWhenFull!.Value
Expand Down Expand Up @@ -406,9 +370,9 @@ await _chanHost
_chanHost.ChannelId);
}

public ChannelReader<ReceivedMessage> GetConsumerBuffer() => _consumerChannel.Reader;
public ChannelReader<IReceivedMessage> GetConsumerBuffer() => _consumerChannel.Reader;

public async ValueTask<ReceivedMessage> ReadAsync()
public async ValueTask<IReceivedMessage> ReadAsync()
{
if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);

Expand All @@ -418,11 +382,11 @@ public async ValueTask<ReceivedMessage> ReadAsync()
.ConfigureAwait(false);
}

public async Task<IEnumerable<ReceivedMessage>> ReadUntilEmptyAsync()
public async Task<IEnumerable<IReceivedMessage>> ReadUntilEmptyAsync()
{
if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);

var list = new List<ReceivedMessage>();
var list = new List<IReceivedMessage>();
await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false);
while (_consumerChannel.Reader.TryRead(out var message))
{
Expand All @@ -433,19 +397,7 @@ public async Task<IEnumerable<ReceivedMessage>> ReadUntilEmptyAsync()
return list;
}

public async IAsyncEnumerable<ReceivedMessage> StreamUntilQueueEmptyAsync()
{
if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);

await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false);
while (_consumerChannel.Reader.TryRead(out var message))
{
if (message == null) { break; }
yield return message;
}
}

public async IAsyncEnumerable<ReceivedMessage> StreamUntilConsumerStopAsync()
public async IAsyncEnumerable<IReceivedMessage> StreamUntilConsumerStopAsync()
{
if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);

Expand All @@ -455,179 +407,6 @@ public async IAsyncEnumerable<ReceivedMessage> StreamUntilConsumerStopAsync()
}
}

public async Task DataflowExecutionEngineAsync(
Func<ReceivedMessage, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
int boundedCapacity = 1000,
TaskScheduler taskScheduler = null,
CancellationToken token = default)
{
var dataflowEngine = new DataflowEngine<ReceivedMessage, bool>(workBodyAsync, maxDoP, ensureOrdered, null, null, boundedCapacity, taskScheduler);

await TransferDataToDataflowEngine(dataflowEngine, token);
}

public async Task DataflowExecutionEngineAsync(
Func<ReceivedMessage, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
Func<ReceivedMessage, Task<ReceivedMessage>> preWorkBodyAsync = null,
Func<bool, Task> postWorkBodyAsync = null,
int boundedCapacity = 1000,
TaskScheduler taskScheduler = null,
CancellationToken token = default)
{
var dataflowEngine = new DataflowEngine<ReceivedMessage, bool>(
workBodyAsync,
maxDoP,
ensureOrdered,
preWorkBodyAsync,
postWorkBodyAsync,
boundedCapacity,
taskScheduler);

await TransferDataToDataflowEngine(dataflowEngine, token);
}

private async Task TransferDataToDataflowEngine(
DataflowEngine<ReceivedMessage, bool> dataflowEngine,
CancellationToken token = default)
{
await _executionLock.WaitAsync(2000, token).ConfigureAwait(false);

try
{
while (await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false))
{
while (_consumerChannel.Reader.TryRead(out var receivedMessage))
{
if (receivedMessage != null)
{
_logger.LogDebug(
Consumers.ConsumerDataflowQueueing,
ConsumerOptions.ConsumerName,
receivedMessage.DeliveryTag);

await dataflowEngine
.EnqueueWorkAsync(receivedMessage)
.ConfigureAwait(false);
}
}
}
}
catch (OperationCanceledException)
{
_logger.LogWarning(
Consumers.ConsumerDataflowActionCancelled,
ConsumerOptions.ConsumerName);
}
catch (Exception ex)
{
_logger.LogError(
Consumers.ConsumerDataflowError,
ConsumerOptions.ConsumerName,
ex.Message);
}
finally { _executionLock.Release(); }
}

public async Task ChannelExecutionEngineAsync(
Func<ReceivedMessage, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
Func<bool, Task> postWorkBodyAsync = null,
int boundedCapacity = 1000,
TaskScheduler taskScheduler = null,
CancellationToken token = default)
{
var channelBlockEngine = new ChannelBlockEngine<ReceivedMessage, bool>(
workBodyAsync,
maxDoP,
ensureOrdered,
postWorkBodyAsync,
boundedCapacity,
taskScheduler,
token);

await TransferDataToChannelBlockEngine(channelBlockEngine, token);
}

public async Task DirectChannelExecutionEngineAsync(
Func<ReceivedMessage, Task<bool>> workBodyAsync,
int maxDoP = 4,
bool ensureOrdered = true,
TaskScheduler taskScheduler = null,
CancellationToken token = default)
{
_ = new ChannelBlockEngine<ReceivedMessage, bool>(
_consumerChannel, workBodyAsync, maxDoP, ensureOrdered, taskScheduler, token);

await _executionLock.WaitAsync(2000, token).ConfigureAwait(false);

try
{
while (await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false))
{
await Task.Delay(4, token); // sleep until channel is finished.
}
}
catch (OperationCanceledException)
{
_logger.LogWarning(
Consumers.ConsumerDataflowActionCancelled,
ConsumerOptions.ConsumerName);
}
catch (Exception ex)
{
_logger.LogError(
Consumers.ConsumerDataflowError,
ConsumerOptions.ConsumerName,
ex.Message);
}
finally { _executionLock.Release(); }
}

private async Task TransferDataToChannelBlockEngine(
ChannelBlockEngine<ReceivedMessage, bool> channelBlockEngine,
CancellationToken token = default)
{
await _executionLock.WaitAsync(2000, token).ConfigureAwait(false);

try
{
while (await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false))
{
var receivedMessage = await _consumerChannel.Reader.ReadAsync(token);
if (receivedMessage != null)
{
_logger.LogDebug(
Consumers.ConsumerDataflowQueueing,
ConsumerOptions.ConsumerName,
receivedMessage.DeliveryTag);

await channelBlockEngine
.EnqueueWorkAsync(receivedMessage)
.ConfigureAwait(false);
}
}
}
catch (OperationCanceledException)
{
_logger.LogWarning(
Consumers.ConsumerDataflowActionCancelled,
ConsumerOptions.ConsumerName);
}
catch (Exception ex)
{
_logger.LogError(
Consumers.ConsumerDataflowError,
ConsumerOptions.ConsumerName,
ex.Message);
}
finally { _executionLock.Release(); }
}

protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
using HouseofCat.Dataflows;
using HouseofCat.RabbitMQ.Dataflows;
using System;
using System.Threading;
using System.Threading.Tasks;
using HouseofCat.Dataflows;
using HouseofCat.RabbitMQ.Dataflows;

namespace HouseofCat.RabbitMQ;

public static class ConsumerExtensions
{
public static ValueTask DirectChannelExecutionEngineAsync(
this IConsumer<ReceivedMessage> consumer,
Func<ReceivedMessage, Task<IRabbitWorkState>> workBodyAsync,
this IConsumer<IReceivedMessage> consumer,
Func<IReceivedMessage, Task<IRabbitWorkState>> workBodyAsync,
Func<IRabbitWorkState, Task> postWorkBodyAsync = null,
TaskScheduler taskScheduler = null,
CancellationToken cancellationToken = default)
{
var channelReaderBlockEngine = new ChannelReaderBlockEngine<ReceivedMessage, IRabbitWorkState>(
var channelReaderBlockEngine = new ChannelReaderBlockEngine<IReceivedMessage, IRabbitWorkState>(
consumer.GetConsumerBuffer(),
workBodyAsync,
consumer.ConsumerOptions.ConsumerPipelineOptions.MaxDegreesOfParallelism,
Expand Down
Loading

0 comments on commit fbc19e2

Please sign in to comment.