Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Mar 30, 2024
1 parent 1b2f61a commit 17d7297
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 232 deletions.
1 change: 0 additions & 1 deletion src/HouseofCat.Dataflows/DataflowExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

Expand Down
1 change: 0 additions & 1 deletion src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ public static void EndRootSpan(
{
state.SetOpenTelemetryError();
}
state.RootSpan?.End();
state.RootSpan?.Dispose();
}
}
6 changes: 4 additions & 2 deletions src/HouseofCat.RabbitMQ/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ public static class Constants
public static string HeaderPrefix { get; set; } = "X-";

// Consumer
public static string HeaderForContentType { get; set; } = "ContentType";
public static string HeaderValueForContentTypeApplicationJson { get; set; } = "application/json;";
public static string HeaderForObjectType { get; set; } = "X-RD-OBJECTTYPE";
public static string HeaderValueForMessage { get; set; } = "IMESSAGE";
public static string HeaderValueForMessageObjectType { get; set; } = "IMESSAGE";

public static string HeaderValueForUnknown { get; set; } = "UNKNOWN";
public static string HeaderValueForUnknownObjectType { get; set; } = "UNK";
public static string HeaderForEncrypted { get; set; } = "X-RD-ENCRYPTED";
public static string HeaderForEncryption { get; set; } = "X-RD-ENCRYPTION";
public static string HeaderForEncryptDate { get; set; } = "X-RD-ENCRYPTDATE";
Expand Down
88 changes: 68 additions & 20 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public ConsumerDataflow<TState> WithErrorHandling(
{
_errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler);
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}_ErrorHandler");
_errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.ErrorHandler");
}
return this;
}
Expand All @@ -250,7 +250,7 @@ public ConsumerDataflow<TState> WithErrorHandling(
{
_errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler);
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}_ErrorHandler");
_errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.ErrorHandler");
}
return this;
}
Expand Down Expand Up @@ -309,7 +309,7 @@ public ConsumerDataflow<TState> WithFinalization(
if (_finalization == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_finalization = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}_Finalization");
_finalization = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.Finalization");
}
return this;
}
Expand All @@ -325,12 +325,26 @@ public ConsumerDataflow<TState> WithFinalization(
if (_finalization == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_finalization = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}_Finalization");
_finalization = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.Finalization");
}
return this;
}

public ConsumerDataflow<TState> WithBuildState<TOut>(
public ConsumerDataflow<TState> WithBuildState(
int? maxDoP = null,
bool? ensureOrdered = null,
int? boundedCapacity = null,
TaskScheduler taskScheduler = null)
{
if (_buildStateBlock == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_buildStateBlock = GetBuildStateBlock(executionOptions);
}
return this;
}

public ConsumerDataflow<TState> WithBuildStateAndPayload<TOut>(
string stateKey,
int? maxDoP = null,
bool? ensureOrdered = null,
Expand All @@ -342,7 +356,7 @@ public ConsumerDataflow<TState> WithBuildState<TOut>(
if (_buildStateBlock == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_buildStateBlock = GetBuildStateBlock<TOut>(stateKey, executionOptions);
_buildStateBlock = GetBuildStateWithPayloadBlock<TOut>(stateKey, executionOptions);
}
return this;
}
Expand Down Expand Up @@ -582,7 +596,34 @@ private void LinkWithFaultRoute(ISourceBlock<TState> source, IPropagatorBlock<TS

#region Step Wrappers

public virtual TState BuildState<TOut>(string key, ReceivedData data)
public virtual TState BuildState(ReceivedData data)
{
var state = new TState
{
ReceivedData = data,
Data = new Dictionary<string, object>()
};

var attributes = new List<KeyValuePair<string, string>>()
{
KeyValuePair.Create(nameof(_consumerOptions.ConsumerName), _consumerOptions.ConsumerName)
};

if (state.ReceivedData?.Letter?.MessageId is not null)
{
attributes.Add(KeyValuePair.Create(nameof(state.ReceivedData.Letter.MessageId), state.ReceivedData.Letter.MessageId));
}
if (state.ReceivedData?.Letter?.Metadata?.Id is not null)
{
attributes.Add(KeyValuePair.Create(nameof(state.ReceivedData.Letter.Metadata.Id), state.ReceivedData.Letter.Metadata.Id));
}

state.StartRootSpan(WorkflowName, spanKind: SpanKind.Consumer, suppliedAttributes: attributes);

return state;
}

public virtual TState BuildStateAndPayload<TOut>(string key, ReceivedData data)
{
var state = new TState
{
Expand All @@ -607,31 +648,38 @@ public virtual TState BuildState<TOut>(string key, ReceivedData data)
state.StartRootSpan(WorkflowName, spanKind: SpanKind.Consumer, suppliedAttributes: attributes);

if (_serializationProvider != null
&& data.ContentType != Constants.HeaderValueForUnknown)
&& data.ObjectType != Constants.HeaderValueForUnknownObjectType)
{
try
{ state.Data[key] = _serializationProvider.Deserialize<TOut>(data.Data); }
catch (Exception ex)
{
state.IsFaulted = true;
state.EDI = ExceptionDispatchInfo.Capture(ex);
return state;
}
catch { }
}
else
{ state.Data[key] = data.Data; }

return state;
}

public TransformBlock<ReceivedData, TState> GetBuildStateBlock<TOut>(
public TransformBlock<ReceivedData, TState> GetBuildStateWithPayloadBlock<TOut>(
string key,
ExecutionDataflowBlockOptions options)
{
TState BuildStateWrap(ReceivedData data)
{
try
{ return BuildState<TOut>(key, data); }
{ return BuildStateAndPayload<TOut>(key, data); }
catch
{ return null; }
}

return new TransformBlock<ReceivedData, TState>(BuildStateWrap, options);
}

public TransformBlock<ReceivedData, TState> GetBuildStateBlock(
ExecutionDataflowBlockOptions options)
{
TState BuildStateWrap(ReceivedData data)
{
try
{ return BuildState(data); }
catch
{ return null; }
}
Expand Down Expand Up @@ -660,7 +708,7 @@ TState WrapAction(TState state)
}
else if (predicate.Invoke(state))
{
if (state.ReceivedData.ContentType == Constants.HeaderValueForMessage)
if (state.ReceivedData.ObjectType == Constants.HeaderValueForMessageObjectType)
{
if (state.ReceivedData.Letter == null)
{ state.ReceivedData.Letter = _serializationProvider.Deserialize<Letter>(state.ReceivedData.Data); }
Expand Down Expand Up @@ -708,7 +756,7 @@ async Task<TState> WrapActionAsync(TState state)
}
else if (predicate.Invoke(state))
{
if (state.ReceivedData.ContentType == Constants.HeaderValueForMessage)
if (state.ReceivedData.ObjectType == Constants.HeaderValueForMessageObjectType)
{
if (state.ReceivedData.Letter == null)
{ state.ReceivedData.Letter = _serializationProvider.Deserialize<Letter>(state.ReceivedData.Data); }
Expand Down
2 changes: 0 additions & 2 deletions src/HouseofCat.RabbitMQ/Dataflows/RabbitWorkState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,5 @@ public abstract class RabbitWorkState : IRabbitWorkState
public bool IsFaulted { get; set; }
public ExceptionDispatchInfo EDI { get; set; }

public IDictionary<string, string> MetricTags { get; set; }

public TelemetrySpan RootSpan { get; set; }
}
Loading

0 comments on commit 17d7297

Please sign in to comment.