Skip to content

Commit

Permalink
Code cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 29, 2024
1 parent c02371b commit aeb5c34
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public partial class GeometryPointTypeHandler : SqlMapper.TypeHandler<GeometryPo

public override GeometryPoint Parse(object value)
{
if (value == null) return null;
if (value is null) return null;

if (!_regex.IsMatch(value.ToString()))
{ throw new ArgumentException("Value is not a Geometry Point"); }
Expand Down
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ await _consumerChannel

protected void EnrichSpanWithTags(TelemetrySpan span, IReceivedMessage receivedMessage)
{
if (span == null || !span.IsRecording) return;
if (span is null || !span.IsRecording) return;

span.SetAttribute(Constants.MessagingSystemKey, Constants.MessagingSystemValue);

Expand Down Expand Up @@ -498,7 +498,7 @@ public async Task<IEnumerable<IReceivedMessage>> ReadUntilEmptyAsync(Cancellatio
await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false);
while (_consumerChannel.Reader.TryRead(out var message))
{
if (message == null) { break; }
if (message is null) { break; }
list.Add(message);
}

Expand Down
28 changes: 14 additions & 14 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public ConsumerDataflow<TState> WithErrorHandling(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(action, nameof(action));
if (_errorBuffer == null)
if (_errorBuffer is null)
{
_errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler);
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
Expand All @@ -194,7 +194,7 @@ public ConsumerDataflow<TState> WithErrorHandling(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(action, nameof(action));
if (_errorBuffer == null)
if (_errorBuffer is null)
{
_errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler);
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
Expand Down Expand Up @@ -254,7 +254,7 @@ public ConsumerDataflow<TState> WithFinalization(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(action, nameof(action));
if (_finalization == null)
if (_finalization is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_finalization = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("finalization"));
Expand All @@ -270,7 +270,7 @@ public ConsumerDataflow<TState> WithFinalization(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(action, nameof(action));
if (_finalization == null)
if (_finalization is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_finalization = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("finalization"));
Expand All @@ -284,7 +284,7 @@ public ConsumerDataflow<TState> WithBuildState(
int? boundedCapacity = null,
TaskScheduler taskScheduler = null)
{
if (_buildStateBlock == null)
if (_buildStateBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_buildStateBlock = GetBuildStateBlock(executionOptions);
Expand All @@ -299,7 +299,7 @@ public ConsumerDataflow<TState> WithDecryptionStep(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(_encryptionProvider, nameof(_encryptionProvider));
if (_decryptBlock == null)
if (_decryptBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);

Expand All @@ -320,7 +320,7 @@ public ConsumerDataflow<TState> WithDecompressionStep(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(_compressionProvider, nameof(_compressionProvider));
if (_decompressBlock == null)
if (_decompressBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);

Expand All @@ -342,7 +342,7 @@ public ConsumerDataflow<TState> WithCreateSendMessage(
int? boundedCapacity = null,
TaskScheduler taskScheduler = null)
{
if (_createSendMessage == null)
if (_createSendMessage is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create_send_message"));
Expand All @@ -357,7 +357,7 @@ public ConsumerDataflow<TState> WithSendCompressedStep(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(_compressionProvider, nameof(_compressionProvider));
if (_compressBlock == null)
if (_compressBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);

Expand All @@ -378,7 +378,7 @@ public ConsumerDataflow<TState> WithSendEncryptedStep(
TaskScheduler taskScheduler = null)
{
Guard.AgainstNull(_encryptionProvider, nameof(_encryptionProvider));
if (_encryptBlock == null)
if (_encryptBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);

Expand All @@ -398,7 +398,7 @@ public ConsumerDataflow<TState> WithSendStep(
int? boundedCapacity = null,
TaskScheduler taskScheduler = null)
{
if (_sendMessageBlock == null)
if (_sendMessageBlock is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_sendMessageBlock = GetWrappedPublishTransformBlock(_rabbitService, executionOptions);
Expand Down Expand Up @@ -432,7 +432,7 @@ protected virtual void BuildLinkages<TConsumerBlock>(DataflowLinkOptions overrid
}

((ISourceBlock<IReceivedMessage>)_inputBuffer).LinkTo(_buildStateBlock, overrideOptions ?? _linkStepOptions);
_buildStateBlock.LinkTo(_errorBuffer, overrideOptions ?? _linkStepOptions, x => x == null);
_buildStateBlock.LinkTo(_errorBuffer, overrideOptions ?? _linkStepOptions, x => x is null);
SetCurrentSourceBlock(_buildStateBlock);

LinkPreProcessing(overrideOptions);
Expand Down Expand Up @@ -592,7 +592,7 @@ TState WrapAction(TState state)
{
if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType)
{
if (state.ReceivedMessage.Message == null)
if (state.ReceivedMessage.Message is null)
{ state.ReceivedMessage.Message = _serializationProvider.Deserialize<Message>(state.ReceivedMessage.Body); }

state.ReceivedMessage.Message.Body = action(state.ReceivedMessage.Message.Body);
Expand Down Expand Up @@ -640,7 +640,7 @@ async Task<TState> WrapActionAsync(TState state)
{
if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType)
{
if (state.ReceivedMessage.Message == null)
if (state.ReceivedMessage.Message is null)
{ state.ReceivedMessage.Message = _serializationProvider.Deserialize<Message>(state.ReceivedMessage.Body); }

state.ReceivedMessage.Message.Body = await action(state.ReceivedMessage.Message.Body).ConfigureAwait(false);
Expand Down
16 changes: 2 additions & 14 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class ConsumerPipeline<TOut> : IConsumerPipeline, IDisposable where TOut
private CancellationTokenSource _cancellationTokenSource;
private bool _disposedValue;
private readonly SemaphoreSlim _cpLock = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _pipeExecLock = new SemaphoreSlim(1, 1);

public ConsumerPipeline(
IConsumer<PipeReceivedMessage> consumer,
Expand Down Expand Up @@ -135,15 +134,11 @@ public async Task PipelineStreamEngineAsync(
bool waitForCompletion,
CancellationToken token = default)
{
await _pipeExecLock
.WaitAsync(2000, token)
.ConfigureAwait(false);

try
{
await foreach (var receivedMessage in Consumer.GetConsumerBuffer().ReadAllAsync(token))
{
if (receivedMessage == null) { continue; }
if (receivedMessage is null) { continue; }

_logger.LogDebug(
_consumerPipelineQueueing,
Expand Down Expand Up @@ -188,25 +183,20 @@ await receivedMessage
ConsumerOptions.ConsumerName,
ex.Message);
}
finally { _pipeExecLock.Release(); }
}

public async Task PipelineExecutionEngineAsync(
IPipeline<PipeReceivedMessage, TOut> pipeline,
bool waitForCompletion,
CancellationToken token = default)
{
await _pipeExecLock
.WaitAsync(2000, token)
.ConfigureAwait(false);

try
{
while (await Consumer.GetConsumerBuffer().WaitToReadAsync(token).ConfigureAwait(false))
{
while (Consumer.GetConsumerBuffer().TryRead(out var receivedMessage))
{
if (receivedMessage == null) { continue; }
if (receivedMessage is null) { continue; }

_logger.LogDebug(
_consumerPipelineQueueing,
Expand Down Expand Up @@ -252,7 +242,6 @@ await receivedMessage
ConsumerOptions.ConsumerName,
ex.Message);
}
finally { _pipeExecLock.Release(); }
}

public async Task AwaitCompletionAsync()
Expand All @@ -267,7 +256,6 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
_cpLock.Dispose();
_pipeExecLock.Dispose();
_cancellationTokenSource?.Dispose();
}

Expand Down
2 changes: 1 addition & 1 deletion src/HouseofCat.RabbitMQ/Extensions/MessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public static IList<IMessage> CreateManySimpleRandomMessages(string queueName, i

public static void EnrichSpanWithTags(this IMessage message, TelemetrySpan span)
{
if (message == null || span == null || !span.IsRecording) return;
if (message is null || span is null || !span.IsRecording) return;

span.SetAttribute(Constants.MessagingSystemKey, Constants.MessagingSystemValue);

Expand Down
12 changes: 6 additions & 6 deletions src/HouseofCat.RabbitMQ/Messages/Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public sealed class Metadata : IMetadata

public bool Encrypted()
{
if (Fields == null) return false;
if (Fields is null) return false;

if (Fields.TryGetValue(Constants.HeaderForEncrypted, out var value))
{
Expand All @@ -33,7 +33,7 @@ public bool Encrypted()

public string EncryptionType()
{
if (Fields == null) return null;
if (Fields is null) return null;

if (Fields.TryGetValue(Constants.HeaderForEncryption, out var value))
{
Expand All @@ -45,7 +45,7 @@ public string EncryptionType()

public string EncryptedDate()
{
if (Fields == null) return null;
if (Fields is null) return null;

if (Fields.TryGetValue(Constants.HeaderForEncryptDate, out var value))
{
Expand All @@ -57,7 +57,7 @@ public string EncryptedDate()

public DateTime? EncryptedDateTime()
{
if (Fields == null) return null;
if (Fields is null) return null;

if (Fields.TryGetValue(Constants.HeaderForEncryptDate, out var value)
&& DateTime.TryParse((string)value, out var dateTime))
Expand All @@ -70,7 +70,7 @@ public string EncryptedDate()

public bool Compressed()
{
if (Fields == null) return false;
if (Fields is null) return false;

if (Fields.TryGetValue(Constants.HeaderForCompressed, out var value))
{
Expand All @@ -82,7 +82,7 @@ public bool Compressed()

public string CompressionType()
{
if (Fields == null) return null;
if (Fields is null) return null;

if (Fields.TryGetValue(Constants.HeaderForCompression, out var value))
{
Expand Down
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public async Task<IChannelHost> GetChannelAsync()
.ConfigureAwait(false);
}

if (_channels == null)
if (_channels is null)
{
throw new InvalidOperationException(_channelPoolBadOptionChannelError);
}
Expand Down Expand Up @@ -211,7 +211,7 @@ public async Task<IChannelHost> GetAckChannelAsync()
.ConfigureAwait(false);
}

if (_ackChannels == null)
if (_ackChannels is null)
{
throw new InvalidOperationException(_channelPoolBadOptionAckChannelError);
}
Expand Down
12 changes: 6 additions & 6 deletions src/HouseofCat.RabbitMQ/Publisher/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public Publisher(
_logger = LogHelpers.GetLogger<Publisher>();
_serializationProvider = serializationProvider;

if (Options.PublisherOptions.Encrypt && encryptionProvider == null)
if (Options.PublisherOptions.Encrypt && encryptionProvider is null)
{
Options.PublisherOptions.Encrypt = false;
_logger.LogWarning("Encryption disabled, encryptionProvider provided was null.");
Expand All @@ -135,7 +135,7 @@ public Publisher(
_encryptionProvider = encryptionProvider;
}

if (Options.PublisherOptions.Compress && compressionProvider == null)
if (Options.PublisherOptions.Compress && compressionProvider is null)
{
Options.PublisherOptions.Compress = false;
_logger.LogWarning("Compression disabled, compressionProvider provided was null.");
Expand Down Expand Up @@ -280,7 +280,7 @@ private async Task ProcessMessagesAsync(ChannelReader<IMessage> channelReader)
{
while (channelReader.TryRead(out var message))
{
if (message == null)
if (message is null)
{ continue; }

using var span = OpenTelemetryHelpers.StartActiveSpan(
Expand Down Expand Up @@ -373,7 +373,7 @@ public async Task<bool> PublishAsync(

var error = false;
var channelHost = await _channelPool.GetChannelAsync().ConfigureAwait(false);
if (basicProperties == null)
if (basicProperties is null)
{
basicProperties = channelHost.Channel.CreateBasicProperties();
basicProperties.DeliveryMode = 2;
Expand Down Expand Up @@ -481,7 +481,7 @@ public async Task<bool> PublishBatchAsync(

var error = false;
var channelHost = await _channelPool.GetChannelAsync().ConfigureAwait(false);
if (basicProperties == null)
if (basicProperties is null)
{
basicProperties = channelHost.Channel.CreateBasicProperties();
basicProperties.DeliveryMode = 2;
Expand Down Expand Up @@ -917,7 +917,7 @@ public static void EnrichSpanWithTags(
string routingKey,
string messageId = null)
{
if (span == null || !span.IsRecording) return;
if (span is null || !span.IsRecording) return;

span.SetAttribute(Constants.MessagingSystemKey, Constants.MessagingSystemValue);

Expand Down
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ/Services/MaintenanceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public async Task<bool> TransferAllMessagesAsync(
while (true)
{
result = channelHost.Channel.BasicGet(originQueueName, true);
if (result == null) { break; }
if (result is null) { break; }

if (result?.Body is not null)
{
Expand Down Expand Up @@ -187,7 +187,7 @@ public async Task<bool> TransferAllMessagesAsync(
try
{
result = channelHost.Channel.BasicGet(originQueueName, true);
if (result == null) { break; }
if (result is null) { break; }
}
catch { error = true; }
finally
Expand Down
2 changes: 1 addition & 1 deletion src/HouseofCat.Utilities/Extensions/ObjectExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class ObjectExtensions

public static long GetByteCount(this object input)
{
if (input == null) return 0;
if (input is null) return 0;
var type = input.GetType();

if (_primitiveTypeSizes.TryGetValue(type, out int sizeValue))
Expand Down
2 changes: 1 addition & 1 deletion src/HouseofCat.Utilities/Helpers/AppHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static bool IsDebug

public static string GetFlexibleSemVersion(AssemblyName assemblyName)
{
if (assemblyName == null)
if (assemblyName is null)
{
return null;
}
Expand Down
Loading

0 comments on commit aeb5c34

Please sign in to comment.