Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Message base class #685

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions JustSaying.sln
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
Releases.md = Releases.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Models", "src\JustSaying.Models\JustSaying.Models.csproj", "{C94866D0-29A6-43F2-B3BC-DFBC51A665C4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying", "src\JustSaying\JustSaying.csproj", "{150EC9AA-90CB-48E8-BF45-CAE32C1E993D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.TestingFramework", "tests\JustSaying.TestingFramework\JustSaying.TestingFramework.csproj", "{BDE31352-56B7-4564-8494-D25BA24E55DC}"
Expand Down Expand Up @@ -77,10 +75,6 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{C94866D0-29A6-43F2-B3BC-DFBC51A665C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C94866D0-29A6-43F2-B3BC-DFBC51A665C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C94866D0-29A6-43F2-B3BC-DFBC51A665C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C94866D0-29A6-43F2-B3BC-DFBC51A665C4}.Release|Any CPU.Build.0 = Release|Any CPU
{150EC9AA-90CB-48E8-BF45-CAE32C1E993D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{150EC9AA-90CB-48E8-BF45-CAE32C1E993D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{150EC9AA-90CB-48E8-BF45-CAE32C1E993D}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -130,7 +124,6 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{C94866D0-29A6-43F2-B3BC-DFBC51A665C4} = {A94633F2-29F2-48C6-840A-C5370B300AE2}
{150EC9AA-90CB-48E8-BF45-CAE32C1E993D} = {A94633F2-29F2-48C6-840A-C5370B300AE2}
{BDE31352-56B7-4564-8494-D25BA24E55DC} = {E22A50F2-9952-4483-8AD1-09BE354FB3E4}
{E87E73DE-A963-4024-83DC-F0D5336E9B17} = {E22A50F2-9952-4483-8AD1-09BE354FB3E4}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,5 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\JustSaying.Models\JustSaying.Models.csproj" />
</ItemGroup>


</Project>
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using JustSaying.Models;

namespace JustSaying.Sample.Restaurant.Models
{
public class OrderPlacedEvent : Message
public class OrderPlacedEvent
{
public int OrderId { get; set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using JustSaying.Models;

namespace JustSaying.Sample.Restaurant.Models
{
public class OrderReadyEvent : Message
public class OrderReadyEvent
{
public int OrderId { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Monitoring;
using JustSaying.Models;
using JustSaying.Naming;
using Microsoft.Extensions.DependencyInjection.Extensions;

Expand Down Expand Up @@ -194,7 +193,7 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
/// <paramref name="services"/> is <see langword="null"/>.
/// </exception>
public static IServiceCollection AddJustSayingHandler<TMessage, THandler>(this IServiceCollection services)
where TMessage : Message
where TMessage : class
where THandler : class, IHandlerAsync<TMessage>
{
if (services == null)
Expand Down Expand Up @@ -224,7 +223,7 @@ public static IServiceCollection AddJustSayingHandler<TMessage, THandler>(this I
public static IServiceCollection AddJustSayingHandlers<TMessage>(
this IServiceCollection services,
IEnumerable<IHandlerAsync<TMessage>> handlers)
where TMessage : Message
where TMessage : class
{
if (services == null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.ComponentModel;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Models;

namespace StructureMap
{
Expand All @@ -21,7 +20,7 @@ public static class RegistryExtensions
/// <paramref name="registry"/> is <see langword="null"/>.
/// </exception>
public static void AddJustSayingHandler<TMessage, THandler>(this Registry registry)
where TMessage : Message
where TMessage : class
where THandler : class, IHandlerAsync<TMessage>
{
if (registry == null)
Expand Down
24 changes: 0 additions & 24 deletions src/JustSaying.Models/Message.cs

This file was deleted.

3 changes: 1 addition & 2 deletions src/JustSaying/AwsTools/MessageHandling/ExactlyOnceReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
using System.Linq;
using System.Reflection;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Models;

namespace JustSaying.AwsTools.MessageHandling
{
internal static class HandlerMetadata
{
public static ExactlyOnceReader ReadExactlyOnce<T>(IHandlerAsync<T> handler) where T : Message
public static ExactlyOnceReader ReadExactlyOnce<T>(IHandlerAsync<T> handler) where T : class
{
return new ExactlyOnceReader(handler.GetType());
}
Expand Down
5 changes: 2 additions & 3 deletions src/JustSaying/AwsTools/MessageHandling/HandlerMap.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using HandlerFunc = System.Func<JustSaying.Models.Message, System.Threading.Tasks.Task<bool>>;
using HandlerFunc = System.Func<object, System.Threading.Tasks.Task<bool>>;

namespace JustSaying.AwsTools.MessageHandling
{
Expand All @@ -14,8 +14,7 @@ public class HandlerMap

public HandlerFunc Get(Type messageType)
{
HandlerFunc handler;
return _handlers.TryGetValue(messageType, out handler) ? handler : null;
return _handlers.TryGetValue(messageType, out var handler) ? handler : null;
}
}
}
24 changes: 11 additions & 13 deletions src/JustSaying/AwsTools/MessageHandling/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Monitoring;
using Microsoft.Extensions.Logging;
using Message = JustSaying.Models.Message;
using SQSMessage = Amazon.SQS.Model.Message;

namespace JustSaying.AwsTools.MessageHandling
Expand Down Expand Up @@ -54,10 +53,10 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
return;
}

Message typedMessage;
object untypedMessage;
try
{
typedMessage = _serializationRegister.DeserializeMessage(message.Body);
untypedMessage = _serializationRegister.DeserializeMessage(message.Body);
}
catch (MessageFormatNotSupportedException ex)
{
Expand Down Expand Up @@ -90,11 +89,11 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella

try
{
if (typedMessage != null)
if (untypedMessage != null)
{
_messageContextAccessor.MessageContext = new MessageContext(message, _queue.Uri);

handlingSucceeded = await CallMessageHandler(typedMessage).ConfigureAwait(false);
handlingSucceeded = await CallMessageHandler(untypedMessage).ConfigureAwait(false);
}

if (handlingSucceeded)
Expand All @@ -112,9 +111,9 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
message.MessageId,
message.Body);

if (typedMessage != null)
if (untypedMessage != null)
{
_messagingMonitor.HandleException(typedMessage.GetType());
_messagingMonitor.HandleException(untypedMessage.GetType());
}

_onError(ex, message);
Expand All @@ -127,7 +126,7 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
{
if (!handlingSucceeded && _messageBackoffStrategy != null)
{
await UpdateMessageVisibilityTimeout(message, message.ReceiptHandle, typedMessage, lastException).ConfigureAwait(false);
await UpdateMessageVisibilityTimeout(message, message.ReceiptHandle, untypedMessage, lastException).ConfigureAwait(false);
}
}
finally
Expand All @@ -137,7 +136,7 @@ public async Task DispatchMessage(SQSMessage message, CancellationToken cancella
}
}

private async Task<bool> CallMessageHandler(Message message)
private async Task<bool> CallMessageHandler(object message)
{
var messageType = message.GetType();

Expand All @@ -155,8 +154,7 @@ private async Task<bool> CallMessageHandler(Message message)
watch.Stop();

_logger.LogTrace(
"Handled message with Id '{MessageId}' of type {MessageType} in {TimeToHandle}.",
message.Id,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here it is, the one place where we actually do something with a property on Message (ok, there was UniqueKey() too I guess). It's on a trace log, we arguably would want to use the SQS message id for this level of debugging, but tying it back to the domain event is going to be the challenge.

"Handled message of type {MessageType} in {TimeToHandle}.",
messageType,
watch.Elapsed);

Expand All @@ -176,11 +174,11 @@ private async Task DeleteMessageFromQueue(string receiptHandle)
await _queue.Client.DeleteMessageAsync(deleteRequest).ConfigureAwait(false);
}

private async Task UpdateMessageVisibilityTimeout(SQSMessage message, string receiptHandle, Message typedMessage, Exception lastException)
private async Task UpdateMessageVisibilityTimeout(SQSMessage message, string receiptHandle, object untypedMessage, Exception lastException)
{
if (TryGetApproxReceiveCount(message.Attributes, out int approxReceiveCount))
{
var visibilityTimeout = _messageBackoffStrategy.GetBackoffDuration(typedMessage, approxReceiveCount, lastException);
var visibilityTimeout = _messageBackoffStrategy.GetBackoffDuration(untypedMessage, approxReceiveCount, lastException);
var visibilityTimeoutSeconds = (int)visibilityTimeout.TotalSeconds;

var visibilityRequest = new ChangeMessageVisibilityRequest
Expand Down
16 changes: 10 additions & 6 deletions src/JustSaying/AwsTools/MessageHandling/MessageHandlerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Threading.Tasks;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.Monitoring;
using JustSaying.Models;
using Microsoft.Extensions.Logging;

namespace JustSaying.AwsTools.MessageHandling
Expand All @@ -23,19 +22,19 @@ public MessageHandlerWrapper(
_loggerFactory = loggerFactory;
}

public Func<Message, Task<bool>> WrapMessageHandler<T>(Func<IHandlerAsync<T>> futureHandler) where T : Message
public Func<object, Task<bool>> WrapMessageHandler<T>(Func<IHandlerAsync<T>> futureHandler, Func<T, string> uniqueKeySelector = default) where T : class
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not possible to just use the SNS/SQS message ID for the unique key? Why does it have to be a value in the domain message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original thinking behind this was that only the developer knows what determines the messages identity, and what is a duplicate. Perhaps the event publisher has OrderId and that is the real identity of the message, or a composite of OrderId and Status. Maybe it's a hash of all of the properties on the message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. And with the benefit of hindsight, has that thinking paid off? Perhaps it's better to remove this "exactly once" functionality from this library and force consumers to make their solutions inherently idempotent in the domain rather than only at the message bus layer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
return async message =>
{
var handler = futureHandler();
handler = MaybeWrapWithExactlyOnce(handler);
handler = MaybeWrapWithExactlyOnce(handler, uniqueKeySelector);
handler = MaybeWrapWithStopwatch(handler);

return await handler.Handle((T)message).ConfigureAwait(false);
};
}

private IHandlerAsync<T> MaybeWrapWithExactlyOnce<T>(IHandlerAsync<T> handler) where T : Message
private IHandlerAsync<T> MaybeWrapWithExactlyOnce<T>(IHandlerAsync<T> handler, Func<T, string> uniqueKeySelector) where T : class
{
var handlerType = handler.GetType();
var exactlyOnceMetadata = new ExactlyOnceReader(handlerType);
Expand All @@ -49,14 +48,19 @@ private IHandlerAsync<T> MaybeWrapWithExactlyOnce<T>(IHandlerAsync<T> handler) w
throw new Exception("IMessageLock is null. You need to specify an implementation for IMessageLock.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this to InvalidOperationException(?)

}

if (uniqueKeySelector == null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should throw this in the caller before we start to do any work, rather than in the delegate further along the call path.

{
throw new ArgumentNullException(nameof(uniqueKeySelector), "You must specify a uniqueKeySelector in order to use exactly once functionality.");
}

var handlerName = handlerType.FullName.ToLowerInvariant();
var timeout = TimeSpan.FromSeconds(exactlyOnceMetadata.GetTimeOut());
var logger = _loggerFactory.CreateLogger<ExactlyOnceHandler<T>>();

return new ExactlyOnceHandler<T>(handler, _messageLock, timeout, handlerName, logger);
return new ExactlyOnceHandler<T>(handler, _messageLock, uniqueKeySelector, timeout, handlerName, logger);
}

private IHandlerAsync<T> MaybeWrapWithStopwatch<T>(IHandlerAsync<T> handler) where T : Message
private IHandlerAsync<T> MaybeWrapWithStopwatch<T>(IHandlerAsync<T> handler) where T : class
{
if (!(_messagingMonitor is IMeasureHandlerExecutionTime executionTimeMonitoring))
{
Expand Down
10 changes: 5 additions & 5 deletions src/JustSaying/AwsTools/MessageHandling/SnsTopicBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using JustSaying.Messaging;
using JustSaying.Messaging.MessageSerialization;
using Microsoft.Extensions.Logging;
using Message = JustSaying.Models.Message;
using MessageAttributeValue = Amazon.SimpleNotificationService.Model.MessageAttributeValue;

namespace JustSaying.AwsTools.MessageHandling
Expand All @@ -21,7 +20,7 @@ public abstract class SnsTopicBase : IMessagePublisher
private readonly IMessageSerializationRegister _serializationRegister; // ToDo: Grrr...why is this here even. GET OUT!
private readonly IMessageSubjectProvider _messageSubjectProvider;
private readonly SnsWriteConfiguration _snsWriteConfiguration;
public Action<MessageResponse, Message> MessageResponseLogger { get; set; }
public Action<MessageResponse, object> MessageResponseLogger { get; set; }
public string Arn { get; protected set; }
internal ServerSideEncryption ServerSideEncryption { get; set; }
protected IAmazonSimpleNotificationService Client { get; set; }
Expand Down Expand Up @@ -51,7 +50,8 @@ protected SnsTopicBase(

public abstract Task<bool> ExistsAsync();

public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken)
public async Task PublishAsync<T>(T message, PublishMetadata metadata, CancellationToken cancellationToken)
where T : class
{
var request = BuildPublishRequest(message, metadata);
PublishResponse response = null;
Expand Down Expand Up @@ -87,9 +87,9 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel

}

private bool ClientExceptionHandler(Exception ex, Message message) => _snsWriteConfiguration?.HandleException?.Invoke(ex, message) ?? false;
private bool ClientExceptionHandler<T>(Exception ex, T message) where T : class => _snsWriteConfiguration?.HandleException?.Invoke(ex, message) ?? false;

private PublishRequest BuildPublishRequest(Message message, PublishMetadata metadata)
private PublishRequest BuildPublishRequest<T>(T message, PublishMetadata metadata) where T : class
{
var messageToSend = _serializationRegister.Serialize(message, serializeForSnsPublishing: true);
var messageType = _messageSubjectProvider.GetSubjectForType(message.GetType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Monitoring;
using Microsoft.Extensions.Logging;
using Message = JustSaying.Models.Message;

namespace JustSaying.AwsTools.MessageHandling
{
Expand Down Expand Up @@ -93,7 +92,7 @@ public SqsNotificationListener WithMessageProcessingStrategy(IMessageProcessingS
return this;
}

public void AddMessageHandler<T>(Func<IHandlerAsync<T>> futureHandler) where T : Message
public void AddMessageHandler<T>(Func<IHandlerAsync<T>> futureHandler, Func<T, string> uniqueKeySelector = default) where T : class
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is optional, how does it avoid causing the ArgumentNullException later? Or is this causing the test failure you mentioned?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. I didn't know there was a default for Func<T, string>!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't, well there is, it's just null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, haha. I don't know why I assumed otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default is just syntactic sugar added in C# 8 (at least I think it was 8, maybe a minor revision of 7) for default(Type). It's null for any reference type and the "zero" value for value types (0, 0l, etc.). The same behaviour you'd get from LINQ's FirstOrDefault()`.

{
if (_handlerMap.ContainsKey(typeof(T)))
{
Expand All @@ -103,7 +102,7 @@ public void AddMessageHandler<T>(Func<IHandlerAsync<T>> futureHandler) where T :

Subscribers.Add(new Subscriber(typeof(T)));

var handlerFunc = _messageHandlerWrapper.WrapMessageHandler(futureHandler);
var handlerFunc = _messageHandlerWrapper.WrapMessageHandler(futureHandler, uniqueKeySelector);
_handlerMap.Add(typeof(T), handlerFunc);
}

Expand Down
Loading