Skip to content

Commit

Permalink
Adding amqps support to rabbit activities (#4612)
Browse files Browse the repository at this point in the history
* Adding ampqs support to rabbit activities

* Correcting case of Ssl
  • Loading branch information
stuartmcgillivray authored Nov 9, 2023
1 parent 8e4a79a commit 5948f3f
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@ public class RabbitMqMessageReceived : Activity
Category = PropertyCategories.Configuration)]
public string ConnectionString { get; set; } = default!;


[ActivityInput(
Order = 3,
Category = PropertyCategories.Configuration,
Name = "Enable SSL")]
public bool EnableSsl { get; set; }

[ActivityInput(
Order = 4,
Category = PropertyCategories.Configuration,
Name = "SSL Host")]
public string SslHost { get; set; }

[ActivityInput(
Order = 5,
Category = PropertyCategories.Configuration,
UIHint = ActivityInputUIHints.CheckList,
DefaultSyntax = SyntaxNames.Json,
Options = new[] { "Ssl2", "Ssl3", "Tls", "Tls11", "Tls12", "Tls13" },
Name = "SSL Protocols")]
public HashSet<string> SslProtocols { get; set; } = new() { };

public string ClientId => RabbitMqClientConfigurationHelper.GetClientId(Id);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,36 @@ public SendRabbitMqMessage(IMessageSenderClientFactory messageSenderClientFactor
Category = PropertyCategories.Configuration)]
public string ConnectionString { get; set; } = default!;

[ActivityInput(
Order = 2,
SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid },
Category = PropertyCategories.Configuration,
Name = "Enable SSL")]
public bool EnableSsl { get; set; }

[ActivityInput(
Order = 3,
SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid },
Category = PropertyCategories.Configuration,
Name = "SSL Host")]
public string SslHost { get; set; }

[ActivityInput(
Order = 4,
SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid,SyntaxNames.Json },
Category = PropertyCategories.Configuration,
UIHint = ActivityInputUIHints.CheckList,
DefaultSyntax = SyntaxNames.Json,
Options = new[] { "Ssl2", "Ssl3", "Tls", "Tls11", "Tls12", "Tls13" },
Name = "SSL Protocols"
)]
public HashSet<string> SslProtocols { get; set; } = new() { };

public string ClientId => RabbitMqClientConfigurationHelper.GetClientId(Id);

protected override async ValueTask<IActivityExecutionResult> OnExecuteAsync(ActivityExecutionContext context)
{
var config = new RabbitMqBusConfiguration(ConnectionString, ExchangeName, RoutingKey, Headers, ClientId);
var config = new RabbitMqBusConfiguration(ConnectionString, ExchangeName, RoutingKey, Headers, ClientId, EnableSsl, SslHost, SslProtocols);

var client = await _messageSenderClientFactory.GetSenderAsync(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Elsa.Activities.RabbitMq.Bookmarks
{
Expand All @@ -11,18 +12,31 @@ public MessageReceivedBookmark()
{
}

public MessageReceivedBookmark(string exchangeName, string routingKey, string connectionString, Dictionary<string, string> headers)
public MessageReceivedBookmark(string exchangeName, string routingKey, string connectionString, Dictionary<string, string> headers, bool sslEnabled, string sslHost, IEnumerable<string> sslProtocols)
{
ExchangeName = exchangeName;
RoutingKey = routingKey;
ConnectionString = connectionString;
SslEnabled = sslEnabled;
SslHost = sslHost;
SslProtocols = sslProtocols ?? new List<string>();
Headers = headers ?? new Dictionary<string, string>();
}

[JsonProperty(Order = 1)]
public string ExchangeName { get; set; } = default!;
[JsonProperty(Order = 2)]
public string RoutingKey { get; set; } = default!;
[JsonProperty(Order = 3)]
public string ConnectionString { get; set; } = default!;
[JsonProperty(Order = 4)]
public bool SslEnabled { get; set; } = default!;
[JsonProperty(Order = 5)]
public string SslHost { get; set; } = default!;
[JsonProperty(Order = 6)]
public Dictionary<string, string> Headers { get; set; } = default!;
[JsonProperty(Order = 7)]
public IEnumerable<string> SslProtocols { get; set; } = default!;
}

public class QueueMessageReceivedBookmarkProvider : BookmarkProvider<MessageReceivedBookmark, RabbitMqMessageReceived>
Expand All @@ -35,7 +49,10 @@ public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(B
exchangeName: (await context.ReadActivityPropertyAsync(x => x.ExchangeName, cancellationToken))!,
routingKey: (await context.ReadActivityPropertyAsync(x => x.RoutingKey, cancellationToken))!,
connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!,
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!,
sslEnabled:(await context.ReadActivityPropertyAsync(x => x.EnableSsl, cancellationToken))!,
sslHost: (await context.ReadActivityPropertyAsync(x => x.SslHost, cancellationToken))!,
sslProtocols: (await context.ReadActivityPropertyAsync(x => x.SslProtocols, cancellationToken))!
))
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;

namespace Elsa.Activities.RabbitMq.Configuration
{
Expand All @@ -11,13 +13,19 @@ public class RabbitMqBusConfiguration
public Dictionary<string, string> Headers { get; }
public string ClientId { get; }
public bool AutoDeleteQueue { get; }
public bool EnableSsl { get; }
public string SslHost { get; }
public SslProtocols SslProtocols { get; }

public RabbitMqBusConfiguration(string connectionString, string exchangeName, string routingKey, Dictionary<string, string> headers, string clientId, bool autoDeleteQueue = false)
public RabbitMqBusConfiguration(string connectionString, string exchangeName, string routingKey, Dictionary<string, string> headers, string clientId, bool enableSsl, string sslHost, IEnumerable<string> sslProtocols, bool autoDeleteQueue = false)
{
ConnectionString = connectionString;
ExchangeName = exchangeName;
RoutingKey = routingKey;
Headers = headers ?? new Dictionary<string, string>();
EnableSsl = enableSsl;
SslHost = sslHost;
SslProtocols = ResolveSslProtocols(sslProtocols ?? Array.Empty<string>());
ClientId = clientId;
AutoDeleteQueue = autoDeleteQueue;
}
Expand All @@ -30,5 +38,28 @@ public override int GetHashCode()
}

public string TopicFullName => string.IsNullOrEmpty(ExchangeName) ? RoutingKey : $"{RoutingKey}@{ExchangeName}";

public IEnumerable<string> SslProtocolsString => Enum.GetValues(typeof(SslProtocols))
.Cast<SslProtocols>()
.Where(c => SslProtocols.HasFlag(c) && c != SslProtocols.None)
.Select(c => c.ToString());

private SslProtocols ResolveSslProtocols(IEnumerable<string> sslProtocols)
{
var parsed = sslProtocols
.Select(s =>
{
var val = (SslProtocols)Enum.Parse(typeof(System.Security.Authentication.SslProtocols), s);
return val;
}).ToList();

SslProtocols values = SslProtocols.None;

foreach (var sslProtocol in parsed)
{
values |= sslProtocol;
}
return values;
}
}
}
10 changes: 8 additions & 2 deletions src/activities/Elsa.Activities.RabbitMq/Services/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Rebus.Bus;
using Rebus.Config;
using Rebus.Messages;
using Rebus.RabbitMq;
using Rebus.Routing.TransportMessages;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -41,7 +42,9 @@ public void SubscribeWithHandler(Func<TransportMessage, CancellationToken, Task>
}))
.Transport(t =>
{
t.UseRabbitMq(Configuration.ConnectionString, Configuration.ClientId).InputQueueOptions(x => x.SetAutoDelete(Configuration.AutoDeleteQueue));
t.UseRabbitMq(Configuration.ConnectionString, Configuration.ClientId)
.InputQueueOptions(x => x.SetAutoDelete(Configuration.AutoDeleteQueue))
.Ssl(new SslSettings(Configuration.EnableSsl, Configuration.SslHost, version: Configuration.SslProtocols));
})
.Start();

