Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and refactors #550

Merged
merged 10 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0-preview.2" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19270-01" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19324-01" PrivateAssets="All" />
<PackageReference Include="OpenCover" Version="4.7.922" PrivateAssets="All" />
<PackageReference Include="ReportGenerator" Version="4.1.9" PrivateAssets="All" />
<PackageReference Include="ReportGenerator" Version="4.2.2" PrivateAssets="All" />
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="$(AnalyzersPackageVersion)" PrivateAssets="All" />
<PackageReference Include="Microsoft.CodeQuality.Analyzers" Version="$(AnalyzersPackageVersion)" PrivateAssets="All" />
<PackageReference Include="Microsoft.NetCore.Analyzers" Version="$(AnalyzersPackageVersion)" PrivateAssets="All" />
Expand Down
2 changes: 1 addition & 1 deletion JustSaying.IntegrationTests/Fluent/IntegrationTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected IHandlerAsync<T> CreateHandler<T>(TaskCompletionSource<object> complet

handler.Handle(Arg.Any<T>())
.Returns(true)
.AndDoes((_) => completionSource.SetResult(null));
.AndDoes((_) => completionSource.TrySetResult(null));

return handler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
<ProjectReference Include="..\JustSaying.TestingFramework\JustSaying.TestingFramework.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.Core" Version="3.3.102" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.3.101.17" />
<PackageReference Include="AWSSDK.SQS" Version="3.3.100.28" />
<PackageReference Include="AWSSDK.Core" Version="3.3.103.5" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.3.101.30" />
<PackageReference Include="AWSSDK.SQS" Version="3.3.100.41" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="StructureMap" Version="4.7.1" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<ProjectReference Include="..\JustSaying.Extensions.DependencyInjection.StructureMap\JustSaying.Extensions.DependencyInjection.StructureMap.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AutoFixture" Version="4.9.0" />
<PackageReference Include="AutoFixture" Version="4.10.0" />
<PackageReference Include="JustBehave" Version="2.0.0-beta-62" />
<PackageReference Include="JustBehave.xUnit" Version="2.0.0-beta-62" />
<PackageReference Include="MartinCostello.Logging.XUnit" Version="0.1.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener
{
public class WhenExactlyOnceIsAppliedWithoutSpecificTimeout : BaseQueuePollingTest
{
//private readonly int _maximumTimeout = int.MaxValue;
private readonly int _maximumTimeout = (int)TimeSpan.MaxValue.TotalSeconds;
private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
private ExactlyOnceSignallingHandler _handler;
Expand Down
10 changes: 5 additions & 5 deletions JustSaying.UnitTests/JustSaying.UnitTests.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>
Expand All @@ -12,10 +12,10 @@
<ProjectReference Include="..\JustSaying.TestingFramework\JustSaying.TestingFramework.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.Core" Version="3.3.102" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.3.101.17" />
<PackageReference Include="AWSSDK.SQS" Version="3.3.100.28" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.1.1" />
<PackageReference Include="AWSSDK.Core" Version="3.3.103.5" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.3.101.30" />
<PackageReference Include="AWSSDK.SQS" Version="3.3.100.41" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public async Task WhenMessageIsLockedByAnotherHandler_MessageWillBeLeftInTheQueu
{
var messageLock = Substitute.For<IMessageLockAsync>();
messageLock.TryAquireLockAsync(Arg.Any<string>(), Arg.Any<TimeSpan>()).Returns(new MessageLockResponse { DoIHaveExclusiveLock = false });
var sut = new ExactlyOnceHandler<OrderAccepted>(Substitute.For<IHandlerAsync<OrderAccepted>>(), messageLock, 1, "handlerName");
var sut = new ExactlyOnceHandler<OrderAccepted>(Substitute.For<IHandlerAsync<OrderAccepted>>(), messageLock, TimeSpan.FromSeconds(1), "handlerName");

var result = await sut.Handle(new OrderAccepted());

Expand Down
59 changes: 44 additions & 15 deletions JustSaying/AwsTools/MessageHandling/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,26 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
}
catch (MessageFormatNotSupportedException ex)
{
_logger.LogTrace("Could not handle message with Id '{MessageId}' because a deserializer for the content is not configured. Message body: '{MessageBody}'.",
message.MessageId, message.Body);
_logger.LogTrace(
"Could not handle message with Id '{MessageId}' because a deserializer for the content is not configured. Message body: '{MessageBody}'.",
message.MessageId,
message.Body);

await DeleteMessageFromQueue(message.ReceiptHandle).ConfigureAwait(false);
_onError(ex, message);

return;
}
#pragma warning disable CA1031
catch (Exception ex)
#pragma warning restore CA1031
{
_logger.LogError(0, ex, "Error deserializing message with Id '{MessageId}' and body '{MessageBody}'.",
message.MessageId, message.Body);
_logger.LogError(
ex,
"Error deserializing message with Id '{MessageId}' and body '{MessageBody}'.",
message.MessageId,
message.Body);

_onError(ex, message);
return;
}
Expand All @@ -98,8 +106,11 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
catch (Exception ex)
#pragma warning restore CA1031
{
_logger.LogError(0, ex, "Error handling message with Id '{MessageId}' and body '{MessageBody}'.",
message.MessageId, message.Body);
_logger.LogError(
ex,
"Error handling message with Id '{MessageId}' and body '{MessageBody}'.",
message.MessageId,
message.Body);

if (typedMessage != null)
{
Expand All @@ -112,18 +123,25 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
}
finally
{
if (!handlingSucceeded && _messageBackoffStrategy != null)
try
{
await UpdateMessageVisibilityTimeout(message, message.ReceiptHandle, typedMessage, lastException).ConfigureAwait(false);
if (!handlingSucceeded && _messageBackoffStrategy != null)
{
await UpdateMessageVisibilityTimeout(message, message.ReceiptHandle, typedMessage, lastException).ConfigureAwait(false);
}
}
finally
{
_messageContextAccessor.MessageContext = null;
}

_messageContextAccessor.MessageContext = null;
}
}

private async Task<bool> CallMessageHandler(Message message)
{
var handler = _handlerMap.Get(message.GetType());
var messageType = message.GetType();

var handler = _handlerMap.Get(messageType);

if (handler == null)
{
Expand All @@ -135,8 +153,13 @@ private async Task<bool> CallMessageHandler(Message message)
var handlerSucceeded = await handler(message).ConfigureAwait(false);

watch.Stop();
_logger.LogTrace("Handled message of type {MessageType} in {TimeToHandle}.",
message.GetType(), watch.Elapsed);

_logger.LogTrace(
"Handled message with Id '{MessageId}' of type {MessageType} in {TimeToHandle}.",
message.Id,
messageType,
watch.Elapsed);

_messagingMonitor.HandleTime(watch.Elapsed);

return handlerSucceeded;
Expand Down Expand Up @@ -173,7 +196,12 @@ private async Task UpdateMessageVisibilityTimeout(SQSMessage message, string rec
}
catch (AmazonServiceException ex)
{
_logger.LogError(0, ex, "Failed to update message visibility timeout by {VisibilityTimeout} seconds.", visibilityTimeoutSeconds);
_logger.LogError(
ex,
"Failed to update message visibility timeout by {VisibilityTimeout} seconds for message with receipt handle '{ReceiptHandle}'.",
visibilityTimeoutSeconds,
receiptHandle);

_onError(ex, message);
}
}
Expand All @@ -183,7 +211,8 @@ private static bool TryGetApproxReceiveCount(IDictionary<string, string> attribu
{
approxReceiveCount = 0;

return attributes.TryGetValue(MessageSystemAttributeName.ApproximateReceiveCount, out string rawApproxReceiveCount) && int.TryParse(rawApproxReceiveCount, out approxReceiveCount);
return attributes.TryGetValue(MessageSystemAttributeName.ApproximateReceiveCount, out string rawApproxReceiveCount) &&
int.TryParse(rawApproxReceiveCount, out approxReceiveCount);
}
}
}
4 changes: 3 additions & 1 deletion JustSaying/AwsTools/MessageHandling/MessageHandlerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ private IHandlerAsync<T> MaybeWrapWithExactlyOnce<T>(IHandlerAsync<T> handler) w
}

var handlerName = handlerType.FullName.ToLowerInvariant();
return new ExactlyOnceHandler<T>(handler, _messageLock, exactlyOnceMetadata.GetTimeOut(), handlerName);
var timeout = TimeSpan.FromSeconds(exactlyOnceMetadata.GetTimeOut());

return new ExactlyOnceHandler<T>(handler, _messageLock, timeout, handlerName);
}

private IHandlerAsync<T> MaybeWrapWithStopwatch<T>(IHandlerAsync<T> handler) where T : Message
Expand Down
36 changes: 26 additions & 10 deletions JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,29 @@ internal async Task ListenLoop(CancellationToken ct)
}
catch (InvalidOperationException ex)
{
_log.LogTrace(0, ex, "Could not determine number of messages to read from queue '{QueueName}' in '{Region}'.",
queueName, regionName);
_log.LogTrace(
ex,
"Could not determine number of messages to read from queue '{QueueName}' in '{Region}'.",
queueName,
regionName);
}
catch (OperationCanceledException ex)
{
_log.LogTrace(0, ex, "Suspected no message on queue '{QueueName}' in region '{Region}'.",
queueName, regionName);
_log.LogTrace(
ex,
"Suspected no message on queue '{QueueName}' in region '{Region}'.",
queueName,
regionName);
}
#pragma warning disable CA1031
catch (Exception ex)
#pragma warning restore CA1031
{
_log.LogError(0, ex, "Error receiving messages on queue '{QueueName}' in region '{Region}'.",
queueName, regionName);
_log.LogError(
ex,
"Error receiving messages on queue '{QueueName}' in region '{Region}'.",
queueName,
regionName);
}

