From 4484b1f30b8ad6a377751f2f788a2a5803a662b1 Mon Sep 17 00:00:00 2001 From: Patrick Ritchie Date: Tue, 12 Dec 2023 01:57:28 -0500 Subject: [PATCH] Updated MQTT Adapter Agent Module --- .../Module.cs | 194 ++++++++++++------ .../ModuleConfiguration.cs | 63 +++++- .../README-Nuget.md | 52 ++++- .../README.md | 52 ++++- 4 files changed, 281 insertions(+), 80 deletions(-) diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/Module.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/Module.cs index 29871af0..faf63e02 100644 --- a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/Module.cs +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/Module.cs @@ -6,7 +6,9 @@ using MQTTnet.Protocol; using MQTTnet.Server; using MTConnect.Agents; +using MTConnect.Assets; using MTConnect.Configurations; +using MTConnect.Devices; using MTConnect.Formatters; using MTConnect.Input; using MTConnect.Logging; @@ -23,6 +25,9 @@ public class Module : MTConnectAgentModule { public const string ConfigurationTypeId = "mqtt-adapter"; private const string ModuleId = "MQTT Adapter"; + private const string ObservationTopic = "observations"; + private const string AssetTopic = "assets"; + private const string DeviceTopic = "device"; private readonly ModuleConfiguration _configuration; private readonly IMTConnectAgentBroker _mtconnectAgent; @@ -79,6 +84,9 @@ private async Task Worker() // Publish Only so use Clean Session = true clientOptionsBuilder.WithCleanSession(_configuration.CleanSession); + // Sets the Timeout + clientOptionsBuilder.WithTimeout(TimeSpan.FromMilliseconds(_configuration.Timeout)); + // Set Client ID if (!string.IsNullOrEmpty(_configuration.ClientId)) { @@ -139,7 +147,7 @@ private async Task Worker() Log(MTConnectLogLevel.Information, $"MQTT Adapter Connected to External Broker ({_configuration.Server}:{_configuration.Port})"); - if (!string.IsNullOrEmpty(_configuration.Topic)) + if (!string.IsNullOrEmpty(_configuration.TopicPrefix)) { // Set QoS MqttQualityOfServiceLevel qos; @@ -151,9 +159,9 @@ private async Task Worker() } // Subscribe to Topic - await _mqttClient.SubscribeAsync($"{_configuration.Topic}/#", qos); + await _mqttClient.SubscribeAsync($"{_configuration.TopicPrefix}/#", qos); - Log(MTConnectLogLevel.Information, $"MQTT Adapter Subscribed to ({_configuration.Topic} @ QoS = {qos})"); + Log(MTConnectLogLevel.Information, $"MQTT Adapter Subscribed to ({_configuration.TopicPrefix} @ QoS = {qos})"); } else { @@ -172,7 +180,7 @@ private async Task Worker() Log(MTConnectLogLevel.Information, $"MQTT Adapter Disconnected from External Broker ({_configuration.Server}:{_configuration.Port})"); - await Task.Delay(_configuration.RetryInterval, _stop.Token); + await Task.Delay(_configuration.ReconnectInterval, _stop.Token); } catch (TaskCanceledException) { } catch (Exception) { } @@ -194,32 +202,86 @@ private Task MessageReceived(MqttApplicationMessageReceivedEventArgs args) { var topic = args.ApplicationMessage.Topic; - var observations = ProcessPayload(args.ApplicationMessage.Payload); - if (!observations.IsNullOrEmpty()) + if (IsObservationTopic(topic)) { - _mtconnectAgent.AddObservations(_configuration.DeviceKey, observations); + var observations = ProcessObservationPayload(args.ApplicationMessage.Payload); + if (!observations.IsNullOrEmpty()) + { + _mtconnectAgent.AddObservations(_configuration.DeviceKey, observations); + } + } + else if (IsAssetTopic(topic)) + { + var assets = ProcessAssetPayload(args.ApplicationMessage.Payload); + if (!assets.IsNullOrEmpty()) + { + _mtconnectAgent.AddAssets(_configuration.DeviceKey, assets); + } + } + else if (IsDeviceTopic(topic)) + { + var device = ProcessDevicePayload(args.ApplicationMessage.Payload); + if (device != null && !string.IsNullOrEmpty(device.Uuid) && !string.IsNullOrEmpty(device.Name)) + { + if (!string.IsNullOrEmpty(_configuration.DeviceKey)) + { + if (_configuration.DeviceKey.ToLower() == device.Uuid.ToLower() || + _configuration.DeviceKey.ToLower() == device.Name.ToLower()) + { + _mtconnectAgent.AddDevice(device); + } + } + else + { + _mtconnectAgent.AddDevice(device); + } + } } } return Task.CompletedTask; } - private IEnumerable ProcessPayload(byte[] payload) + private bool IsObservationTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_configuration.TopicPrefix}/{ObservationTopic}"; + return topic.StartsWith(prefix); + } + + return false; + } + + private bool IsAssetTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_configuration.TopicPrefix}/{AssetTopic}"; + return topic.StartsWith(prefix); + } + + return false; + } + + private bool IsDeviceTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_configuration.TopicPrefix}/{DeviceTopic}"; + return topic.StartsWith(prefix); + } + + return false; + } + + + private IEnumerable ProcessObservationPayload(byte[] payload) { if (!payload.IsNullOrEmpty()) { try { - //// Decompress from gzip - //byte[] uncompressedBytes; - //using (var inputStream = new MemoryStream(payload)) - //using (var outputStream = new MemoryStream()) - //using (var encodingStream = new GZipStream(inputStream, CompressionMode.Decompress, true)) - //{ - // encodingStream.CopyTo(outputStream); - // uncompressedBytes = outputStream.ToArray(); - //} - var uncompressedBytes = payload; if (!uncompressedBytes.IsNullOrEmpty()) { @@ -228,18 +290,6 @@ private IEnumerable ProcessPayload(byte[] payload) { return readResult.Content; } - - //// Convert JSON bytes to Mqtt PayloadModel - //IEnumerable payloadModel = null; - //using (var inputStream2 = new MemoryStream(uncompressedBytes)) - //{ - // payloadModel = JsonSerializer.Deserialize>(inputStream2); - //} - - //if (payloadModel != null) - //{ - // return payloadModel; - //} } } catch (Exception ex) @@ -251,41 +301,55 @@ private IEnumerable ProcessPayload(byte[] payload) return null; } - //public static byte[] CreatePayload(TrakHoundEntityCollection collection) - //{ - // if (collection != null) - // { - // try - // { - // // Convert to Mqtt Collection (serializable to Protobuf) - // var mqttCollection = new TrakHoundMqttEntityCollection(collection); - - // // Convert to Protobuf - // byte[] protobufBytes; - // using (var inputStream1 = new MemoryStream()) - // { - // Serializer.Serialize(inputStream1, mqttCollection); - // protobufBytes = inputStream1.ToArray(); - // } - - // // Compress to gzip - // byte[] compressedBytes; - // using (var inputStream2 = new MemoryStream()) - // { - // using (var zip = new GZipStream(inputStream2, CompressionMode.Compress, true)) - // { - // zip.Write(protobufBytes, 0, protobufBytes.Length); - // } - // compressedBytes = inputStream2.ToArray(); - // } - - // return compressedBytes; - // } - // catch { } - // } - - // return null; - //} + private IEnumerable ProcessAssetPayload(byte[] payload) + { + if (!payload.IsNullOrEmpty()) + { + try + { + var uncompressedBytes = payload; + if (!uncompressedBytes.IsNullOrEmpty()) + { + var readResult = InputFormatter.CreateAssets(_configuration.DocumentFormat, uncompressedBytes); + if (readResult.Success) + { + return readResult.Content; + } + } + } + catch (Exception ex) + { + + } + } + + return null; + } + + private IDevice ProcessDevicePayload(byte[] payload) + { + if (!payload.IsNullOrEmpty()) + { + try + { + var uncompressedBytes = payload; + if (!uncompressedBytes.IsNullOrEmpty()) + { + var readResult = InputFormatter.CreateDevice(_configuration.DocumentFormat, uncompressedBytes); + if (readResult.Success) + { + return readResult.Content; + } + } + } + catch (Exception ex) + { + + } + } + + return null; + } private static string GetFilePath(string path) diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/ModuleConfiguration.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/ModuleConfiguration.cs index cf120f6b..95ef3f76 100644 --- a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/ModuleConfiguration.cs +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/ModuleConfiguration.cs @@ -5,40 +5,92 @@ namespace MTConnect.Configurations { public class ModuleConfiguration { + /// + /// The MQTT broker hostname + /// public string Server { get; set; } + /// + /// The MQTT broker port number + /// public int Port { get; set; } - public int Interval { get; set; } + /// + /// The timeout (in milliseconds) to use for connection and read/write + /// + public int Timeout { get; set; } + /// + /// The interval (in milliseconds) to delay between disconnections + /// + public int ReconnectInterval { get; set; } + + + /// + /// Sets the Username to use for authentication + /// public string Username { get; set; } + /// + /// Sets the Password to use for authentication + /// public string Password { get; set; } + /// + /// Sets the Client ID to use for the connection + /// public string ClientId { get; set; } + /// + /// Sets the CleanSession flag (true or false) + /// public bool CleanSession { get; set; } + /// + /// Sets the Quality Of Service (QoS) to use. 0 = At Most Once, 1 = At least Once, 2 = Exactly Once + /// public int QoS { get; set; } + /// + /// The path to the Certificate Authority file + /// public string CertificateAuthority { get; set; } + /// + /// The path to the PEM Certificate (.pem) file + /// public string PemCertificate { get; set; } + /// + /// The path to the PEM Private Key file + /// public string PemPrivateKey { get; set; } + /// + /// Sets whether to validate the certificate chain (true or false) + /// public bool AllowUntrustedCertificates { get; set; } + /// + /// Sets whether to use TLS or not (true or false) + /// public bool UseTls { get; set; } - public int RetryInterval { get; set; } - - public string Topic { get; set; } + /// + /// The MQTT topic prefix to subscribe to + /// + public string TopicPrefix { get; set; } + /// + /// The UUID or Name of the Device to read data for + /// public string DeviceKey { get; set; } + /// + /// The Document Format ID to use to format the input data + /// public string DocumentFormat { get; set; } @@ -48,7 +100,8 @@ public ModuleConfiguration() Port = 1883; QoS = 1; CleanSession = true; - RetryInterval = 5000; + Timeout = 5000; + ReconnectInterval = 10000; DocumentFormat = "json"; } } diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README-Nuget.md b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README-Nuget.md index 2dc5ad31..774d7c99 100644 --- a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README-Nuget.md +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README-Nuget.md @@ -8,18 +8,60 @@ This Agent Module implements an adapter to read from an MQTT broker - mqtt-adapter: server: localhost port: 1883 - topic: cnc-01/input + topicPrefix: input deviceKey: M12346 ``` -* `server` - The MQTT broker hostname. +* `server` - The MQTT broker hostname -* `port` - The MQTT broker port number. +* `port` - The MQTT broker port number -* `topic` - The MQTT topic to subscribe to +* `timeout` - The timeout (in milliseconds) to use for connection and read/write -* `deviceKey` - The UUID or Name of the Device to read data for. +* `reconnectInterval` - The interval (in milliseconds) to delay between disconnections +* `username` - Sets the Username to use for authentication + +* `password` - Sets the Password to use for authentication + +* `clientId` - Sets the Client ID to use for the connection + +* `cleanSession` - Sets the CleanSession flag (true or false) + +* `qos` - Sets the Quality Of Service (QoS) to use. 0 = At Most Once, 1 = At least Once, 2 = Exactly Once + +* `certificateAuthority` - The path to the Certificate Authority file + +* `pemCertificatePath` - The path to the PEM Certificate (.pem) file + +* `pemPrivateKey` - The path to the PEM Private Key file + +* `allowUntrustedCertificates` - Sets whether to validate the certificate chain (true or false) + +* `useTls` - Sets whether to use TLS or not (true or false) + +* `topicPrefix` - The MQTT topic prefix to subscribe to + +* `deviceKey` - The UUID or Name of the Device to read data for + +* `DocumentFormat` - The Document Format ID to use to format the input data + +## Input Topics + +### Observations +``` +[TOPIC_PREFIX]/observations +``` + +### Asset +``` +[TOPIC_PREFIX]/assets +``` + +### Device +``` +[TOPIC_PREFIX]/device +``` ## Contribution / Feedback - Please use the [Issues](https://github.com/TrakHound/MTConnect.NET/issues) tab to create issues for specific problems that you may encounter diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README.md b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README.md index 18d101db..7d98d83b 100644 --- a/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README.md +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/README.md @@ -26,18 +26,60 @@ This Agent Module implements an adapter to read from an MQTT broker - mqtt-adapter: server: localhost port: 1883 - topic: cnc-01/input + topicPrefix: input deviceKey: M12346 ``` -* `server` - The MQTT broker hostname. +* `server` - The MQTT broker hostname -* `port` - The MQTT broker port number. +* `port` - The MQTT broker port number -* `topic` - The MQTT topic to subscribe to +* `timeout` - The timeout (in milliseconds) to use for connection and read/write -* `deviceKey` - The UUID or Name of the Device to read data for. +* `reconnectInterval` - The interval (in milliseconds) to delay between disconnections +* `username` - Sets the Username to use for authentication + +* `password` - Sets the Password to use for authentication + +* `clientId` - Sets the Client ID to use for the connection + +* `cleanSession` - Sets the CleanSession flag (true or false) + +* `qos` - Sets the Quality Of Service (QoS) to use. 0 = At Most Once, 1 = At least Once, 2 = Exactly Once + +* `certificateAuthority` - The path to the Certificate Authority file + +* `pemCertificatePath` - The path to the PEM Certificate (.pem) file + +* `pemPrivateKey` - The path to the PEM Private Key file + +* `allowUntrustedCertificates` - Sets whether to validate the certificate chain (true or false) + +* `useTls` - Sets whether to use TLS or not (true or false) + +* `topicPrefix` - The MQTT topic prefix to subscribe to + +* `deviceKey` - The UUID or Name of the Device to read data for + +* `DocumentFormat` - The Document Format ID to use to format the input data + +## Input Topics + +### Observations +``` +[TOPIC_PREFIX]/observations +``` + +### Asset +``` +[TOPIC_PREFIX]/assets +``` + +### Device +``` +[TOPIC_PREFIX]/device +``` ## Contribution / Feedback - Please use the [Issues](https://github.com/TrakHound/MTConnect.NET/issues) tab to create issues for specific problems that you may encounter