Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 23, 2024
1 parent b92e97b commit fd877fd
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 24 deletions.
29 changes: 6 additions & 23 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace HouseofCat.RabbitMQ.Dataflows;
public class ConsumerDataflow<TState> : BaseDataflow<TState> where TState : class, IRabbitWorkState, new()
{
private readonly IRabbitService _rabbitService;
private readonly ICollection<IConsumer<IReceivedMessage>> _consumers;
private readonly ConsumerOptions _consumerOptions;
private readonly TaskScheduler _taskScheduler;

Expand Down Expand Up @@ -420,30 +419,14 @@ protected virtual void BuildLinkages<TConsumerBlock>(DataflowLinkOptions overrid
_readyBuffer ??= new BufferBlock<TState>();
_postProcessingBuffer ??= new BufferBlock<TState>();

if (_consumers == null)
for (var i = 0; i < _consumerOptions.WorkflowConsumerCount; i++)
{
for (var i = 0; i < _consumerOptions.WorkflowConsumerCount; i++)
var consumerBlock = new TConsumerBlock
{
var consumerBlock = new TConsumerBlock
{
Consumer = new Consumer(_rabbitService.ChannelPool, _consumerOptions.ConsumerName)
};
_consumerBlocks.Add(consumerBlock);
_consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions);
}
}
else
{
for (var i = 0; i < _consumers.Count; i++)
{
var consumerBlock = new TConsumerBlock
{
Consumer = _consumers.ElementAt(i)
};

_consumerBlocks.Add(consumerBlock);
_consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions);
}
Consumer = new Consumer(_rabbitService.ChannelPool, _consumerOptions.ConsumerName)
};
_consumerBlocks.Add(consumerBlock);
_consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions);
}

((ISourceBlock<IReceivedMessage>)_inputBuffer).LinkTo(_buildStateBlock, overrideOptions ?? _linkStepOptions);
Expand Down
2 changes: 1 addition & 1 deletion src/HouseofCat.RabbitMQ/Services/RabbitService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class RabbitService : IRabbitService, IDisposable
public ConcurrentDictionary<string, IConsumer<IReceivedMessage>> Consumers { get; private set; } = new ConcurrentDictionary<string, IConsumer<IReceivedMessage>>();
public ConcurrentDictionary<string, ConsumerOptions> ConsumerOptions { get; private set; } = new ConcurrentDictionary<string, ConsumerOptions>();

public string TimeFormat { get; set; } = TimeHelpers.Formats.CatsAltFormat;
public string TimeFormat { get; set; } = TimeHelpers.Formats.RFC3339Long;

public RabbitService(
string fileNamePath,
Expand Down

0 comments on commit fd877fd

Please sign in to comment.