Skip to content

Commit

Permalink
Improve SendMessage support for ConsumerDataflows.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 29, 2024
1 parent aeb5c34 commit 0b2419f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 40 deletions.
23 changes: 14 additions & 9 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,20 @@ public ConsumerDataflow(
}
}

public async Task StopAsync(bool immediate = false)
public async Task StopAsync(
bool immediate = false,
bool shutdownService = false)
{
foreach (var consumerBlock in _consumerBlocks)
{
await consumerBlock.StopConsumingAsync(immediate).ConfigureAwait(false);
consumerBlock.Complete();
}

await _rabbitService.ShutdownAsync(false);
if (shutdownService)
{
await _rabbitService.ShutdownAsync(immediate);
}
}

/// <summary>
Expand Down Expand Up @@ -345,7 +350,7 @@ public ConsumerDataflow<TState> WithCreateSendMessage(
if (_createSendMessage is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create_send_message"));
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create send message"));
}
return this;
}
Expand All @@ -366,7 +371,7 @@ public ConsumerDataflow<TState> WithSendCompressedStep(
executionOptions,
true,
x => !x.ReceivedMessage.Compressed,
GetSpanName("compress"));
GetSpanName("compress send message"));
}
return this;
}
Expand All @@ -387,7 +392,7 @@ public ConsumerDataflow<TState> WithSendEncryptedStep(
executionOptions,
true,
x => !x.ReceivedMessage.Encrypted,
GetSpanName("encrypt"));
GetSpanName("encrypt send message"));
}
return this;
}
Expand All @@ -401,7 +406,7 @@ public ConsumerDataflow<TState> WithSendStep(
if (_sendMessageBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_sendMessageBlock = GetWrappedPublishTransformBlock(_rabbitService, executionOptions);
_sendMessageBlock = GetWrappedSendTransformBlock(_rabbitService, executionOptions);
}
return this;
}
Expand Down Expand Up @@ -664,14 +669,14 @@ async Task<TState> WrapActionAsync(TState state)
return new TransformBlock<TState, TState>(WrapActionAsync, options);
}

