Skip to content

Commit

Permalink
AmazonSQS transport
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 10, 2024
1 parent 3a3a11d commit b10113a
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace SlimMessageBus.Host.AmazonSQS.Config;

public static class SqsConsumerBuilderExtensions
{
public static ConsumerBuilder<T> Queue<T>(this ConsumerBuilder<T> consumerBuilder, string queueName)
{
if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder));
if (queueName is null) throw new ArgumentNullException(nameof(queueName));

return consumerBuilder.Path(queueName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace SlimMessageBus.Host.AmazonSQS.Config;

public static class SqsProducerBuilderExtensions
{
/// <summary>
/// Enables FIFO support for the queue when it will be provisioned.
/// </summary>
/// <typeparam name="TProducerBuilder"></typeparam>
/// <param name="producerBuilder"></param>
/// <param name="enabled"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TProducerBuilder EnableFifo<TProducerBuilder>(this TProducerBuilder producerBuilder, bool enabled = true)
where TProducerBuilder : IProducerBuilder
{
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));

producerBuilder.Settings.Properties[SqsProperties.EnableFifoKey] = enabled;
return producerBuilder;
}

public static ProducerBuilder<T> DefaultQueue<T>(this ProducerBuilder<T> producerBuilder, string queueName)
{
if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder));
if (queueName is null) throw new ArgumentNullException(nameof(queueName));

return producerBuilder.DefaultPath(queueName);
}
}
6 changes: 6 additions & 0 deletions src/SlimMessageBus.Host.AmazonSQS/Config/SqsProperties.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host.AmazonSQS.Config;

internal static class SqsProperties
{
public static readonly string EnableFifoKey = "Sqs_EnableFifo";
}
9 changes: 7 additions & 2 deletions src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ protected override void Build()
base.Build();

AddInit(_clientProvider.EnsureClientAuthenticated());
AddInit(ProvisionTopology());
if (ProviderSettings.TopologyProvisioning?.Enabled ?? false)
{
AddInit(ProvisionTopology());
}

// ToDo: path mappings