try
Expand All @@ -166,8 +175,11 @@ internal async Task ListenLoop(CancellationToken ct)
catch (Exception ex)
#pragma warning restore CA1031
{
_log.LogError(0, ex, "Error in message handling loop for queue '{QueueName}' in region '{Region}'.",
queueName, regionName);
_log.LogError(
ex,
"Error in message handling loop for queue '{QueueName}' in region '{Region}'.",
queueName,
regionName);
}
}
}
Expand Down Expand Up @@ -237,8 +249,12 @@ private async Task<int> GetNumberOfMessagesToReadFromSqs()

private Task HandleMessage(Amazon.SQS.Model.Message message, CancellationToken ct)
{
var action = new Func<Task>(() => _messageDispatcher.DispatchMessage(message, ct));
return _messageProcessingStrategy.StartWorker(action, ct);
async Task DispatchAsync()
{
await _messageDispatcher.DispatchMessage(message, ct);
slang25 marked this conversation as resolved.
Show resolved Hide resolved
}

return _messageProcessingStrategy.StartWorker(DispatchAsync, ct);
}

public ICollection<ISubscriber> Subscribers { get; }
Expand Down
33 changes: 24 additions & 9 deletions JustSaying/JustSayingBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public IMessageMonitor Monitor
public IMessageLockAsync MessageLock { get; set; }
public IMessageContextAccessor MessageContextAccessor { get; set; }