private string PublishStepIdentifier => $"{WorkflowName}_Publish";
public TransformBlock<TState, TState> GetWrappedPublishTransformBlock(
private string SendStepIdentifier => $"{WorkflowName} send";
public TransformBlock<TState, TState> GetWrappedSendTransformBlock(
IRabbitService service,
ExecutionDataflowBlockOptions options)
{
async Task<TState> WrapPublishAsync(TState state)
{
using var childSpan = state.CreateActiveChildSpan(PublishStepIdentifier, state.WorkflowSpan.Context, SpanKind.Producer);
using var childSpan = state.CreateActiveChildSpan(SendStepIdentifier, state.WorkflowSpan.Context, SpanKind.Producer);
try
{
await service.Publisher.PublishAsync(state.SendMessage, true, true).ConfigureAwait(false);
Expand Down
27 changes: 17 additions & 10 deletions src/HouseofCat.RabbitMQ/Services/ConsumerDataflowService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ namespace HouseofCat.RabbitMQ.Services;

public class ConsumerDataflowService<TState> where TState : class, IRabbitWorkState, new()
{
private readonly ConsumerDataflow<TState> _dataflow;
public ConsumerDataflow<TState> Dataflow { get; }
private readonly ConsumerOptions _options;

/// <summary>
/// This a basic implementation service class for convenience. It serves as a simple opinionated wrapper
/// of ConsumerDataflow with accessors to a few methods and provides streamlined auto-configuration.
/// </summary>
/// <param name="rabbitService"></param>
/// <param name="consumerName"></param>
/// <param name="taskScheduler"></param>
public ConsumerDataflowService(
IRabbitService rabbitService,
string consumerName,
Expand Down Expand Up @@ -41,12 +48,12 @@ public ConsumerDataflowService(
dataflow = dataflow.WithSendStep();
}

_dataflow = dataflow;
Dataflow = dataflow;
}

public void AddStep(string stepName, Func<TState, TState> step)
{
_dataflow.AddStep(
Dataflow.AddStep(
step,
stepName,
_options.WorkflowMaxDegreesOfParallelism,
Expand All @@ -56,7 +63,7 @@ public void AddStep(string stepName, Func<TState, TState> step)

public void AddStep(string stepName, Func<TState, Task<TState>> step)
{
_dataflow.AddStep(
Dataflow.AddStep(
step,
stepName,
_options.WorkflowMaxDegreesOfParallelism,
Expand All @@ -66,7 +73,7 @@ public void AddStep(string stepName, Func<TState, Task<TState>> step)

public void AddFinalization(Action<TState> step)
{
_dataflow.WithFinalization(
Dataflow.WithFinalization(
step,
_options.WorkflowMaxDegreesOfParallelism,
_options.WorkflowEnsureOrdered,
Expand All @@ -75,7 +82,7 @@ public void AddFinalization(Action<TState> step)

public void AddFinalization(Func<TState, Task> step)
{
_dataflow.WithFinalization(
Dataflow.WithFinalization(
step,
_options.WorkflowMaxDegreesOfParallelism,
_options.WorkflowEnsureOrdered,
Expand All @@ -84,7 +91,7 @@ public void AddFinalization(Func<TState, Task> step)

public void AddErrorHandling(Action<TState> step)
{
_dataflow.WithErrorHandling(
Dataflow.WithErrorHandling(
step,
_options.WorkflowBatchSize,
_options.WorkflowMaxDegreesOfParallelism,
Expand All @@ -93,7 +100,7 @@ public void AddErrorHandling(Action<TState> step)

public void AddErrorHandling(Func<TState, Task> step)
{
_dataflow.WithErrorHandling(
Dataflow.WithErrorHandling(
step,
_options.WorkflowBatchSize,
_options.WorkflowMaxDegreesOfParallelism,
Expand All @@ -102,11 +109,11 @@ public void AddErrorHandling(Func<TState, Task> step)

public async Task StartAsync()
{
await _dataflow.StartAsync();
await Dataflow.StartAsync();
}

public async Task StopAsync()
{
await _dataflow.StopAsync();
await Dataflow.StopAsync();
}
}
40 changes: 19 additions & 21 deletions tests/RabbitMQ.ConsumerDataflowService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,14 @@

var rabbitService = await Shared.SetupRabbitServiceAsync(loggerFactory, "RabbitMQ.ConsumerDataflows.json");
var dataflowService = new ConsumerDataflowService<CustomWorkState>(rabbitService, "TestConsumer");

dataflowService.AddStep(
"write_message_to_log",
(state) =>
{
var message = Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span);
if (message == "throw")
{
throw new Exception("Throwing an exception!");
}

if (logMessage)
{ logger.LogInformation(message); }

return state;
});

dataflowService.AddStep(
"create_new_secret_message",
dataflowService.Dataflow.WithCreateSendMessage(
async (state) =>
{
var message = new Message
{
Exchange = "",
RoutingKey = "TestQueue",
Body = Encoding.UTF8.GetBytes("Secret Message"),
RoutingKey = state.ReceivedMessage.Message.RoutingKey,
Body = Encoding.UTF8.GetBytes("New Secret Message"),
Metadata = new Metadata
{
PayloadId = Guid.NewGuid().ToString(),
Expand All @@ -64,6 +46,22 @@
return state;
});

dataflowService.AddStep(
"write_message_to_log",
(state) =>
{
var message = Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span);
if (message == "throw")
{
throw new Exception("Throwing an exception!");
}

if (logMessage)
{ logger.LogInformation(message); }

return state;
});

dataflowService.AddFinalization(
(state) =>
{
Expand Down

0 comments on commit 0b2419f

Please sign in to comment.