Skip to content

Commit

Permalink
Adding extra WithCreateSendMessage step in ConsumerDataflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 30, 2024
1 parent 255669d commit 36dd565
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
4 changes: 2 additions & 2 deletions guides/rabbitmq/ConsumerDataflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ outbound message and assign it to the State.

```csharp
dataflow.WithCreateSendMessage(
async (state) =>
(state) =>
{
var message = new Message
{
Expand Down Expand Up @@ -718,7 +718,7 @@ var dataflowService = new ConsumerDataflowService<CustomWorkState>(rabbitService

// Manually modify the internal Dataflow.
dataflowService.Dataflow.WithCreateSendMessage(
async (state) =>
(state) =>
{
var message = new Message
{
Expand Down
15 changes: 15 additions & 0 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,21 @@ public ConsumerDataflow<TState> WithCreateSendMessage(
return this;
}

public ConsumerDataflow<TState> WithCreateSendMessage(
Func<TState, TState> createMessage,
int? maxDoP = null,
bool? ensureOrdered = null,
int? boundedCapacity = null,
TaskScheduler taskScheduler = null)
{
if (_createSendMessage is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create send message"));
}
return this;
}

public ConsumerDataflow<TState> WithSendCompressedStep(
int? maxDoP = null,
bool? ensureOrdered = null,
Expand Down
4 changes: 1 addition & 3 deletions tests/RabbitMQ.ConsumerDataflowService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

// Manually modify the internal Dataflow.
dataflowService.Dataflow.WithCreateSendMessage(
async (state) =>
(state) =>
{
var message = new Message
{
Expand All @@ -42,8 +42,6 @@
ParentSpanContext = state.WorkflowSpan?.Context,
};

await rabbitService.ComcryptAsync(message);

state.SendMessage = message;
return state;
});
Expand Down

0 comments on commit 36dd565

Please sign in to comment.