Expand All @@ -54,6 +57,8 @@ public override async Task ProduceToTransport(object message, Type messageType,
{
try
{
OnProduceToTransport(message, messageType, path, messageHeaders);

var queueUrl = GetQueueUrl(path);
var (payload, attributes) = GetTransportMessage(message, messageType, path, messageHeaders);
var req = new SendMessageRequest(queueUrl, payload)
Expand Down Expand Up @@ -85,7 +90,7 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
}).ToList();

// ToDo: Chunk if exceeds 10 messages and payload size
var req = new SendMessageBatchRequest(path, entries);
var req = new SendMessageBatchRequest(queueUrl, entries);
await _clientProvider.Client.SendMessageBatchAsync(req, cancellationToken);

dispatched.AddRange(envelopes);
Expand Down
5 changes: 5 additions & 0 deletions src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public class SqsMessageBusSettings
/// </summary>
public AmazonSQSConfig ClientConfig { get; set; } = new AmazonSQSConfig();

/// <summary>
/// Settings for auto creation of queues if they don't exist.
/// </summary>
public SqsTopologySettings TopologyProvisioning { get; set; } = new();

/// <summary>
/// Follows the AWS long term credentials.
/// See https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html
Expand Down
62 changes: 61 additions & 1 deletion src/SlimMessageBus.Host.AmazonSQS/SqsTopologyService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.AmazonSQS;

using SlimMessageBus.Host.AmazonSQS.Config;

public class SqsTopologyService
{
private readonly ILogger _logger;
Expand All @@ -21,6 +23,64 @@ public SqsTopologyService(

public async Task ProvisionTopology()
{
if (_providerSettings.TopologyProvisioning.CanProducerCreateQueue)
{
foreach (var producer in _settings.Producers)
{
try
{
try
{
var queueUrl = await _clientProvider.Client.GetQueueUrlAsync(producer.DefaultPath);
if (queueUrl != null)
{
continue;
}
}
catch (QueueDoesNotExistException)
{
// proceed to create the queue
}

var createQueueRequest = new CreateQueueRequest
{
QueueName = producer.DefaultPath,
Attributes = []
};

/*
{QueueAttributeName.FifoQueue, "false"},
{QueueAttributeName.MessageRetentionPeriod, "86400"},
{QueueAttributeName.VisibilityTimeout, "30"},
{QueueAttributeName.DelaySeconds, "0"},
{QueueAttributeName.MaximumMessageSize, "262144"},
{QueueAttributeName.ReceiveMessageWaitTimeSeconds, "0"},
{QueueAttributeName.Policy, "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":\"*\",\"Action\":\"sqs:SendMessage\",\"Resource\":\"*\"}]}"},
{QueueAttributeName.RedrivePolicy, "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:MyDeadLetterQueue\",\"maxReceiveCount\":\"5\"}"}
*/

if (producer.GetOrDefault(SqsProperties.EnableFifoKey, _settings, false))
{
createQueueRequest.Attributes.Add(QueueAttributeName.FifoQueue, "true");
}

_providerSettings.TopologyProvisioning.CreateQueueOptions?.Invoke(createQueueRequest);

try
{
var createQueueResponse = await _clientProvider.Client.CreateQueueAsync(createQueueRequest);
_logger.LogInformation("Created queue {QueueName} with URL {QueueUrl}", producer.DefaultPath, createQueueResponse.QueueUrl);
}
catch (QueueNameExistsException)
{
_logger.LogInformation("Queue {QueueName} already exists", producer.DefaultPath);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error creating queue {QueueName}", producer.DefaultPath);
}
}
}
}
}
}
22 changes: 22 additions & 0 deletions src/SlimMessageBus.Host.AmazonSQS/SqsTopologySettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace SlimMessageBus.Host.AmazonSQS;

public class SqsTopologySettings
{
/// <summary>
/// Indicates whether topology provisioning is enabled. Default is true.
/// </summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// A filter that allows (or not) for declared producers to provision needed queues. True by default.
/// </summary>
public bool CanProducerCreateQueue { get; set; } = true;
/// <summary>
/// A filter that allows (or not) for declared consumers to provision needed queues. True by default.
/// </summary>
public bool CanConsumerCreateQueue { get; set; } = true;
/// <summary>
/// Default configuration to be applied when a topic needs to be created (<see cref="CreateQueueRequest"/>).
/// </summary>
public Action<CreateQueueRequest> CreateQueueOptions { get; set; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected async Task DoProvisionTopology()
var topologyProvisioning = _providerSettings.TopologyProvisioning;

var consumersSettingsByPath = _settings.Consumers.OfType<AbstractConsumerSettings>()
.Concat(new[] { _settings.RequestResponse })
.Concat([_settings.RequestResponse])
.Where(x => x != null)
.GroupBy(x => (x.Path, x.PathKind))
.ToDictionary(x => x.Key, x => x.ToList());
Expand Down
11 changes: 11 additions & 0 deletions src/SlimMessageBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Artifacts", "Artifacts", "{
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.AmazonSQS", "SlimMessageBus.Host.AmazonSQS\SlimMessageBus.Host.AmazonSQS.csproj", "{4DF4BC7C-5EE3-4310-BC40-054C1494444E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.AmazonSQS.Test", "Tests\SlimMessageBus.Host.AmazonSQS.Test\SlimMessageBus.Host.AmazonSQS.Test.csproj", "{9255A33D-9697-4E69-9418-AD31656FF8AC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -856,6 +858,14 @@ Global
{4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|Any CPU.Build.0 = Release|Any CPU
{4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|x86.ActiveCfg = Release|Any CPU
{4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|x86.Build.0 = Release|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|x86.ActiveCfg = Debug|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|x86.Build.0 = Debug|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|Any CPU.Build.0 = Release|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.ActiveCfg = Release|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -943,6 +953,7 @@ Global
{5250E48D-36C7-4214-8D7E-5924A9E337C6} = {59F88FB5-6D19-4520-87E8-227B3539BBB3}
{9FCBF788-1F0C-43E2-909D-1F96B2685F38} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
{4DF4BC7C-5EE3-4310-BC40-054C1494444E} = {9291D340-B4FA-44A3-8060-C14743FB1712}
{9255A33D-9697-4E69-9418-AD31656FF8AC} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80}
Expand Down
12 changes: 12 additions & 0 deletions src/Tests/SlimMessageBus.Host.AmazonSQS.Test/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
global using FluentAssertions;

global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Logging;

global using SecretStore;

global using SlimMessageBus.Host.Test.Common.IntegrationTest;

global using Xunit;
global using Xunit.Abstractions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="../Host.Test.Properties.xml" />

<ItemGroup>
<ProjectReference Include="..\SlimMessageBus.Host.Test.Common\SlimMessageBus.Host.Test.Common.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.AmazonSQS\SlimMessageBus.Host.AmazonSQS.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.SystemTextJson\SlimMessageBus.Host.Serialization.SystemTextJson.csproj" />
<ProjectReference Include="..\..\Tools\SecretStore\SecretStore.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
Loading

0 comments on commit b10113a

Please sign in to comment.