Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Mar 17, 2024
1 parent 0e9f5a2 commit 16e79eb
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ.Dataflows/ConsumerBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task StartConsumingAsync()
{
_cts = new CancellationTokenSource();
await _consumer.StartConsumerAsync().ConfigureAwait(false);
_bufferProcessor = PushToBufferAsync(_cts.Token);
_bufferProcessor = PushToBufferBlockAsync(_cts.Token);
}

public async Task StopConsumingAsync(bool immediate = false)
Expand Down Expand Up @@ -93,7 +93,7 @@ public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOu
}

// Fast
protected virtual async Task PushToBufferAsync(CancellationToken token = default)
protected virtual async Task PushToBufferBlockAsync(CancellationToken token = default)
{
try
{
Expand Down
8 changes: 4 additions & 4 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,12 @@ public async Task StartConsumerAsync()
});

await Task.Yield();
bool success;
do
var success = false;
while (!success)
{
_logger.LogTrace(Consumers.StartingConsumerLoop, ConsumerOptions.ConsumerName);
success = await StartConsumingAsync().ConfigureAwait(false);
}
while (!success);

_logger.LogDebug(Consumers.Started, ConsumerOptions.ConsumerName);

Expand Down Expand Up @@ -433,11 +432,12 @@ await Task.Delay(Options.PoolOptions.SleepOnErrorInterval)
}

if (!channelHealthy)
{ // This is an inner infinite loop, until Channel is healthy/rebuilt.
{
_logger.LogWarning(
Consumers.ConsumerChannelReplacedEvent,
ConsumerOptions.ConsumerName);

// This is an inner infinite loop, until Channel is healthy/rebuilt.
await _chanHost
.WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval)
.ConfigureAwait(false);
Expand Down

0 comments on commit 16e79eb

Please sign in to comment.