Expand All @@ -64,7 +67,10 @@ private void ConfigureAsOneWayClient()
{
_bus = Configure
.With(_activator)
.Transport(t => t.UseRabbitMqAsOneWayClient(Configuration.ConnectionString).InputQueueOptions(o => o.SetAutoDelete(autoDelete: true)))
.Transport(t => t.UseRabbitMqAsOneWayClient(Configuration.ConnectionString)
.InputQueueOptions(o => o.SetAutoDelete(autoDelete: true))
.Ssl(new SslSettings(Configuration.EnableSsl, Configuration.SslHost, version: Configuration.SslProtocols))
)
.Start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ private RabbitMqBusConfiguration CreateConfigurationFromBookmark(MessageReceived
var exchangeName = bookmark.ExchangeName;
var routingKey = bookmark.RoutingKey;
var headers = bookmark.Headers;
var enableSsl = bookmark.SslEnabled;
var sslHost = bookmark.SslHost;
var sslProtocols = bookmark.SslProtocols;
var clientId = RabbitMqClientConfigurationHelper.GetClientId(activityId);

return new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers, clientId);
return new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers, clientId, enableSsl, sslHost, sslProtocols);
}

private async Task DisposeExistingWorkersAsync()
Expand Down
2 changes: 1 addition & 1 deletion src/activities/Elsa.Activities.RabbitMq/Services/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private async Task TriggerWorkflowsAsync(TransportMessage message, CancellationT

var config = _client.Configuration;

var bookmark = new MessageReceivedBookmark(config.ExchangeName, config.RoutingKey, config.ConnectionString, config.Headers);
var bookmark = new MessageReceivedBookmark(config.ExchangeName, config.RoutingKey, config.ConnectionString, config.Headers, config.EnableSsl, config.SslHost, config.SslProtocolsString);
var launchContext = new WorkflowsQuery(ActivityType, bookmark);

using var scope = _scopeFactory.CreateScope();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ private async IAsyncEnumerable<RabbitMqBusConfiguration> GetConfigurationsAsync(
var routingKey = await activity.EvaluatePropertyValueAsync(x => x.RoutingKey, cancellationToken);
var exchangeName = await activity.EvaluatePropertyValueAsync(x => x.ExchangeName, cancellationToken);
var headers = await activity.EvaluatePropertyValueAsync(x => x.Headers, cancellationToken);
var enableSSL = await activity.EvaluatePropertyValueAsync(x => x.EnableSsl, cancellationToken);
var sslHost = await activity.EvaluatePropertyValueAsync(x => x.SslHost, cancellationToken);
var sslProtocols = await activity.EvaluatePropertyValueAsync(x => x.SslProtocols, cancellationToken);
var clientId = RabbitMqClientConfigurationHelper.GetTestClientId(activity.ActivityBlueprint.Id);

var config = new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers!, clientId, autoDeleteQueue: true);
var config = new RabbitMqBusConfiguration(connectionString!, exchangeName!, routingKey!, headers!, clientId, enableSSL, sslHost, sslProtocols, autoDeleteQueue: true);

yield return config;
}
Expand Down

0 comments on commit 5948f3f

Please sign in to comment.