diff --git a/applications/Agents/MTConnect-Agent-MQTT-Broker/agent.config.default.yaml b/applications/Agents/MTConnect-Agent-MQTT-Broker/agent.config.default.yaml index 78d51a28..09697b25 100644 --- a/applications/Agents/MTConnect-Agent-MQTT-Broker/agent.config.default.yaml +++ b/applications/Agents/MTConnect-Agent-MQTT-Broker/agent.config.default.yaml @@ -32,6 +32,12 @@ server: localhost # The port number of the MQTT broker to publish messages to port: 1883 +# The Intervals to publish Observations at (in milliseconds) +observationIntervals: +- 0 +- 100 +- 1000 + # - Agent Configuration - diff --git a/applications/Agents/MTConnect-Agent-MQTT-Relay/agent.config.default.yaml b/applications/Agents/MTConnect-Agent-MQTT-Relay/agent.config.default.yaml index 97dbd787..f118f076 100644 --- a/applications/Agents/MTConnect-Agent-MQTT-Relay/agent.config.default.yaml +++ b/applications/Agents/MTConnect-Agent-MQTT-Relay/agent.config.default.yaml @@ -32,6 +32,12 @@ server: localhost # The port number of the MQTT broker to publish messages to port: 1883 +# The Intervals to publish Observations at (in milliseconds) +observationIntervals: +- 0 +- 100 +- 1000 + # - Agent Configuration - diff --git a/applications/Clients/MTConnect.NET-Client-MQTT-Example/Program.cs b/applications/Clients/MTConnect.NET-Client-MQTT-Example/Program.cs index 576b11c7..5ba2770b 100644 --- a/applications/Clients/MTConnect.NET-Client-MQTT-Example/Program.cs +++ b/applications/Clients/MTConnect.NET-Client-MQTT-Example/Program.cs @@ -1,4 +1,4 @@ -using MTConnect.Clients.Mqtt; +using MTConnect.Clients; using MTConnect.Configurations; using MTConnect.Observations; @@ -15,6 +15,8 @@ var configuration = new MTConnectMqttClientConfiguration(); configuration.Server = "localhost"; configuration.Port = 1883; +//configuration.Interval = 100; +configuration.DeviceUuid = "OKUMA.Lathe.123456"; var client = new MTConnectMqttClient(configuration, topics: topics); client.DeviceReceived += (s, o) => @@ -27,7 +29,7 @@ }; client.AssetReceived += (s, o) => { - Console.WriteLine("Asset Received"); + Console.WriteLine($"Asset Received : {o.AssetId} : {o.Type}"); }; await client.StartAsync(); diff --git a/docs/MQTT-Protocol.md b/docs/MQTT-Protocol.md new file mode 100644 index 00000000..416c5a5a --- /dev/null +++ b/docs/MQTT-Protocol.md @@ -0,0 +1,55 @@ +# MQTT Protocol for MTConnect + +## Overview +This is a protocol for accessing MTConnect data using MQTT that mimics the functionality of the MTConnect HTTP REST protocol. +The main difference between the HTTP and MQTT protocols is that the MQTT protocol deals with the individual MTConnect entities directly (ex. Device, Observation, Asset). + +#### All Devices +![All_Devices](../img/mtconnect-mqtt-protocol-all-01.png) + + +#### By Device +![All_Devices](../img/mtconnect-mqtt-protocol-by-device-01.png) + + +### Multiple Configurable Observation Intervals +This protocol enables multiple intervals that Observations can be received. +This interval essentially implements a window that a maximum of one Observation per DataItem will be published. +This can be used by client applications such as Dashboards. An interval of "0" will indicate that all observation changes are published. + +#### Observation Topics +The interval (in milliseconds) is specified in the brackets suffix for an Observations topic following the pattern below: + +##### Observation Interval 1000ms +``` +MTConnect/Devices/ff8fc5c5-206f-4d94-96a8-a03c70b682b8/Observations[1000]/# +``` + +##### Observation Interval 100ms +``` +MTConnect/Devices/ff8fc5c5-206f-4d94-96a8-a03c70b682b8/Observations[100]/# +``` + +##### Observation Interval 0 ("Realtime") +``` +MTConnect/Devices/ff8fc5c5-206f-4d94-96a8-a03c70b682b8/Observations/# +``` + + +### Agent Connection Heartbeat +A connection heartbeat that represents the connection between the MTConnect Agent and the MQTT Broker can be read from the topic below: +``` +MTConnect/Agents/[AGENT_UUID]/HeartbeatTimestamp +``` + +The payload is a simple 64 bit Integer representing the last timestamp (in Unix milliseconds) that the Agent sent a heartbeat. + +Using this timestamp in combination with the HeartbeatInterval property found in the **MTConnect/Agents/[AGENT_UUID]/Information** topic, +the connection status of the MTConnect Agent can be determined. +Typically waiting for 3 failed heartbeats allows for temporary connection interruptions and follows a similar pattern of other MTConnect related heartbeat protocols. + + +### Entity level Agent InstanceId support +Each MTConnect entity contains an InstanceId property which can be compared to the **InstanceId** property found in the **MTConnect/Agents/[AGENT_UUID]/Information**. +If the entity's InstanceId property differs, then it can be assumed that the Agent was restarted, reconfigured, etc. and the protocol should be restarted. +This is similar to the MTConnect HTTP protocol. \ No newline at end of file diff --git a/img/mtconnect-mqtt-protocol-all-01.png b/img/mtconnect-mqtt-protocol-all-01.png new file mode 100644 index 00000000..44ce0e5b Binary files /dev/null and b/img/mtconnect-mqtt-protocol-all-01.png differ diff --git a/img/mtconnect-mqtt-protocol-by-device-01.png b/img/mtconnect-mqtt-protocol-by-device-01.png new file mode 100644 index 00000000..fc37ac93 Binary files /dev/null and b/img/mtconnect-mqtt-protocol-by-device-01.png differ diff --git a/src/AssemblyInfo.cs b/src/AssemblyInfo.cs index 7897e8d9..e0129651 100644 --- a/src/AssemblyInfo.cs +++ b/src/AssemblyInfo.cs @@ -1,6 +1,6 @@ using System.Reflection; -[assembly: AssemblyVersion("5.3.0")] -[assembly: AssemblyFileVersion("5.3.0")] +[assembly: AssemblyVersion("5.4.0")] +[assembly: AssemblyFileVersion("5.4.0")] [assembly: AssemblyCompany("TrakHound Inc.")] [assembly: AssemblyCopyright("Copyright (c) 2023 TrakHound Inc., All Rights Reserved.")] diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttAgentApplicationConfiguration.cs index 927998a9..8b62d9fd 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttAgentApplicationConfiguration.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttAgentApplicationConfiguration.cs @@ -1,12 +1,36 @@ // Copyright (c) 2023 TrakHound Inc., All Rights Reserved. // TrakHound Inc. licenses this file to you under the MIT license. +using System.Collections.Generic; + namespace MTConnect.Configurations { - public interface IMqttAgentApplicationConfiguration : IAgentApplicationConfiguration, IMTConnectMqttClientConfiguration + public interface IMqttAgentApplicationConfiguration : IAgentApplicationConfiguration { + string Server { get; set; } + + int Port { get; set; } + + string Username { get; set; } + + string Password { get; set; } + + string CertificateAuthority { get; set; } + + string PemCertificate { get; set; } + + string PemPrivateKey { get; set; } + + bool UseTls { get; set; } + + bool AllowUntrustedCertificates { get; set; } + + string TopicPrefix { get; set; } + bool RetainMessages { get; set; } MTConnectMqttFormat MqttFormat { get; set; } + + IEnumerable ObservationIntervals { get; set; } } } \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttRelayAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttRelayAgentApplicationConfiguration.cs new file mode 100644 index 00000000..8b0b7b34 --- /dev/null +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IMqttRelayAgentApplicationConfiguration.cs @@ -0,0 +1,14 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +namespace MTConnect.Configurations +{ + public interface IMqttRelayAgentApplicationConfiguration : IMqttAgentApplicationConfiguration + { + string ClientId { get; set; } + + int QoS { get; set; } + + int RetryInterval { get; set; } + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IShdrMqttRelayAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IShdrMqttRelayAgentApplicationConfiguration.cs new file mode 100644 index 00000000..2a1278a8 --- /dev/null +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/IShdrMqttRelayAgentApplicationConfiguration.cs @@ -0,0 +1,13 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +namespace MTConnect.Configurations +{ + /// + /// Configuration for an MTConnect SHDR > MQTT Agent + /// + public interface IShdrMqttRelayAgentApplicationConfiguration : IShdrAgentApplicationConfiguration, IMqttRelayAgentApplicationConfiguration + { + + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttAgentApplicationConfiguration.cs index 68fee300..efa26bcb 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttAgentApplicationConfiguration.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttAgentApplicationConfiguration.cs @@ -1,6 +1,7 @@ // Copyright (c) 2023 TrakHound Inc., All Rights Reserved. // TrakHound Inc. licenses this file to you under the MIT license. +using System.Collections.Generic; using System.Text.Json.Serialization; using YamlDotNet.Serialization; @@ -23,9 +24,6 @@ public class MqttAgentApplicationConfiguration : AgentApplicationConfiguration, [JsonPropertyName("password")] public string Password { get; set; } - [JsonPropertyName("clientId")] - public string ClientId { get; set; } - [JsonPropertyName("certificateAuthority")] public string CertificateAuthority { get; set; } @@ -41,9 +39,6 @@ public class MqttAgentApplicationConfiguration : AgentApplicationConfiguration, [JsonPropertyName("useTls")] public bool UseTls { get; set; } - [JsonPropertyName("retryInterval")] - public int RetryInterval { get; set; } - [JsonPropertyName("retainMessages")] public bool RetainMessages { get; set; } @@ -54,14 +49,17 @@ public class MqttAgentApplicationConfiguration : AgentApplicationConfiguration, [YamlMember(Alias = "mqttTopicPrefix")] public string TopicPrefix { get; set; } + [JsonPropertyName("observationIntervals")] + public IEnumerable ObservationIntervals { get; set; } + public MqttAgentApplicationConfiguration() { Server = "localhost"; Port = 1883; - RetryInterval = 5000; RetainMessages = true; MqttFormat = MTConnectMqttFormat.Hierarchy; + ObservationIntervals = new List { 0, 1000 }; } } } \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttRelayAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttRelayAgentApplicationConfiguration.cs new file mode 100644 index 00000000..5bdaa693 --- /dev/null +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttRelayAgentApplicationConfiguration.cs @@ -0,0 +1,30 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Text.Json.Serialization; + +namespace MTConnect.Configurations +{ + public class MqttRelayAgentApplicationConfiguration : MqttAgentApplicationConfiguration, IMqttRelayAgentApplicationConfiguration + { + [JsonPropertyName("clientId")] + public string ClientId { get; set; } + + [JsonPropertyName("qos")] + public int QoS { get; set; } + + [JsonPropertyName("retryInterval")] + public int RetryInterval { get; set; } + + + public MqttRelayAgentApplicationConfiguration() + { + Server = "localhost"; + Port = 1883; + QoS = 1; + RetryInterval = 5000; + RetainMessages = true; + MqttFormat = MTConnectMqttFormat.Hierarchy; + } + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttRelayAgentGatewayApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttRelayAgentGatewayApplicationConfiguration.cs new file mode 100644 index 00000000..2cd699fa --- /dev/null +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/MqttRelayAgentGatewayApplicationConfiguration.cs @@ -0,0 +1,20 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Text.Json.Serialization; + +namespace MTConnect.Configurations +{ + /// + /// Configuration for an MTConnect Mqtt Gateway Agent Application + /// + public class MqttRelayAgentGatewayApplicationConfiguration : MqttRelayAgentApplicationConfiguration + { + /// + /// List of MTConnect Http Clients to read from + /// + [JsonPropertyName("clients")] + public List Clients { get; set; } + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttAgentApplicationConfiguration.cs index dd734b8c..03498867 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttAgentApplicationConfiguration.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttAgentApplicationConfiguration.cs @@ -1,6 +1,7 @@ // Copyright (c) 2023 TrakHound Inc., All Rights Reserved. // TrakHound Inc. licenses this file to you under the MIT license. +using System.Collections.Generic; using System.Text.Json.Serialization; using YamlDotNet.Serialization; @@ -23,9 +24,6 @@ public class ShdrMqttAgentApplicationConfiguration : ShdrAgentApplicationConfigu [JsonPropertyName("password")] public string Password { get; set; } - [JsonPropertyName("clientId")] - public string ClientId { get; set; } - [JsonPropertyName("certificateAuthority")] public string CertificateAuthority { get; set; } @@ -41,9 +39,6 @@ public class ShdrMqttAgentApplicationConfiguration : ShdrAgentApplicationConfigu [JsonPropertyName("useTls")] public bool UseTls { get; set; } - [JsonPropertyName("retryInterval")] - public int RetryInterval { get; set; } - [JsonPropertyName("retainMessages")] public bool RetainMessages { get; set; } @@ -54,14 +49,17 @@ public class ShdrMqttAgentApplicationConfiguration : ShdrAgentApplicationConfigu [YamlMember(Alias = "mqttTopicPrefix")] public string TopicPrefix { get; set; } + [JsonPropertyName("observationIntervals")] + public IEnumerable ObservationIntervals { get; set; } + public ShdrMqttAgentApplicationConfiguration() { Server = "localhost"; Port = 1883; - RetryInterval = 5000; RetainMessages = true; MqttFormat = MTConnectMqttFormat.Hierarchy; + ObservationIntervals = new List { 0, 1000 }; } } } \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttRelayAgentApplicationConfiguration.cs b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttRelayAgentApplicationConfiguration.cs new file mode 100644 index 00000000..1c1f58cb --- /dev/null +++ b/src/MTConnect.NET-Applications-Agents-MQTT/Configurations/ShdrMqttRelayAgentApplicationConfiguration.cs @@ -0,0 +1,76 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Text.Json.Serialization; +using YamlDotNet.Serialization; + +namespace MTConnect.Configurations +{ + /// + /// Configuration for an MTConnect SHDR > MQTT Agent Application + /// + public class ShdrMqttRelayAgentApplicationConfiguration : ShdrAgentApplicationConfiguration, IShdrMqttRelayAgentApplicationConfiguration + { + [JsonPropertyName("server")] + public string Server { get; set; } + + [JsonPropertyName("port")] + public int Port { get; set; } + + [JsonPropertyName("clientId")] + public string ClientId { get; set; } + + [JsonPropertyName("qos")] + public int QoS { get; set; } + + [JsonPropertyName("username")] + public string Username { get; set; } + + [JsonPropertyName("password")] + public string Password { get; set; } + + [JsonPropertyName("certificateAuthority")] + public string CertificateAuthority { get; set; } + + [JsonPropertyName("pemCertificate")] + public string PemCertificate { get; set; } + + [JsonPropertyName("pemPrivateKey")] + public string PemPrivateKey { get; set; } + + [JsonPropertyName("allowUntrustedCertificates")] + public bool AllowUntrustedCertificates { get; set; } + + [JsonPropertyName("useTls")] + public bool UseTls { get; set; } + + [JsonPropertyName("retryInterval")] + public int RetryInterval { get; set; } + + [JsonPropertyName("retainMessages")] + public bool RetainMessages { get; set; } + + [JsonPropertyName("mqttFormat")] + public MTConnectMqttFormat MqttFormat { get; set; } + + [JsonPropertyName("mqttTopicPrefix")] + [YamlMember(Alias = "mqttTopicPrefix")] + public string TopicPrefix { get; set; } + + [JsonPropertyName("observationIntervals")] + public IEnumerable ObservationIntervals { get; set; } + + + public ShdrMqttRelayAgentApplicationConfiguration() + { + Server = "localhost"; + Port = 1883; + QoS = 1; + RetryInterval = 5000; + RetainMessages = true; + MqttFormat = MTConnectMqttFormat.Hierarchy; + ObservationIntervals = new List { 0, 1000 }; + } + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttAgentApplication.cs b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttAgentApplication.cs index a3c645f0..5351ae18 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttAgentApplication.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttAgentApplication.cs @@ -3,7 +3,6 @@ using MTConnect.Agents; using MTConnect.Assets; -using MTConnect.Buffers; using MTConnect.Configurations; using MTConnect.Devices; using MTConnect.Devices.DataItems; @@ -271,7 +270,8 @@ public void StartAgent(IAgentApplicationConfiguration configuration, bool verbos // Create MTConnectAgentBroker - _mtconnectAgent = new MTConnectAgent(configuration, agentInformation.Uuid, agentInformation.InstanceId, agentInformation.DeviceModelChangeTime, initializeDataItems); + _mtconnectAgent = new MTConnectAgent(configuration, agentInformation.Uuid); + if (verboseLogging) { diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentApplication.cs b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentApplication.cs index 1dcd7bdc..e2884ee3 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentApplication.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentApplication.cs @@ -116,7 +116,12 @@ protected override async void OnStartAgentBeforeLoad(IEnumerable observationIntervals = new List { 0, 1000 }; + if (!_configuration.ObservationIntervals.IsNullOrEmpty()) observationIntervals = _configuration.ObservationIntervals; + + _mqttBroker = new MTConnectMqttBroker(Agent, _mqttServer, observationIntervals); _mqttBroker.Format = _configuration.MqttFormat; _mqttBroker.RetainMessages = _configuration.RetainMessages; _mqttBroker.TopicPrefix = _configuration.TopicPrefix; diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentGatewayApplication.cs b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentGatewayApplication.cs index 394e1dda..e7bea9a3 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentGatewayApplication.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttBrokerAgentGatewayApplication.cs @@ -61,6 +61,8 @@ protected override void OnAgentConfigurationWatcherInitialize(IAgentApplicationC protected override void OnAgentConfigurationUpdated(AgentConfiguration configuration) { _configuration = configuration as MqttAgentGatewayApplicationConfiguration; + + base.OnAgentConfigurationUpdated(configuration); } diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttRelayAgentApplication.cs b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttRelayAgentApplication.cs index 3ca55554..79947fdc 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttRelayAgentApplication.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectMqttRelayAgentApplication.cs @@ -1,7 +1,6 @@ // Copyright (c) 2023 TrakHound Inc., All Rights Reserved. // TrakHound Inc. licenses this file to you under the MIT license. -using MQTTnet.Extensions.ManagedClient; using MTConnect.Configurations; using MTConnect.Mqtt; using NLog; @@ -24,7 +23,7 @@ public class MTConnectMqttRelayAgentApplication : MTConnectMqttAgentApplication private readonly Logger _mqttLogger = LogManager.GetLogger("mqtt-logger"); private MTConnectMqttRelay _relay; - private IMqttAgentApplicationConfiguration _configuration; + private IMqttRelayAgentApplicationConfiguration _configuration; private int _port = 0; @@ -65,7 +64,7 @@ protected override void OnCommandLineArgumentsRead(string[] args) protected override IAgentApplicationConfiguration OnConfigurationFileRead(string configurationPath) { // Read the Configuration File - var configuration = AgentConfiguration.Read(configurationPath); + var configuration = AgentConfiguration.Read(configurationPath); base.OnAgentConfigurationUpdated(configuration); _configuration = configuration; return _configuration; @@ -73,12 +72,14 @@ protected override IAgentApplicationConfiguration OnConfigurationFileRead(string protected override void OnAgentConfigurationWatcherInitialize(IAgentApplicationConfiguration configuration) { - _agentConfigurationWatcher = new AgentConfigurationFileWatcher(configuration.Path, configuration.ConfigurationFileRestartInterval * 1000); + _agentConfigurationWatcher = new AgentConfigurationFileWatcher(configuration.Path, configuration.ConfigurationFileRestartInterval * 1000); } protected override void OnAgentConfigurationUpdated(AgentConfiguration configuration) { - _configuration = configuration as IMqttAgentApplicationConfiguration; + _configuration = configuration as IMqttRelayAgentApplicationConfiguration; + + base.OnAgentConfigurationUpdated(configuration); } @@ -89,6 +90,7 @@ protected override async void OnStartAgentAfterLoad(IEnumerable 0 ? _port : _configuration.Port; + clientConfiguration.QoS = _configuration.QoS; clientConfiguration.Username = _configuration.Username; clientConfiguration.Password = _configuration.Password; clientConfiguration.ClientId = _configuration.ClientId; @@ -98,7 +100,13 @@ protected override async void OnStartAgentAfterLoad(IEnumerable observationIntervals = new List { 0, 1000 }; + if (!_configuration.ObservationIntervals.IsNullOrEmpty()) observationIntervals = _configuration.ObservationIntervals; + + + _relay = new MTConnectMqttRelay(Agent, clientConfiguration, observationIntervals); _relay.Format = _configuration.MqttFormat; _relay.RetainMessages = _configuration.RetainMessages; _relay.Connected += MqttClientConnected; @@ -114,7 +122,11 @@ protected override async void OnStartAgentAfterLoad(IEnumerable(configurationPath); - base.OnAgentConfigurationUpdated(configuration); + var configuration = AgentConfiguration.Read(configurationPath); + OnAgentConfigurationUpdated(configuration); _configuration = configuration; return _configuration; } protected override void OnAgentConfigurationWatcherInitialize(IAgentApplicationConfiguration configuration) { - _agentConfigurationWatcher = new AgentConfigurationFileWatcher(configuration.Path, configuration.ConfigurationFileRestartInterval * 1000); + _agentConfigurationWatcher = new AgentConfigurationFileWatcher(configuration.Path, configuration.ConfigurationFileRestartInterval * 1000); } protected override void OnAgentConfigurationUpdated(AgentConfiguration configuration) { - _configuration = configuration as MqttAgentGatewayApplicationConfiguration; + _configuration = configuration as MqttRelayAgentGatewayApplicationConfiguration; + + base.OnAgentConfigurationUpdated(configuration); } diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttBrokerAgentApplication.cs b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttBrokerAgentApplication.cs index e28df9ef..aadc86fd 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttBrokerAgentApplication.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttBrokerAgentApplication.cs @@ -49,6 +49,8 @@ protected override void OnAgentConfigurationWatcherInitialize(IAgentApplicationC protected override void OnAgentConfigurationUpdated(AgentConfiguration configuration) { _configuration = configuration as ShdrMqttAgentApplicationConfiguration; + + base.OnAgentConfigurationUpdated(configuration); } diff --git a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttRelayAgentApplication.cs b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttRelayAgentApplication.cs index 4b4aa161..2f460ef2 100644 --- a/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttRelayAgentApplication.cs +++ b/src/MTConnect.NET-Applications-Agents-MQTT/MTConnectShdrMqttRelayAgentApplication.cs @@ -23,7 +23,7 @@ public class MTConnectShdrMqttRelayAgentApplication : MTConnectMqttRelayAgentApp private readonly Logger _adapterShdrLogger = LogManager.GetLogger("adapter-shdr-logger"); private readonly List _adapters = new List(); - private IShdrMqttAgentApplicationConfiguration _configuration; + private IShdrMqttRelayAgentApplicationConfiguration _configuration; public MTConnectShdrMqttRelayAgentApplication() @@ -35,7 +35,7 @@ public MTConnectShdrMqttRelayAgentApplication() protected override IAgentApplicationConfiguration OnConfigurationFileRead(string configurationPath) { // Read the Configuration File - var configuration = AgentConfiguration.Read(configurationPath); + var configuration = AgentConfiguration.Read(configurationPath); base.OnAgentConfigurationUpdated(configuration); _configuration = configuration; return _configuration; @@ -43,12 +43,14 @@ protected override IAgentApplicationConfiguration OnConfigurationFileRead(string protected override void OnAgentConfigurationWatcherInitialize(IAgentApplicationConfiguration configuration) { - _agentConfigurationWatcher = new AgentConfigurationFileWatcher(configuration.Path, configuration.ConfigurationFileRestartInterval * 1000); + _agentConfigurationWatcher = new AgentConfigurationFileWatcher(configuration.Path, configuration.ConfigurationFileRestartInterval * 1000); } protected override void OnAgentConfigurationUpdated(AgentConfiguration configuration) { - _configuration = configuration as ShdrMqttAgentApplicationConfiguration; + _configuration = configuration as ShdrMqttRelayAgentApplicationConfiguration; + + base.OnAgentConfigurationUpdated(configuration); } diff --git a/src/MTConnect.NET-MQTT/Clients/MTConnectMqttClient.cs b/src/MTConnect.NET-MQTT/Clients/MTConnectMqttClient.cs index 4678cc50..ecfa246d 100644 --- a/src/MTConnect.NET-MQTT/Clients/MTConnectMqttClient.cs +++ b/src/MTConnect.NET-MQTT/Clients/MTConnectMqttClient.cs @@ -4,32 +4,41 @@ using MQTTnet; using MQTTnet.Client; using MTConnect.Assets; +using MTConnect.Assets.Json; using MTConnect.Configurations; using MTConnect.Devices; using MTConnect.Devices.DataItems; using MTConnect.Devices.Json; +using MTConnect.Formatters; +using MTConnect.Mqtt; using MTConnect.Observations; using MTConnect.Streams.Json; using System; using System.Collections.Generic; using System.IO; using System.Security.Cryptography.X509Certificates; +using System.Text; using System.Text.Json; using System.Text.RegularExpressions; using System.Threading.Tasks; -namespace MTConnect.Clients.Mqtt +namespace MTConnect.Clients { public class MTConnectMqttClient : IDisposable { private const string _defaultTopic = "MTConnect/#"; + private const string _defaultAgentsTopic = "MTConnect/Agents/#"; + private const string _defaultAgentTopicPattern = "MTConnect\\/Agents\\/([^\\/]*)\\/([^\\/]*)"; private const string _deviceUuidTopicPattern = "MTConnect\\/Devices\\/([^\\/]*)"; private const string _deviceTopicPattern = "MTConnect\\/Devices\\/([^\\/]*)\\/Device"; + private const string _deviceAgentUuidTopicPattern = "MTConnect\\/Devices\\/([^\\/]*)\\/AgentUuid"; private const string _observationsTopicPattern = "MTConnect\\/Devices\\/([^\\/]*)\\/Observations"; private const string _conditionsTopicPattern = "MTConnect\\/Devices\\/(.*)\\/Observations\\/.*\\/Conditions"; private const string _assetTopicPattern = "MTConnect\\/Devices\\/([^\\/]*)\\/Assets"; + private static readonly Regex _agentRegex = new Regex(_defaultAgentTopicPattern); private static readonly Regex _deviceUuidRegex = new Regex(_deviceUuidTopicPattern); + private static readonly Regex _deviceAgentUuidRegex = new Regex(_deviceAgentUuidTopicPattern); private static readonly Regex _deviceRegex = new Regex(_deviceTopicPattern); private static readonly Regex _observationsRegex = new Regex(_observationsTopicPattern); private static readonly Regex _conditionsRegex = new Regex(_conditionsTopicPattern); @@ -39,6 +48,9 @@ public class MTConnectMqttClient : IDisposable private readonly IMqttClient _mqttClient; private readonly string _server; private readonly int _port; + private readonly int _qos; + private readonly int _interval; + private readonly string _deviceUuid; private readonly string _username; private readonly string _password; private readonly string _clientId; @@ -48,6 +60,16 @@ public class MTConnectMqttClient : IDisposable private readonly bool _allowUntrustedCertificates; private readonly bool _useTls; private readonly IEnumerable _topics; + private readonly Dictionary _agents = new Dictionary(); // AgentUuid > AgentInformation + private readonly Dictionary _deviceAgentUuids = new Dictionary(); // DeviceUuid > AgentUuid + private readonly Dictionary _agentInstanceIds = new Dictionary(); // AgentUuid > InstanceId + private readonly Dictionary _agentHeartbeatTimestamps = new Dictionary(); // AgentUuid > Last Heartbeat received (Unix milliseconds) + private readonly Dictionary _connectionTimers = new Dictionary(); + private readonly object _lock = new object(); + + + private MTConnectMqttConnectionStatus _connectionStatus; + public delegate void MTConnectMqttEventHandler(string deviceUuid, T item); @@ -56,12 +78,20 @@ public class MTConnectMqttClient : IDisposable public int Port => _port; + public int QoS => _qos; + + public int Interval => _interval; + public IEnumerable Topics => _topics; + public MTConnectMqttConnectionStatus ConnectionStatus => _connectionStatus; + public EventHandler Connected { get; set; } public EventHandler Disconnected { get; set; } + public EventHandler ConnectionStatusChanged { get; set; } + public EventHandler ConnectionError { get; set; } public MTConnectMqttEventHandler DeviceReceived { get; set; } @@ -71,15 +101,26 @@ public class MTConnectMqttClient : IDisposable public MTConnectMqttEventHandler AssetReceived { get; set; } - public MTConnectMqttClient(string server, int port = 1883, IEnumerable topics = null) + public MTConnectMqttClient(string server, int port = 1883, int interval = 0, string deviceUuid = null, IEnumerable topics = null, int qos = 1) { _server = server; _port = port; _topics = !topics.IsNullOrEmpty() ? topics : new List { _defaultTopic }; + _qos = qos; + _interval = interval; + _deviceUuid = deviceUuid; _mqttFactory = new MqttFactory(); _mqttClient = _mqttFactory.CreateMqttClient(); - _mqttClient.ApplicationMessageReceivedAsync += MessageReceived; + + if (!string.IsNullOrEmpty(_deviceUuid)) + { + _mqttClient.ApplicationMessageReceivedAsync += DeviceMessageReceived; + } + else + { + _mqttClient.ApplicationMessageReceivedAsync += AllDevicesMessageReceived; + } } public MTConnectMqttClient(IMTConnectMqttClientConfiguration configuration, IEnumerable topics = null) @@ -87,7 +128,10 @@ public MTConnectMqttClient(IMTConnectMqttClientConfiguration configuration, IEnu if (configuration != null) { _server = configuration.Server; - _port = configuration.Port; ; + _port = configuration.Port; + _interval = configuration.Interval; + _deviceUuid = configuration.DeviceUuid; + _qos = configuration.QoS; _username = configuration.Username; _password = configuration.Password; _clientId = configuration.ClientId; @@ -102,7 +146,15 @@ public MTConnectMqttClient(IMTConnectMqttClientConfiguration configuration, IEnu _mqttFactory = new MqttFactory(); _mqttClient = _mqttFactory.CreateMqttClient(); - _mqttClient.ApplicationMessageReceivedAsync += MessageReceived; + + if (!string.IsNullOrEmpty(_deviceUuid)) + { + _mqttClient.ApplicationMessageReceivedAsync += DeviceMessageReceived; + } + else + { + _mqttClient.ApplicationMessageReceivedAsync += AllDevicesMessageReceived; + } } @@ -167,16 +219,17 @@ public async Task StartAsync() // Connect to the MQTT Client await _mqttClient.ConnectAsync(clientOptions); - // Configure Topics to subscribe to - var subscribeEptionsBuilder = _mqttFactory.CreateSubscribeOptionsBuilder(); - foreach (var topic in _topics) + + if (!string.IsNullOrEmpty(_deviceUuid)) { - subscribeEptionsBuilder.WithTopicFilter(topic); + // Start protocol for a single Device + await StartDeviceProtocol(_deviceUuid); + } + else + { + // Start protocol for all devices + await StartAllDevicesProtocol(); } - var subscribeOptions = subscribeEptionsBuilder.Build(); - - // Subscribe to Topics - await _mqttClient.SubscribeAsync(subscribeOptions); } catch (Exception ex) { @@ -200,7 +253,55 @@ public void Dispose() } - private Task MessageReceived(MqttApplicationMessageReceivedEventArgs args) + private async Task StartAllDevicesProtocol() + { + // Clear any previous subscriptions + await _mqttClient.UnsubscribeAsync("#"); + + // Subscribe to all Agents + await _mqttClient.SubscribeAsync("MTConnect/Agents/#"); + } + + private async Task StartDeviceProtocol(string deviceUuid) + { + // Clear any previous subscriptions + await _mqttClient.UnsubscribeAsync("#"); + + // Subscribe to the Agent UUID for the Device + await _mqttClient.SubscribeAsync($"MTConnect/Devices/{deviceUuid}/AgentUuid"); + } + + private async Task SubscribeToDeviceAgent(string agentUuid) + { + // Subscribe to both Agent Information and Heartbeat + await _mqttClient.SubscribeAsync($"MTConnect/Agents/{agentUuid}/#"); + } + + private async Task SubscribeToDeviceModel(string deviceUuid) + { + // Subscribe to the Device Model + await _mqttClient.SubscribeAsync($"MTConnect/Devices/{deviceUuid}/Device"); + } + + private async Task SubscribeToDevice(string deviceUuid, int interval = 0) + { + if (interval > 0) + { + // Subscribe to the Device Observations for the specified Interval + await _mqttClient.SubscribeAsync($"MTConnect/Devices/{deviceUuid}/Observations[{interval}]/#"); + } + else + { + // Subscribe to the Device "Realtime" Observations + await _mqttClient.SubscribeAsync($"MTConnect/Devices/{deviceUuid}/Observations/#"); + } + + // Subscribe to the Device Assets + await _mqttClient.SubscribeAsync($"MTConnect/Devices/{deviceUuid}/Assets/#"); + } + + + private async Task AllDevicesMessageReceived(MqttApplicationMessageReceivedEventArgs args) { if (args.ApplicationMessage.Payload != null && args.ApplicationMessage.Payload.Length > 0) { @@ -222,17 +323,57 @@ private Task MessageReceived(MqttApplicationMessageReceivedEventArgs args) { ProcessDevice(args.ApplicationMessage); } + else if (_deviceAgentUuidRegex.IsMatch(topic)) + { + await ProcessDeviceAgentUuid(args.ApplicationMessage); + } + else if (_agentRegex.IsMatch(topic)) + { + await ProcessAgent(args.ApplicationMessage, SubscribeToAllDevices); + } } + } + + private async Task DeviceMessageReceived(MqttApplicationMessageReceivedEventArgs args) + { + if (args.ApplicationMessage.Payload != null && args.ApplicationMessage.Payload.Length > 0) + { + var topic = args.ApplicationMessage.Topic; - return Task.CompletedTask; + if (_conditionsRegex.IsMatch(topic)) + { + ProcessObservations(args.ApplicationMessage); + } + else if (_observationsRegex.IsMatch(topic)) + { + ProcessObservation(args.ApplicationMessage); + } + else if (_assetRegex.IsMatch(topic)) + { + ProcessAsset(args.ApplicationMessage); + } + else if (_deviceRegex.IsMatch(topic)) + { + ProcessDevice(args.ApplicationMessage); + } + else if (_deviceAgentUuidRegex.IsMatch(topic)) + { + await ProcessDeviceAgentUuid(args.ApplicationMessage); + } + else if (_agentRegex.IsMatch(topic)) + { + await ProcessAgent(args.ApplicationMessage, SubscribeToDevice); + } + } } - private void ProcessObservation(MqttApplicationMessage message) + + private async void ProcessObservation(MqttApplicationMessage message) { try { // Read Device UUID - var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[0].Value; + var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[1].Value; // Deserialize JSON to Observation var jsonObservation = JsonSerializer.Deserialize(message.Payload); @@ -240,6 +381,7 @@ private void ProcessObservation(MqttApplicationMessage message) { var observation = new Observation(); observation.DeviceUuid = deviceUuid; + observation.InstanceId = jsonObservation.InstanceId; observation.DataItemId = jsonObservation.DataItemId; observation.Category = jsonObservation.Category.ConvertEnum(); observation.Name = jsonObservation.Name; @@ -256,10 +398,34 @@ private void ProcessObservation(MqttApplicationMessage message) observation.AddValue(ValueKeys.Result, jsonObservation.Result); } - - if (ObservationReceived != null) + // Get stored Agent Uuid for Device + string agentUuid; + lock (_lock) _deviceAgentUuids.TryGetValue(deviceUuid, out agentUuid); + if (!string.IsNullOrEmpty(agentUuid)) { - ObservationReceived.Invoke(deviceUuid, observation); + // Verify Agent InstanceId + long agentInstanceId; + lock (_lock) _agentInstanceIds.TryGetValue(agentUuid, out agentInstanceId); + + if (observation.InstanceId == agentInstanceId) + { + if (ObservationReceived != null) + { + ObservationReceived.Invoke(deviceUuid, observation); + } + } + else + { + // If InstanceId has changed, then restart protocol + if (!string.IsNullOrEmpty(_deviceUuid)) + { + await StartDeviceProtocol(_deviceUuid); + } + else + { + await StartAllDevicesProtocol(); + } + } } } } @@ -271,7 +437,7 @@ private void ProcessObservations(MqttApplicationMessage message) try { // Read Device UUID - var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[0].Value; + var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[1].Value; // Deserialize JSON to Observation var jsonObservations = JsonSerializer.Deserialize>(message.Payload); @@ -281,6 +447,7 @@ private void ProcessObservations(MqttApplicationMessage message) { var observation = new Observation(); observation.DeviceUuid = deviceUuid; + observation.InstanceId = jsonObservation.InstanceId; observation.DataItemId = jsonObservation.DataItemId; observation.Category = jsonObservation.Category.ConvertEnum(); observation.Name = jsonObservation.Name; @@ -307,12 +474,93 @@ private void ProcessObservations(MqttApplicationMessage message) catch { } } - private void ProcessDevice(MqttApplicationMessage message) + private async Task ProcessAgent(MqttApplicationMessage message, Func onConnectedFunction) + { + try + { + var match = _agentRegex.Match(message.Topic); + if (match.Success && match.Groups.Count > 2) + { + // Read Agent UUID + var agentUuid = match.Groups[1].Value; + + // Read Agent Property + var property = match.Groups[2].Value; + + if (!string.IsNullOrEmpty(agentUuid) && !string.IsNullOrEmpty(property)) + { + // Decode UTF8 bytes to string + var value = Encoding.UTF8.GetString(message.Payload); + + switch (property.ToLower()) + { + case "information": + + var agentInformation = JsonSerializer.Deserialize(value); + if (agentInformation != null) + { + lock (_lock) + { + // Update the stored Agent Information + _agents.Remove(agentUuid); + _agents.Add(agentUuid, agentInformation); + + // Update the stored Agent InstanceId + _agentInstanceIds.Remove(agentUuid); + _agentInstanceIds.Add(agentUuid, agentInformation.InstanceId); + + // Stop the existing Connection Timer + _connectionTimers.TryGetValue(agentUuid, out var connectionTimer); + if (connectionTimer != null) connectionTimer.Stop(); + + // Start new Connection Timer + _connectionTimers.Remove(agentUuid); + connectionTimer = new System.Timers.Timer(); + connectionTimer.Interval = agentInformation.HeartbeatInterval; + connectionTimer.Elapsed += (s, a) => ConnectionTimerElapsed(agentUuid); + _connectionTimers.Add(agentUuid, connectionTimer); + connectionTimer.Start(); + } + } + + break; + + + case "heartbeattimestamp": + + var previousConnectionStatus = _connectionStatus; + _connectionStatus = MTConnectMqttConnectionStatus.Connected; + + lock (_lock) + { + _agentHeartbeatTimestamps.Remove(agentUuid); + _agentHeartbeatTimestamps.Add(agentUuid, value.ToLong()); + } + + if (previousConnectionStatus == MTConnectMqttConnectionStatus.Disconnected) + { + if (ConnectionStatusChanged != null) ConnectionStatusChanged.Invoke(this, _connectionStatus); + + if (_agents.TryGetValue(agentUuid, out var agent)) + { + if (onConnectedFunction != null) await onConnectedFunction(agent); + } + } + + break; + } + } + } + } + catch { } + } + + private async void ProcessDevice(MqttApplicationMessage message) { try { // Read Device UUID - var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[0].Value; + var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[1].Value; // Deserialize JSON to Device var jsonDevice = JsonSerializer.Deserialize(message.Payload); @@ -325,29 +573,42 @@ private void ProcessDevice(MqttApplicationMessage message) { DeviceReceived.Invoke(deviceUuid, device); } + + await SubscribeToDevice(deviceUuid, _interval); } } } catch { } } + private async Task ProcessDeviceAgentUuid(MqttApplicationMessage message) + { + try + { + var agentUuid = Encoding.UTF8.GetString(message.Payload); + + await SubscribeToDeviceAgent(agentUuid); + } + catch { } + } + private void ProcessAsset(MqttApplicationMessage message) { try { // Read Device UUID - var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[0].Value; + var deviceUuid = _deviceUuidRegex.Match(message.Topic).Groups[1].Value; // Deserialize JSON to Device - var jsonDevice = JsonSerializer.Deserialize(message.Payload); - if (jsonDevice != null) + var jsonAsset = JsonSerializer.Deserialize(message.Payload); + if (jsonAsset != null) { - var device = jsonDevice.ToDevice(); - if (device != null) + var response = EntityFormatter.CreateAsset(DocumentFormat.JSON, jsonAsset.Type, message.Payload); + if (response.Success) { - if (DeviceReceived != null) + if (AssetReceived != null) { - DeviceReceived.Invoke(deviceUuid, device); + AssetReceived.Invoke(deviceUuid, response.Entity); } } } @@ -356,6 +617,64 @@ private void ProcessAsset(MqttApplicationMessage message) } + private async Task SubscribeToAllDevices(MTConnectMqttAgentInformation agent) + { + // Subscribe to Devices + if (!agent.Devices.IsNullOrEmpty()) + { + foreach (var deviceUuid in agent.Devices) + { + lock (_lock) + { + _deviceAgentUuids.Remove(deviceUuid); + _deviceAgentUuids.Add(deviceUuid, agent.Uuid); + } + + await SubscribeToDeviceModel(deviceUuid); + } + } + } + + private async Task SubscribeToDevice(MTConnectMqttAgentInformation agent) + { + lock (_lock) + { + _deviceAgentUuids.Remove(_deviceUuid); + _deviceAgentUuids.Add(_deviceUuid, agent.Uuid); + } + + // Subscribe to Device + await SubscribeToDeviceModel(_deviceUuid); + } + + + private void ConnectionTimerElapsed(string agentUuid) + { + MTConnectMqttAgentInformation agentInformation; + long timestamp = 0; + var now = UnixDateTime.Now / 10000; + + lock (_lock) + { + _agents.TryGetValue(agentUuid, out agentInformation); + _agentHeartbeatTimestamps.TryGetValue(agentUuid, out timestamp); + } + + if (agentInformation != null && timestamp > 0) + { + var diff = now - timestamp; + + if (_connectionStatus == MTConnectMqttConnectionStatus.Connected && diff > agentInformation.HeartbeatInterval * 3) + { + // Set Connection Status to Disconnected + _connectionStatus = MTConnectMqttConnectionStatus.Disconnected; + + if (ConnectionStatusChanged != null) ConnectionStatusChanged.Invoke(this, _connectionStatus); + } + } + } + + private static string GetFilePath(string path) { var x = path; diff --git a/src/MTConnect.NET-MQTT/Configurations/IMTConnectMqttClientConfiguration.cs b/src/MTConnect.NET-MQTT/Configurations/IMTConnectMqttClientConfiguration.cs index 81fb4288..c059b270 100644 --- a/src/MTConnect.NET-MQTT/Configurations/IMTConnectMqttClientConfiguration.cs +++ b/src/MTConnect.NET-MQTT/Configurations/IMTConnectMqttClientConfiguration.cs @@ -9,12 +9,18 @@ public interface IMTConnectMqttClientConfiguration int Port { get; set; } + int Interval { get; set; } + + string DeviceUuid { get; set; } + string Username { get; set; } string Password { get; set; } string ClientId { get; set; } + int QoS { get; set; } + string CertificateAuthority { get; set; } string PemCertificate { get; set; } diff --git a/src/MTConnect.NET-MQTT/Configurations/MTConnectMqttClientConfiguration.cs b/src/MTConnect.NET-MQTT/Configurations/MTConnectMqttClientConfiguration.cs index 7f637bd2..82185d85 100644 --- a/src/MTConnect.NET-MQTT/Configurations/MTConnectMqttClientConfiguration.cs +++ b/src/MTConnect.NET-MQTT/Configurations/MTConnectMqttClientConfiguration.cs @@ -13,6 +13,12 @@ public class MTConnectMqttClientConfiguration : IMTConnectMqttClientConfiguratio [JsonPropertyName("port")] public int Port { get; set; } + [JsonPropertyName("interval")] + public int Interval { get; set; } + + [JsonPropertyName("deviceUuid")] + public string DeviceUuid { get; set; } + [JsonPropertyName("username")] public string Username { get; set; } @@ -22,6 +28,9 @@ public class MTConnectMqttClientConfiguration : IMTConnectMqttClientConfiguratio [JsonPropertyName("clientId")] public string ClientId { get; set; } + [JsonPropertyName("qos")] + public int QoS { get; set; } + [JsonPropertyName("certificateAuthority")] public string CertificateAuthority { get; set; } @@ -48,6 +57,7 @@ public MTConnectMqttClientConfiguration() { Server = "localhost"; Port = 1883; + QoS = 1; RetryInterval = 5000; } } diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttAgentInformation.cs b/src/MTConnect.NET-MQTT/MTConnectMqttAgentInformation.cs new file mode 100644 index 00000000..f91565b3 --- /dev/null +++ b/src/MTConnect.NET-MQTT/MTConnectMqttAgentInformation.cs @@ -0,0 +1,36 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Text.Json.Serialization; + +namespace MTConnect.Mqtt +{ + class MTConnectMqttAgentInformation + { + [JsonPropertyName("uuid")] + public string Uuid { get; set; } + + [JsonPropertyName("instanceId")] + public long InstanceId { get; set; } + + [JsonPropertyName("sender")] + public string Sender { get; set; } + + [JsonPropertyName("version")] + public Version Version { get; set; } + + [JsonPropertyName("deviceModelChangeTime")] + public DateTime DeviceModelChangeTime { get; set; } + + [JsonPropertyName("heartbeatInterval")] + public int HeartbeatInterval { get; set; } + + [JsonPropertyName("observationIntervals")] + public IEnumerable ObservationIntervals { get; set; } + + [JsonPropertyName("devices")] + public IEnumerable Devices { get; set; } + } +} diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs b/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs index 86e05909..3f3fd65a 100644 --- a/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs +++ b/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs @@ -19,12 +19,22 @@ namespace MTConnect.Mqtt public class MTConnectMqttBroker : IHostedService { private const int _retryInterval = 5000; + private const string _documentFormat = "JSON"; private readonly IMTConnectAgent _mtconnectAgent; private readonly MqttServer _mqttServer; + private readonly object _lock = new object(); private CancellationTokenSource _stop; - private IEnumerable _documentFormats = new List() { "JSON" }; + private readonly List _observationIntervals = new List(); + private readonly List _observationIntervalTimers = new List(); + private readonly Dictionary> _observationBuffers = new Dictionary>(); + + private readonly int _heartbeatInterval; + private readonly System.Timers.Timer _heartbeatTimer = new System.Timers.Timer(); + + + public int HeartbeatInterval => _heartbeatInterval; public MTConnectMqttFormat Format { get; set; } @@ -43,7 +53,7 @@ public class MTConnectMqttBroker : IHostedService public EventHandler PublishError { get; set; } - public MTConnectMqttBroker(IMTConnectAgent mtconnectAgent, MqttServer mqttServer) + public MTConnectMqttBroker(IMTConnectAgent mtconnectAgent, MqttServer mqttServer, IEnumerable observationIntervals = null, int heartbeatInterval = 1000) { _mtconnectAgent = mtconnectAgent; _mtconnectAgent.DeviceAdded += DeviceAdded; @@ -62,8 +72,20 @@ public MTConnectMqttBroker(IMTConnectAgent mtconnectAgent, MqttServer mqttServer { if (ClientDisconnected != null) ClientDisconnected.Invoke(this, new EventArgs()); }; + + // Set Observation Intervals + if (!observationIntervals.IsNullOrEmpty()) + { + _observationIntervals.AddRange(observationIntervals); + } + + // Set Heartbeat Timer + _heartbeatInterval = heartbeatInterval; + _heartbeatTimer.Interval = _heartbeatInterval; + _heartbeatTimer.Elapsed += HeartbeatTimerElapsed; } + public async Task StartAsync(CancellationToken cancellationToken) { _stop = new CancellationTokenSource(); @@ -71,12 +93,39 @@ public async Task StartAsync(CancellationToken cancellationToken) if (!_mqttServer.IsStarted) { _ = Task.Run(Worker, _stop.Token); + + _heartbeatTimer.Start(); + + if (!_observationIntervals.IsNullOrEmpty()) + { + var timerIntervals = _observationIntervals.Where(o => o > 0); + if (!timerIntervals.IsNullOrEmpty()) + { + foreach (var interval in timerIntervals) + { + var timer = new System.Timers.Timer(); + timer.Interval = interval; + timer.Elapsed += ObservationIntervalTimerElapsed; + lock (_lock) _observationIntervalTimers.Add(timer); + + timer.Start(); + } + } + } } } public async Task StopAsync(CancellationToken cancellationToken) { if (_stop != null) _stop.Cancel(); + if (_heartbeatTimer != null) _heartbeatTimer.Stop(); + + if (!_observationIntervalTimers.IsNullOrEmpty()) + { + foreach (var timer in _observationIntervalTimers) timer.Dispose(); + _observationIntervalTimers.Clear(); + } + if (_mqttServer != null) await _mqttServer.StopAsync(); } @@ -110,6 +159,7 @@ private async Task Worker() var observation = Observation.Create(observationOutput.DataItem); observation.DeviceUuid = observationOutput.DeviceUuid; observation.DataItem = observationOutput.DataItem; + observation.InstanceId = observationOutput.InstanceId; observation.Timestamp = observationOutput.Timestamp; observation.AddValues(observationOutput.Values); @@ -139,7 +189,6 @@ private async Task Worker() private async void DeviceAdded(object sender, IDevice device) { await PublishDevice(device); - await PublishAgent(_mtconnectAgent); } private async void ObservationAdded(object sender, IObservation observation) @@ -150,23 +199,19 @@ private async void ObservationAdded(object sender, IObservation observation) private async void AssetAdded(object sender, IAsset asset) { await PublishAsset(asset); - await PublishAgent(_mtconnectAgent); } private async Task PublishAgent(IMTConnectAgent agent) { - foreach (var documentFormat in _documentFormats) + var messages = MTConnectMqttMessage.Create(agent, _observationIntervals, _heartbeatInterval, RetainMessages); + if (!messages.IsNullOrEmpty()) { - var messages = MTConnectMqttMessage.Create(agent, RetainMessages); - if (!messages.IsNullOrEmpty()) + foreach (var message in messages) { - foreach (var message in messages) + if (message != null && message.Payload != null) { - if (message != null && message.Payload != null) - { - await Publish(message); - } + await Publish(message); } } } @@ -174,17 +219,14 @@ private async Task PublishAgent(IMTConnectAgent agent) private async Task PublishDevice(IDevice device) { - foreach (var documentFormat in _documentFormats) + var messages = MTConnectMqttMessage.Create(device, _mtconnectAgent.Uuid, _documentFormat, RetainMessages); + if (!messages.IsNullOrEmpty()) { - var messages = MTConnectMqttMessage.Create(device, documentFormat, RetainMessages); - if (!messages.IsNullOrEmpty()) + foreach (var message in messages) { - foreach (var message in messages) + if (message != null && message.Payload != null) { - if (message != null && message.Payload != null) - { - await Publish(message); - } + await Publish(message); } } } @@ -192,49 +234,81 @@ private async Task PublishDevice(IDevice device) private async Task PublishObservation(IObservation observation) { - foreach (var documentFormat in _documentFormats) + if (!_observationIntervals.IsNullOrEmpty()) { - if (observation.Category != Devices.DataItems.DataItemCategory.CONDITION) + foreach (var interval in _observationIntervals) { - var message = MTConnectMqttMessage.Create(observation, Format, documentFormat, RetainMessages); - if (message != null && message.Payload != null) await Publish(message); - } - else - { - var observations = _mtconnectAgent.GetCurrentObservations(observation.DeviceUuid); - if (!observations.IsNullOrEmpty()) + if (interval > 0) { - var dataItemObservations = observations.Where(o => o.DataItemId == observation.DataItemId); - if (!dataItemObservations.IsNullOrEmpty()) + var bufferKey = CreateBufferKey(observation.DeviceUuid, observation.DataItemId, interval); + if (!string.IsNullOrEmpty(bufferKey)) { - var x = new List(); - foreach (var dataItemObservation in dataItemObservations) + lock (_lock) { - var y = Observation.Create(dataItemObservation.DataItem); - y.DeviceUuid = dataItemObservation.DeviceUuid; - y.DataItem = dataItemObservation.DataItem; - y.Timestamp = dataItemObservation.Timestamp; - y.AddValues(dataItemObservation.Values); - x.Add(y); + _observationBuffers.TryGetValue(interval, out var buffer); + if (buffer == null) + { + buffer = new Dictionary(); + _observationBuffers.Add(interval, buffer); + } + + buffer.Remove(bufferKey); + buffer.Add(bufferKey, observation); } - - var message = MTConnectMqttMessage.Create(x, Format, documentFormat, RetainMessages); - if (message != null && message.Payload != null) await Publish(message); } } + else + { + await PublishObservation(observation, 0); + } } } + else + { + await PublishObservation(observation, 0); + } } - private async Task PublishAsset(IAsset asset) + private async Task PublishObservation(IObservation observation, int interval) { - foreach (var documentFormat in _documentFormats) + if (observation.Category != Devices.DataItems.DataItemCategory.CONDITION) + { + var message = MTConnectMqttMessage.Create(observation, Format, _documentFormat, RetainMessages, interval); + if (message != null && message.Payload != null) await Publish(message); + } + else { - var messages = MTConnectMqttMessage.Create(asset, documentFormat, RetainMessages); - await Publish(messages); + var observations = _mtconnectAgent.GetCurrentObservations(observation.DeviceUuid); + if (!observations.IsNullOrEmpty()) + { + var dataItemObservations = observations.Where(o => o.DataItemId == observation.DataItemId); + if (!dataItemObservations.IsNullOrEmpty()) + { + var x = new List(); + foreach (var dataItemObservation in dataItemObservations) + { + var y = Observation.Create(dataItemObservation.DataItem); + y.DeviceUuid = dataItemObservation.DeviceUuid; + y.DataItem = dataItemObservation.DataItem; + y.InstanceId = dataItemObservation.InstanceId; + y.Timestamp = dataItemObservation.Timestamp; + y.AddValues(dataItemObservation.Values); + x.Add(y); + } + + var message = MTConnectMqttMessage.Create(x, Format, _documentFormat, RetainMessages, interval); + if (message != null && message.Payload != null) await Publish(message); + } + } } } + private async Task PublishAsset(IAsset asset) + { + var messages = MTConnectMqttMessage.Create(asset, _documentFormat, RetainMessages); + await Publish(messages); + } + private async Task Publish(MqttApplicationMessage message) { @@ -262,5 +336,46 @@ private async Task Publish(IEnumerable messages) } } } + + + private async void HeartbeatTimerElapsed(object sender, System.Timers.ElapsedEventArgs e) + { + await Publish(MTConnectMqttMessage.CreateHeartbeat(_mtconnectAgent, UnixDateTime.Now)); + } + + private async void ObservationIntervalTimerElapsed(object sender, System.Timers.ElapsedEventArgs e) + { + if (sender != null) + { + var timer = (System.Timers.Timer)sender; + var interval = (int)timer.Interval; + + Dictionary buffer; + lock (_lock) + { + _observationBuffers.TryGetValue(interval, out buffer); + _observationBuffers.Remove(interval); + } + + if (!buffer.IsNullOrEmpty()) + { + foreach (var observation in buffer.Values) + { + await PublishObservation(observation, interval); + } + } + } + } + + + private static string CreateBufferKey(string deviceUuid, string dataItemId, int interval) + { + if (!string.IsNullOrEmpty(deviceUuid) && !string.IsNullOrEmpty(dataItemId) && interval > 0) + { + return $"{deviceUuid}::{dataItemId}::{interval}"; + } + + return null; + } } } \ No newline at end of file diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttConnectionStatus.cs b/src/MTConnect.NET-MQTT/MTConnectMqttConnectionStatus.cs new file mode 100644 index 00000000..f7489c5e --- /dev/null +++ b/src/MTConnect.NET-MQTT/MTConnectMqttConnectionStatus.cs @@ -0,0 +1,11 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +namespace MTConnect +{ + public enum MTConnectMqttConnectionStatus + { + Disconnected, + Connected + } +} diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttMessage.cs b/src/MTConnect.NET-MQTT/MTConnectMqttMessage.cs index f5a22f24..24ba5608 100644 --- a/src/MTConnect.NET-MQTT/MTConnectMqttMessage.cs +++ b/src/MTConnect.NET-MQTT/MTConnectMqttMessage.cs @@ -8,6 +8,7 @@ using MTConnect.Observations; using System.Collections.Generic; using System.Linq; +using System.Text.Json; namespace MTConnect.Mqtt { @@ -28,31 +29,119 @@ private static MqttApplicationMessage CreateMessage(string topic, string payload return null; } - public static IEnumerable Create(IMTConnectAgent agent, bool retain = false) + public static IEnumerable Create(IMTConnectAgent agent, IEnumerable observationIntervals, int heartbeat, bool retain = false) { if (agent != null) { var messages = new List(); - // UUID - var topic = $"MTConnect/Agents/{agent.Uuid}/UUID"; - messages.Add(CreateMessage(topic, agent.Uuid, retain)); + try + { + var information = new MTConnectMqttAgentInformation(); + information.Uuid = agent.Uuid; + information.InstanceId = agent.InstanceId; + information.Version = agent.Version; + information.Sender = agent.Sender; + information.DeviceModelChangeTime = agent.DeviceModelChangeTime; + information.HeartbeatInterval = heartbeat; + + // Set Observation Intervals + information.ObservationIntervals = observationIntervals; + + // Set Devices (list of Device UUID's associated with the Agent) + var devices = agent.GetDevices(); + if (!devices.IsNullOrEmpty()) + { + information.Devices = devices.Select(o => o.Uuid); + } + + var topic = $"MTConnect/Agents/{agent.Uuid}/Information"; + var json = JsonSerializer.Serialize(information, new JsonSerializerOptions { WriteIndented = true }); + + messages.Add(CreateMessage(topic, json, retain)); + } + catch { } + + + + //// UUID + //var topic = $"MTConnect/Agents/{agent.Uuid}/UUID"; + //messages.Add(CreateMessage(topic, agent.Uuid, retain)); + + //// InstanceId + //topic = $"MTConnect/Agents/{agent.Uuid}/InstanceId"; + //messages.Add(CreateMessage(topic, agent.InstanceId.ToString(), retain)); + + //// Agent Application Version + //topic = $"MTConnect/Agents/{agent.Uuid}/Version"; + //messages.Add(CreateMessage(topic, agent.Version.ToString(), retain)); + + //// Sender + //topic = $"MTConnect/Agents/{agent.Uuid}/Sender"; + //messages.Add(CreateMessage(topic, agent.Sender, retain)); + + //// DeviceModelChangeTime + //topic = $"MTConnect/Agents/{agent.Uuid}/DeviceModelChangeTime"; + //messages.Add(CreateMessage(topic, agent.DeviceModelChangeTime.ToString("o"), retain)); + + //// Heartbeat Interval + //topic = $"MTConnect/Agents/{agent.Uuid}/HeartbeatInterval"; + //messages.Add(CreateMessage(topic, heartbeat.ToString(), true)); + + return messages; + } + + return null; + } + + //public static IEnumerable Create(IMTConnectAgent agent, int heartbeat, bool retain = false) + //{ + // if (agent != null) + // { + // var messages = new List(); - // InstanceId - topic = $"MTConnect/Agents/{agent.Uuid}/InstanceId"; - messages.Add(CreateMessage(topic, agent.InstanceId.ToString(), retain)); + // // UUID + // var topic = $"MTConnect/Agents/{agent.Uuid}/UUID"; + // messages.Add(CreateMessage(topic, agent.Uuid, retain)); - // Agent Application Version - topic = $"MTConnect/Agents/{agent.Uuid}/Version"; - messages.Add(CreateMessage(topic, agent.Version.ToString(), retain)); + // // InstanceId + // topic = $"MTConnect/Agents/{agent.Uuid}/InstanceId"; + // messages.Add(CreateMessage(topic, agent.InstanceId.ToString(), retain)); - // Sender - topic = $"MTConnect/Agents/{agent.Uuid}/Sender"; - messages.Add(CreateMessage(topic, agent.Sender, retain)); + // // Agent Application Version + // topic = $"MTConnect/Agents/{agent.Uuid}/Version"; + // messages.Add(CreateMessage(topic, agent.Version.ToString(), retain)); - // DeviceModelChangeTime - topic = $"MTConnect/Agents/{agent.Uuid}/DeviceModelChangeTime"; - messages.Add(CreateMessage(topic, agent.DeviceModelChangeTime.ToString("o"), retain)); + // // Sender + // topic = $"MTConnect/Agents/{agent.Uuid}/Sender"; + // messages.Add(CreateMessage(topic, agent.Sender, retain)); + + // // DeviceModelChangeTime + // topic = $"MTConnect/Agents/{agent.Uuid}/DeviceModelChangeTime"; + // messages.Add(CreateMessage(topic, agent.DeviceModelChangeTime.ToString("o"), retain)); + + // // Heartbeat Interval + // topic = $"MTConnect/Agents/{agent.Uuid}/HeartbeatInterval"; + // messages.Add(CreateMessage(topic, heartbeat.ToString(), true)); + + // return messages; + // } + + // return null; + //} + + public static IEnumerable CreateHeartbeat(IMTConnectAgent agent, long timestamp) + { + if (agent != null) + { + var messages = new List(); + + // Conver to Milliseconds (from Ticks) + var ts = timestamp / 10000; + + // Heartbeat Timestamp + var topic = $"MTConnect/Agents/{agent.Uuid}/HeartbeatTimestamp"; + messages.Add(CreateMessage(topic, ts.ToString())); return messages; } @@ -60,14 +149,30 @@ public static IEnumerable Create(IMTConnectAgent agent, return null; } - public static IEnumerable Create(IDevice device, string documentFormatterId = DocumentFormat.XML, bool retain = false) + public static IEnumerable Create(IDevice device, string agentUuid, string documentFormatterId = DocumentFormat.XML, bool retain = false) { if (device != null && !string.IsNullOrEmpty(documentFormatterId)) { var messages = new List(); - var topic = $"MTConnect/Devices/{device.Uuid}/Device"; + // Create Agent UUID Message + var topic = $"MTConnect/Devices/{device.Uuid}/AgentUuid"; + messages.Add(CreateMessage(topic, agentUuid, true)); + + //// Create DeviceIndex UUID Message + //topic = $"MTConnect/DeviceIndex/{device.Uuid}"; + //messages.Add(CreateMessage(topic, device.Uuid, true)); + + //// Create DeviceIndex ID Message + //topic = $"MTConnect/DeviceIndex/{device.Id}"; + //messages.Add(CreateMessage(topic, device.Uuid, true)); + //// Create DeviceIndex Name Message + //topic = $"MTConnect/DeviceIndex/{device.Name}"; + //messages.Add(CreateMessage(topic, device.Uuid, true)); + + // Create Device Message + topic = $"MTConnect/Devices/{device.Uuid}/Device"; var payload = Formatters.EntityFormatter.Format(documentFormatterId, device); if (!string.IsNullOrEmpty(payload)) { @@ -99,15 +204,16 @@ public static IEnumerable Create(IAsset asset, string do return null; } - public static MqttApplicationMessage Create(IObservation observation, MTConnectMqttFormat format, string documentFormatterId = DocumentFormat.XML, bool retain = false) + public static MqttApplicationMessage Create(IObservation observation, MTConnectMqttFormat format, string documentFormatterId = DocumentFormat.XML, bool retain = false, int interval = 0) { if (observation != null && !string.IsNullOrEmpty(observation.DeviceUuid) && observation.DataItem != null && observation.DataItem.Container != null && !observation.Values.IsNullOrEmpty()) { - var topic = CreateTopic(observation, format); + var topic = CreateTopic(observation, format, interval); var formatOptions = new List> { - new KeyValuePair("categoryOutput", "true") + new KeyValuePair("categoryOutput", "true"), + new KeyValuePair("instanceIdOutput", "true") }; var payload = Formatters.EntityFormatter.Format(documentFormatterId, observation, formatOptions); @@ -120,7 +226,7 @@ public static MqttApplicationMessage Create(IObservation observation, MTConnectM return null; } - public static MqttApplicationMessage Create(IEnumerable observations, MTConnectMqttFormat format, string documentFormatterId = DocumentFormat.XML, bool retain = false) + public static MqttApplicationMessage Create(IEnumerable observations, MTConnectMqttFormat format, string documentFormatterId = DocumentFormat.XML, bool retain = false, int interval = 0) { if (!observations.IsNullOrEmpty()) { @@ -128,11 +234,12 @@ public static MqttApplicationMessage Create(IEnumerable observatio var firstObservation = observations.FirstOrDefault(); if (firstObservation != null) { - var topic = CreateTopic(firstObservation, format); + var topic = CreateTopic(firstObservation, format, interval); var formatOptions = new List> { - new KeyValuePair("categoryOutput", "true") + new KeyValuePair("categoryOutput", "true"), + new KeyValuePair("instanceIdOutput", "true") }; var payload = Formatters.EntityFormatter.Format(documentFormatterId, observations, formatOptions); @@ -147,7 +254,7 @@ public static MqttApplicationMessage Create(IEnumerable observatio } - private static string CreateTopic(IObservation observation, MTConnectMqttFormat format) + private static string CreateTopic(IObservation observation, MTConnectMqttFormat format, int interval = 0) { if (observation != null) { @@ -161,7 +268,16 @@ private static string CreateTopic(IObservation observation, MTConnectMqttFormat prefixes.Add("MTConnect"); prefixes.Add("Devices"); prefixes.Add(observation.DeviceUuid); - prefixes.Add("Observations"); + + if (interval > 0) + { + prefixes.Add($"Observations[{interval}]"); + } + else + { + prefixes.Add("Observations"); + } + var paths = new List(); diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs b/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs index fccbd0fa..21944a38 100644 --- a/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs +++ b/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs @@ -20,23 +20,37 @@ namespace MTConnect.Mqtt { public class MTConnectMqttRelay : IDisposable { + private const string _documentFormat = "JSON"; + + private readonly IMTConnectAgent _mtconnectAgent; - private readonly MTConnectMqttClientConfiguration _configuration; + private readonly IMTConnectMqttClientConfiguration _configuration; private readonly MqttFactory _mqttFactory; private readonly IMqttClient _mqttClient; - private readonly IEnumerable _documentFormats = new List() { "JSON" }; + private readonly object _lock = new object(); private CancellationTokenSource _stop; + private readonly List _observationIntervals = new List(); + private readonly List _observationIntervalTimers = new List(); + private readonly Dictionary> _observationBuffers = new Dictionary>(); + + private readonly int _heartbeatInterval; + private readonly System.Timers.Timer _heartbeatTimer = new System.Timers.Timer(); + public string Server => _configuration.Server; public int Port => _configuration.Port; + public int QoS => _configuration.QoS; + /// /// Gets or Sets the Interval in Milliseconds that the Client will attempt to reconnect if the connection fails /// public int RetryInterval => _configuration.RetryInterval; + public int HeartbeatInterval => _heartbeatInterval; + public MTConnectMqttFormat Format { get; set; } public string TopicPrefix { get; set; } @@ -56,34 +70,74 @@ public class MTConnectMqttRelay : IDisposable public EventHandler PublishError { get; set; } - public MTConnectMqttRelay(IMTConnectAgent mtconnectAgent, MTConnectMqttClientConfiguration configuration) + public MTConnectMqttRelay(IMTConnectAgent mtconnectAgent, IMTConnectMqttClientConfiguration configuration, IEnumerable observationIntervals = null, int heartbeatInterval = 1000) { _mtconnectAgent = mtconnectAgent; _mtconnectAgent.DeviceAdded += DeviceAdded; _mtconnectAgent.ObservationAdded += ObservationAdded; _mtconnectAgent.AssetAdded += AssetAdded; + // Set Configuration (MQTT client to external broker) _configuration = configuration; if (_configuration == null) _configuration = new MTConnectMqttClientConfiguration(); + // Set Observation Intervals + if (!observationIntervals.IsNullOrEmpty()) + { + _observationIntervals.AddRange(observationIntervals); + } + Format = MTConnectMqttFormat.Hierarchy; RetainMessages = true; _mqttFactory = new MqttFactory(); _mqttClient = _mqttFactory.CreateMqttClient(); + + // Set Heartbeat Timer + _heartbeatInterval = heartbeatInterval; + _heartbeatTimer.Interval = _heartbeatInterval; + _heartbeatTimer.Elapsed += HeartbeatTimerElapsed; } + public void Start() { _stop = new CancellationTokenSource(); _ = Task.Run(Worker, _stop.Token); + + _heartbeatTimer.Start(); + + if (!_observationIntervals.IsNullOrEmpty()) + { + var timerIntervals = _observationIntervals.Where(o => o > 0); + if (!timerIntervals.IsNullOrEmpty()) + { + foreach (var interval in timerIntervals) + { + var timer = new System.Timers.Timer(); + timer.Interval = interval; + timer.Elapsed += ObservationIntervalTimerElapsed; + lock (_lock) _observationIntervalTimers.Add(timer); + + timer.Start(); + } + } + } } public void Stop() { if (_stop != null) _stop.Cancel(); + if (_heartbeatTimer != null) _heartbeatTimer.Stop(); + + if (!_observationIntervalTimers.IsNullOrEmpty()) + { + foreach (var timer in _observationIntervalTimers) timer.Dispose(); + _observationIntervalTimers.Clear(); + } + try { if (_mqttClient != null) @@ -184,6 +238,7 @@ private async Task Worker() var observation = Observation.Create(observationOutput.DataItem); observation.DeviceUuid = observationOutput.DeviceUuid; observation.DataItem = observationOutput.DataItem; + observation.InstanceId = observationOutput.InstanceId; observation.Timestamp = observationOutput.Timestamp; observation.AddValues(observationOutput.Values); @@ -220,7 +275,6 @@ public void Dispose() private async void DeviceAdded(object sender, IDevice device) { await PublishDevice(device); - await PublishAgent(_mtconnectAgent); } private async void ObservationAdded(object sender, IObservation observation) @@ -231,23 +285,19 @@ private async void ObservationAdded(object sender, IObservation observation) private async void AssetAdded(object sender, IAsset asset) { await PublishAsset(asset); - await PublishAgent(_mtconnectAgent); } private async Task PublishAgent(IMTConnectAgent agent) { - foreach (var documentFormat in _documentFormats) + var messages = MTConnectMqttMessage.Create(agent, _observationIntervals, _heartbeatInterval, RetainMessages); + if (!messages.IsNullOrEmpty()) { - var messages = MTConnectMqttMessage.Create(agent, RetainMessages); - if (!messages.IsNullOrEmpty()) + foreach (var message in messages) { - foreach (var message in messages) + if (message != null && message.Payload != null) { - if (message != null && message.Payload != null) - { - await Publish(message); - } + await Publish(message); } } } @@ -255,17 +305,14 @@ private async Task PublishAgent(IMTConnectAgent agent) private async Task PublishDevice(IDevice device) { - foreach (var documentFormat in _documentFormats) + var messages = MTConnectMqttMessage.Create(device, _mtconnectAgent.Uuid, _documentFormat, RetainMessages); + if (!messages.IsNullOrEmpty()) { - var messages = MTConnectMqttMessage.Create(device, documentFormat, RetainMessages); - if (!messages.IsNullOrEmpty()) + foreach (var message in messages) { - foreach (var message in messages) + if (message != null && message.Payload != null) { - if (message != null && message.Payload != null) - { - await Publish(message); - } + await Publish(message); } } } @@ -273,49 +320,81 @@ private async Task PublishDevice(IDevice device) private async Task PublishObservation(IObservation observation) { - foreach (var documentFormat in _documentFormats) + if (!_observationIntervals.IsNullOrEmpty()) { - if (observation.Category != Devices.DataItems.DataItemCategory.CONDITION) + foreach (var interval in _observationIntervals) { - var message = MTConnectMqttMessage.Create(observation, Format, documentFormat, RetainMessages); - if (message != null && message.Payload != null) await Publish(message); - } - else - { - var observations = _mtconnectAgent.GetCurrentObservations(observation.DeviceUuid); - if (!observations.IsNullOrEmpty()) + if (interval > 0) { - var dataItemObservations = observations.Where(o => o.DataItemId == observation.DataItemId); - if (!dataItemObservations.IsNullOrEmpty()) + var bufferKey = CreateBufferKey(observation.DeviceUuid, observation.DataItemId, interval); + if (!string.IsNullOrEmpty(bufferKey)) { - var x = new List(); - foreach (var dataItemObservation in dataItemObservations) + lock (_lock) { - var y = Observation.Create(dataItemObservation.DataItem); - y.DeviceUuid = dataItemObservation.DeviceUuid; - y.DataItem = dataItemObservation.DataItem; - y.Timestamp = dataItemObservation.Timestamp; - y.AddValues(dataItemObservation.Values); - x.Add(y); + _observationBuffers.TryGetValue(interval, out var buffer); + if (buffer == null) + { + buffer = new Dictionary(); + _observationBuffers.Add(interval, buffer); + } + + buffer.Remove(bufferKey); + buffer.Add(bufferKey, observation); } - - var message = MTConnectMqttMessage.Create(x, Format, documentFormat, RetainMessages); - if (message != null && message.Payload != null) await Publish(message); } } + else + { + await PublishObservation(observation, 0); + } } } + else + { + await PublishObservation(observation, 0); + } } - private async Task PublishAsset(IAsset asset) + private async Task PublishObservation(IObservation observation, int interval) { - foreach (var documentFormat in _documentFormats) + if (observation.Category != Devices.DataItems.DataItemCategory.CONDITION) { - var messages = MTConnectMqttMessage.Create(asset, documentFormat, RetainMessages); - await Publish(messages); + var message = MTConnectMqttMessage.Create(observation, Format, _documentFormat, RetainMessages, interval); + if (message != null && message.Payload != null) await Publish(message); + } + else + { + var observations = _mtconnectAgent.GetCurrentObservations(observation.DeviceUuid); + if (!observations.IsNullOrEmpty()) + { + var dataItemObservations = observations.Where(o => o.DataItemId == observation.DataItemId); + if (!dataItemObservations.IsNullOrEmpty()) + { + var x = new List(); + foreach (var dataItemObservation in dataItemObservations) + { + var y = Observation.Create(dataItemObservation.DataItem); + y.DeviceUuid = dataItemObservation.DeviceUuid; + y.DataItem = dataItemObservation.DataItem; + y.InstanceId = dataItemObservation.InstanceId; + y.Timestamp = dataItemObservation.Timestamp; + y.AddValues(dataItemObservation.Values); + x.Add(y); + } + + var message = MTConnectMqttMessage.Create(x, Format, _documentFormat, RetainMessages, interval); + if (message != null && message.Payload != null) await Publish(message); + } + } } } + private async Task PublishAsset(IAsset asset) + { + var messages = MTConnectMqttMessage.Create(asset, _documentFormat, RetainMessages); + await Publish(messages); + } + private async Task Publish(MqttApplicationMessage message) { @@ -326,6 +405,9 @@ private async Task Publish(MqttApplicationMessage message) // Set the Topic Prefix if (!string.IsNullOrEmpty(TopicPrefix)) message.Topic = $"{TopicPrefix}/{message.Topic}"; + // Set QoS for Message + message.QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)QoS; + await _mqttClient.PublishAsync(message); } } @@ -346,6 +428,9 @@ private async Task Publish(IEnumerable messages) // Set the Topic Prefix if (!string.IsNullOrEmpty(TopicPrefix)) message.Topic = $"{TopicPrefix}/{message.Topic}"; + // Set QoS for Message + message.QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)QoS; + await _mqttClient.PublishAsync(message); } } @@ -357,6 +442,47 @@ private async Task Publish(IEnumerable messages) } + private async void HeartbeatTimerElapsed(object sender, System.Timers.ElapsedEventArgs e) + { + await Publish(MTConnectMqttMessage.CreateHeartbeat(_mtconnectAgent, UnixDateTime.Now)); + } + + private async void ObservationIntervalTimerElapsed(object sender, System.Timers.ElapsedEventArgs e) + { + if (sender != null) + { + var timer = (System.Timers.Timer)sender; + var interval = (int)timer.Interval; + + Dictionary buffer; + lock (_lock) + { + _observationBuffers.TryGetValue(interval, out buffer); + _observationBuffers.Remove(interval); + } + + if (!buffer.IsNullOrEmpty()) + { + foreach (var observation in buffer.Values) + { + await PublishObservation(observation, interval); + } + } + } + } + + + + private static string CreateBufferKey(string deviceUuid, string dataItemId, int interval) + { + if (!string.IsNullOrEmpty(deviceUuid) && !string.IsNullOrEmpty(dataItemId) && interval > 0) + { + return $"{deviceUuid}::{dataItemId}::{interval}"; + } + + return null; + } + private static string GetFilePath(string path) { var x = path; diff --git a/src/MTConnect.NET.sln b/src/MTConnect.NET.sln index 52c79dd5..d6e086aa 100644 --- a/src/MTConnect.NET.sln +++ b/src/MTConnect.NET.sln @@ -88,10 +88,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MTConnect.NET-Applications- EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{0957974A-CA4D-45DD-A0CB-2C4A9CD03350}" ProjectSection(SolutionItems) = preProject + ..\docs\MQTT-AWS-Greengrass-Moquette.md = ..\docs\MQTT-AWS-Greengrass-Moquette.md ..\docs\MQTT-AWS-Greengrass-Mqtt-Bridge.md = ..\docs\MQTT-AWS-Greengrass-Mqtt-Bridge.md ..\docs\MQTT-AWS-IoT.md = ..\docs\MQTT-AWS-IoT.md - ..\docs\MQTT-AWS-Greengrass-Moquette.md = ..\docs\MQTT-AWS-Greengrass-Moquette.md ..\docs\MQTT-HiveMQ.md = ..\docs\MQTT-HiveMQ.md + ..\docs\MQTT-Protocol.md = ..\docs\MQTT-Protocol.md ..\README.md = ..\README.md EndProjectSection EndProject