private ILogger _log;
private readonly ILogger _log;

private readonly object _syncRoot = new object();
private readonly ICollection<IPublisher> _publishers;
Expand Down Expand Up @@ -170,14 +170,18 @@ private IMessagePublisher GetActivePublisherForMessage(Message message)
throw new InvalidOperationException($"Error publishing message. No publishers registered for active region '{activeRegion}'.");
}

var topic = message.GetType().ToTopicName();
var messageType = message.GetType();
var topic = messageType.ToTopicName();
var publisherFound = publishersForRegion.TryGetValue(topic, out var publisher);

if (!publisherFound)
{
_log.LogError("Error publishing message. No publishers registered for message type '{MessageType}' in active region '{Region}'.",
message.GetType(), activeRegion);
throw new InvalidOperationException($"Error publishing message, no publishers registered for message type '{message.GetType()}' in active region '{activeRegion}'.");
_log.LogError(
"Error publishing message. No publishers registered for message type '{MessageType}' in active region '{Region}'.",
messageType,
activeRegion);

throw new InvalidOperationException($"Error publishing message, no publishers registered for message type '{messageType}' in active region '{activeRegion}'.");
}

return publisher;
Expand Down Expand Up @@ -236,16 +240,27 @@ await publisher.PublishAsync(message, metadata, cancellationToken)
}
catch (Exception ex)
{
var messageType = message.GetType();

if (attemptCount >= Config.PublishFailureReAttempts)
{
Monitor.IssuePublishingMessage();
_log.LogError(0, ex, "Failed to publish a message of type '{MessageType}'. Halting after attempt number {PublishAttemptCount}.",
message.GetType(), attemptCount);

_log.LogError(
ex,
"Failed to publish a message of type '{MessageType}'. Halting after attempt number {PublishAttemptCount}.",
messageType,
attemptCount);

throw;
}

_log.LogWarning(0, ex, "Failed to publish a message of type '{MessageType}'. Retrying after attempt number {PublishAttemptCount} of {PublishFailureReattempts}.",
message.GetType(), attemptCount, Config.PublishFailureReAttempts);
_log.LogWarning(
ex,
"Failed to publish a message of type '{MessageType}'. Retrying after attempt number {PublishAttemptCount} of {PublishFailureReattempts}.",
messageType,
attemptCount,
Config.PublishFailureReAttempts);

var delayForAttempt = TimeSpan.FromMilliseconds(Config.PublishFailureBackoff.TotalMilliseconds * attemptCount);
await Task.Delay(delayForAttempt, cancellationToken).ConfigureAwait(false);
Expand Down
4 changes: 2 additions & 2 deletions JustSaying/Messaging/MessageHandling/ExactlyOnceAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;

namespace JustSaying.Messaging.MessageHandling
{
Expand All @@ -7,9 +7,9 @@ public class ExactlyOnceAttribute : Attribute
{
public ExactlyOnceAttribute()
{
//TimeOut = int.MaxValue;
TimeOut = (int)TimeSpan.MaxValue.TotalSeconds;
}

public int TimeOut { get; set; }
}
}
Loading