From a08a551fc277ec43ab14fbdcb3561b70a090691a Mon Sep 17 00:00:00 2001 From: Patrick Ritchie Date: Wed, 1 Nov 2023 03:00:45 -0400 Subject: [PATCH] Update to MQTT Client --- .../CppAgentMqttClient.cs | 84 +++ .../MTConnect.NET-Client-Example-01.csproj | 3 + .../Devices/IContainer.cs | 13 +- .../Devices/JsonDeviceContainer.cs | 52 ++ .../Formatters/JsonEntityFormatter.cs | 361 +++++++++ .../Clients/MTConnectMqtt2Client.cs | 701 ++++++++++++++++++ src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs | 2 +- src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs | 2 +- 8 files changed, 1212 insertions(+), 6 deletions(-) create mode 100644 applications/Clients/MTConnect.NET-Client-Example-01/CppAgentMqttClient.cs create mode 100644 src/MTConnect.NET-JSON-cppagent/Devices/JsonDeviceContainer.cs create mode 100644 src/MTConnect.NET-JSON-cppagent/Formatters/JsonEntityFormatter.cs create mode 100644 src/MTConnect.NET-MQTT/Clients/MTConnectMqtt2Client.cs diff --git a/applications/Clients/MTConnect.NET-Client-Example-01/CppAgentMqttClient.cs b/applications/Clients/MTConnect.NET-Client-Example-01/CppAgentMqttClient.cs new file mode 100644 index 00000000..f6dfd271 --- /dev/null +++ b/applications/Clients/MTConnect.NET-Client-Example-01/CppAgentMqttClient.cs @@ -0,0 +1,84 @@ +using MTConnect.Clients; +using MTConnect.Observations; + +var agentUrl = "localhost"; + +var client = new MTConnectMqtt2Client(agentUrl); +//client.Interval = 100; +//client.Heartbeat = 10000; +//client.ContentType = "application/json"; +//client.DocumentFormat = "json-cppagent"; +client.DeviceReceived += (sender, device) => +{ + Console.WriteLine($"Device Received : {device.Uuid}"); +}; +client.ObservationReceived += (sender, observation) => +{ + switch (observation.Representation) + { + case MTConnect.Devices.DataItemRepresentation.VALUE: + Console.WriteLine($"Observation Received : {observation.DataItemId} = {observation.GetValue("Result")} @ {observation.Timestamp}"); + break; + + case MTConnect.Devices.DataItemRepresentation.DATA_SET: + if (!observation.IsUnavailable) + { + var entries = DataSetObservation.GetEntries(observation.Values); + foreach (var entry in entries) + { + Console.WriteLine($"Observation Received : {observation.DataItemId} : DATA_SET : {entry.Key} = {entry.Value} @ {observation.Timestamp}"); + } + } + else + { + Console.WriteLine($"Observation Received : {observation.DataItemId} = {observation.GetValue("Result")} @ {observation.Timestamp}"); + } + break; + + case MTConnect.Devices.DataItemRepresentation.TABLE: + if (!observation.IsUnavailable) + { + var entries = TableObservation.GetEntries(observation.Values); + foreach (var entry in entries) + { + foreach (var cell in entry.Cells) + { + Console.WriteLine($"Observation Received : {observation.DataItemId} : TABLE : {entry.Key} : {cell.Key} = {cell.Value} @ {observation.Timestamp}"); + } + } + } + else + { + Console.WriteLine($"Observation Received : {observation.DataItemId} = {observation.GetValue("Result")} @ {observation.Timestamp}"); + } + break; + + case MTConnect.Devices.DataItemRepresentation.TIME_SERIES: + if (!observation.IsUnavailable) + { + var samples = TimeSeriesObservation.GetSamples(observation.Values).ToList(); + Console.WriteLine($"Observation Received : {observation.DataItemId} : TIME_SERIES : {string.Join(" ", samples)} @ {observation.Timestamp}"); + } + else + { + Console.WriteLine($"Observation Received : {observation.DataItemId} = {observation.GetValue("Result")} @ {observation.Timestamp}"); + } + break; + } +}; +client.AssetReceived += (sender, asset) => +{ + Console.WriteLine($"Asset Received : {asset.AssetId}"); +}; +client.ClientStarted += (sender, asset) => +{ + Console.WriteLine($"Client Started."); +}; +client.ClientStopped += (sender, asset) => +{ + Console.WriteLine($"Client Stopped."); +}; + +client.Start(); + +Console.ReadLine(); diff --git a/applications/Clients/MTConnect.NET-Client-Example-01/MTConnect.NET-Client-Example-01.csproj b/applications/Clients/MTConnect.NET-Client-Example-01/MTConnect.NET-Client-Example-01.csproj index a1de218c..7c21f523 100644 --- a/applications/Clients/MTConnect.NET-Client-Example-01/MTConnect.NET-Client-Example-01.csproj +++ b/applications/Clients/MTConnect.NET-Client-Example-01/MTConnect.NET-Client-Example-01.csproj @@ -11,6 +11,7 @@ + @@ -19,6 +20,7 @@ + @@ -27,6 +29,7 @@ + diff --git a/src/MTConnect.NET-Common/Devices/IContainer.cs b/src/MTConnect.NET-Common/Devices/IContainer.cs index 92054355..ac714dd2 100644 --- a/src/MTConnect.NET-Common/Devices/IContainer.cs +++ b/src/MTConnect.NET-Common/Devices/IContainer.cs @@ -24,11 +24,16 @@ public partial interface IContainer : IMTConnectEntity /// string Hash { get; } + /// + /// + /// + string Type { get; } - /// - /// The text description that describes what the Component Type represents - /// - string TypeDescription { get; } + + /// + /// The text description that describes what the Component Type represents + /// + string TypeDescription { get; } /// diff --git a/src/MTConnect.NET-JSON-cppagent/Devices/JsonDeviceContainer.cs b/src/MTConnect.NET-JSON-cppagent/Devices/JsonDeviceContainer.cs new file mode 100644 index 00000000..44670b7b --- /dev/null +++ b/src/MTConnect.NET-JSON-cppagent/Devices/JsonDeviceContainer.cs @@ -0,0 +1,52 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Text.Json.Serialization; + +namespace MTConnect.Devices.Json +{ + public class JsonDeviceContainer + { + [JsonPropertyName("Agent")] + public JsonDevice Agent { get; set; } + + [JsonPropertyName("Device")] + public JsonDevice Device { get; set; } + + + public JsonDeviceContainer() { } + + public JsonDeviceContainer(IDevice device) + { + if (device != null) + { + switch (device.Type) + { + case Devices.Agent.TypeId: + Agent = new JsonDevice(device); + break; + + default: + Device = new JsonDevice(device); + break; + } + } + } + + + public IDevice ToDevice() + { + if (Agent != null) + { + return Agent.ToDevice(); + } + + if (Device != null) + { + return Device.ToDevice(); + } + + return null; + } + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-JSON-cppagent/Formatters/JsonEntityFormatter.cs b/src/MTConnect.NET-JSON-cppagent/Formatters/JsonEntityFormatter.cs new file mode 100644 index 00000000..1c527a03 --- /dev/null +++ b/src/MTConnect.NET-JSON-cppagent/Formatters/JsonEntityFormatter.cs @@ -0,0 +1,361 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using MTConnect.Assets; +using MTConnect.Assets.CuttingTools; +using MTConnect.Assets.Json.CuttingTools; +using MTConnect.Assets.Files; +using MTConnect.Assets.Json.Files; +using MTConnect.Assets.Json.QIF; +using MTConnect.Assets.Json.RawMaterials; +using MTConnect.Assets.QIF; +using MTConnect.Assets.RawMaterials; +using MTConnect.Devices; +using MTConnect.Devices.Json; +using MTConnect.Observations; +using MTConnect.Streams.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; + +namespace MTConnect.Formatters +{ + public class JsonEntityFormatter : IEntityFormatter + { + public string Id => "JSON-cppagent"; + + public string ContentType => "application/json"; + + + public string Format(IDevice device, IEnumerable> options = null) + { + if (device != null) + { + var indentOuput = GetFormatterOption(options, "indentOutput"); + + return new JsonDevice(device).ToString(indentOuput); + } + + return null; + } + + public string Format(IComponent component, IEnumerable> options = null) + { + if (component != null) + { + return new JsonComponent(component).ToString(); + } + + return null; + } + + public string Format(IComposition composition, IEnumerable> options = null) + { + if (composition != null) + { + return new JsonComposition(composition).ToString(); + } + + return null; + } + + public string Format(IDataItem dataItem, IEnumerable> options = null) + { + if (dataItem != null) + { + var indentOuput = GetFormatterOption(options, "indentOutput"); + + return JsonFunctions.Convert(new JsonDataItem(dataItem), indented:indentOuput); + } + + return null; + } + + public string Format(IObservation observation, IEnumerable> options = null) + { + if (observation != null) + { + // Get Option for 'Category' output + var categoryOutput = GetFormatterOption(options, "categoryOutput"); + + // Get Option for 'InstanceId' output + var instanceIdOutput = GetFormatterOption(options, "instanceIdOutput"); + + //switch (observation.Category) + //{ + // // Sample + // case Devices.DataItemCategory.SAMPLE: + // var sampleObservation = SampleObservation.Create(observation); + // if (sampleObservation != null) + // { + // return JsonFunctions.Convert(new JsonSample(sampleObservation, categoryOutput, instanceIdOutput)); + // } + // break; + + // // Event + // case Devices.DataItemCategory.EVENT: + // var eventObservation = EventObservation.Create(observation); + // if (eventObservation != null) + // { + // return JsonFunctions.Convert(new JsonEvent(eventObservation, categoryOutput, instanceIdOutput)); + // } + // break; + + // // Condition + // case Devices.DataItemCategory.CONDITION: + // var conditionObservation = ConditionObservation.Create(observation); + // if (conditionObservation != null) + // { + // return JsonFunctions.Convert(new JsonCondition(conditionObservation, categoryOutput, instanceIdOutput)); + // } + // break; + //} + } + + return null; + } + + public string Format(IEnumerable observations, IEnumerable> options = null) + { + if (!observations.IsNullOrEmpty()) + { + // Get Option for 'Category' output + var categoryOutput = GetFormatterOption(options, "categoryOutput"); + + // Get Option for 'InstanceId' output + var instanceIdOutput = GetFormatterOption(options, "instanceIdOutput"); + + var x = new List(); + + //foreach (var observation in observations) + //{ + // switch (observation.Category) + // { + // // Sample + // case Devices.DataItemCategory.SAMPLE: + // var sampleObservation = SampleObservation.Create(observation); + // if (sampleObservation != null) + // { + // x.Add(new JsonSample(sampleObservation, categoryOutput, instanceIdOutput)); + // } + // break; + + // // Event + // case Devices.DataItemCategory.EVENT: + // var eventObservation = EventObservation.Create(observation); + // if (eventObservation != null) + // { + // x.Add(new JsonEvent(eventObservation, categoryOutput, instanceIdOutput)); + // } + // break; + + // // Condition + // case Devices.DataItemCategory.CONDITION: + // var conditionObservation = ConditionObservation.Create(observation); + // if (conditionObservation != null) + // { + // x.Add(new JsonCondition(conditionObservation, categoryOutput, instanceIdOutput)); + // } + // break; + // } + + //} + + return JsonFunctions.Convert(x); + } + + return null; + } + + public string Format(IAsset asset, IEnumerable> options = null) + { + if (asset != null) + { + switch (asset.Type) + { + case "CuttingTool": return JsonFunctions.Convert(new JsonCuttingToolAsset(asset as CuttingToolAsset)); + case "File": return JsonFunctions.Convert(new JsonFileAsset(asset as FileAsset)); + case "QIFDocumentWrapper": return JsonFunctions.Convert(new JsonQIFDocumentWrapperAsset(asset as QIFDocumentWrapperAsset)); + case "RawMaterial": return JsonFunctions.Convert(new JsonRawMaterialAsset(asset as RawMaterialAsset)); + + default: return JsonFunctions.Convert(asset); + } + } + + return null; + } + + + public FormattedEntityReadResult CreateDevice(byte[] content, IEnumerable> options = null) + { + var messages = new List(); + var warnings = new List(); + var errors = new List(); + + try + { + var jsonEntity = JsonSerializer.Deserialize(content); + var entity = jsonEntity.ToDevice(); + var success = entity != null; + + return new FormattedEntityReadResult(entity, success, messages, warnings, errors); + } + catch { } + + return new FormattedEntityReadResult(); + } + + public FormattedEntityReadResult CreateComponent(byte[] content, IEnumerable> options = null) + { + var messages = new List(); + var warnings = new List(); + var errors = new List(); + + try + { + //var jsonEntity = JsonSerializer.Deserialize(content); + //var entity = jsonEntity.ToComponent(); + //var success = entity != null; + + //return new FormattedEntityReadResult(entity, success, messages, warnings, errors); + } + catch { } + + return new FormattedEntityReadResult(); + } + + public FormattedEntityReadResult CreateComposition(byte[] content, IEnumerable> options = null) + { + var messages = new List(); + var warnings = new List(); + var errors = new List(); + + try + { + var jsonEntity = JsonSerializer.Deserialize(content); + var entity = jsonEntity.ToComposition(); + var success = entity != null; + + return new FormattedEntityReadResult(entity, success, messages, warnings, errors); + } + catch { } + + return new FormattedEntityReadResult(); + } + + public FormattedEntityReadResult CreateDataItem(byte[] content, IEnumerable> options = null) + { + var messages = new List(); + var warnings = new List(); + var errors = new List(); + + try + { + var jsonEntity = JsonSerializer.Deserialize(content); + var entity = jsonEntity.ToDataItem(); + var success = entity != null; + + return new FormattedEntityReadResult(entity, success, messages, warnings, errors); + } + catch { } + + return new FormattedEntityReadResult(); + } + + public FormattedEntityReadResult CreateAsset(string assetType, byte[] content, IEnumerable> options = null) + { + var messages = new List(); + var warnings = new List(); + var errors = new List(); + + IAsset asset = null; + + // Read Document + if (!string.IsNullOrEmpty(assetType) && content != null) + { + try + { + // Convert from UTF8 bytes + var json = Encoding.UTF8.GetString(content); + if (!string.IsNullOrEmpty(json)) + { + switch (assetType) + { + case "CuttingTool": + asset = JsonSerializer.Deserialize(json).ToCuttingToolAsset(); + break; + + case "File": + asset = JsonSerializer.Deserialize(json).ToFileAsset(); + break; + + case "QIFDocumentWrapper": + asset = JsonSerializer.Deserialize(json).ToQIFDocumentWrapperAsset(); + break; + + case "RawMaterial": + asset = JsonSerializer.Deserialize(json).ToRawMaterialAsset(); + break; + } + } + } + catch (Exception ex) + { + errors.Add(ex.Message); + } + } + + var success = asset != null; + + return new FormattedEntityReadResult(asset, success, messages, warnings, errors); + } + + + private static T GetFormatterOption(IEnumerable> options, string key) + { + if (!options.IsNullOrEmpty()) + { + var x = options.FirstOrDefault(o => o.Key == key).Value; + if (!string.IsNullOrEmpty(x)) + { + try + { + return (T)Convert.ChangeType(x, typeof(T)); + } + catch { } + } + } + + return default; + } + + private static IEnumerable GetFormatterOptions(IEnumerable> options, string key) + { + var l = new List(); + + if (!options.IsNullOrEmpty()) + { + var x = options.Where(o => o.Key == key); + if (!x.IsNullOrEmpty()) + { + foreach (var y in x) + { + if (!string.IsNullOrEmpty(y.Value)) + { + try + { + var obj = (T)Convert.ChangeType(y.Value, typeof(T)); + l.Add(obj); + } + catch { } + } + } + } + } + + return l; + } + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-MQTT/Clients/MTConnectMqtt2Client.cs b/src/MTConnect.NET-MQTT/Clients/MTConnectMqtt2Client.cs new file mode 100644 index 00000000..3eb121da --- /dev/null +++ b/src/MTConnect.NET-MQTT/Clients/MTConnectMqtt2Client.cs @@ -0,0 +1,701 @@ +// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using MQTTnet; +using MQTTnet.Client; +using MTConnect.Assets; +using MTConnect.Devices; +using MTConnect.Formatters; +using MTConnect.Mqtt; +using MTConnect.Observations; +using MTConnect.Streams; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace MTConnect.Clients +{ + public class MTConnectMqtt2Client : IMTConnectEntityClient, IDisposable + { + private const string _defaultTopicPrefix = "MTConnect"; + private const string _defaultDocumentFormat = "json-cppagent"; + + private const string _defaultProbeTopicPrefix = "Probe"; + private const string _defaultCurrentTopicPrefix = "Current"; + private const string _defaultSampleTopicPrefix = "Sample"; + private const string _defaultAssetTopicPrefix = "Asset"; + + private readonly MqttFactory _mqttFactory; + private readonly IMqttClient _mqttClient; + private readonly string _server; + private readonly int _port; + private readonly int _qos; + private readonly int _interval; + private readonly string _deviceUuid; + private readonly string _topicPrefix; + private readonly string _documentFormat; + private readonly string _username; + private readonly string _password; + private readonly string _clientId; + private readonly string _caCertPath; + private readonly string _pemClientCertPath; + private readonly string _pemPrivateKeyPath; + private readonly bool _allowUntrustedCertificates; + private readonly bool _useTls; + private readonly Dictionary _agents = new Dictionary(); // AgentUuid > AgentInformation + private readonly Dictionary _deviceAgentUuids = new Dictionary(); // DeviceUuid > AgentUuid + private readonly Dictionary _agentInstanceIds = new Dictionary(); // AgentUuid > InstanceId + private readonly Dictionary _agentHeartbeatTimestamps = new Dictionary(); // AgentUuid > Last Heartbeat received (Unix milliseconds) + private readonly Dictionary _connectionTimers = new Dictionary(); + private readonly Dictionary _devices = new Dictionary(); + private readonly Dictionary _deviceLastSequence = new Dictionary(); + private readonly Dictionary _deviceLastCurrentSequence = new Dictionary(); + private readonly object _lock = new object(); + + + private CancellationTokenSource _stop; + private MTConnectMqttConnectionStatus _connectionStatus; + //private long _lastInstanceId; + //private long _lastCurrentSequence; + //private long _lastSequence; + private long _lastResponse; + + + public delegate void MTConnectMqttEventHandler(string deviceUuid, T item); + + + /// + /// Gets or Sets the Interval in Milliseconds that the Client will attempt to reconnect if the connection fails + /// + public int ReconnectionInterval { get; set; } + + public string Server => _server; + + public int Port => _port; + + public int QoS => _qos; + + public int Interval => _interval; + + public string TopicPrefix => _topicPrefix; + + ///// + ///// Gets the Last Instance ID read from the MTConnect Agent + ///// + //public long LastInstanceId => _lastInstanceId; + + ///// + ///// Gets the Last Sequence read from the MTConnect Agent + ///// + //public long LastSequence => _lastSequence; + + /// + /// Gets the Unix Timestamp (in Milliseconds) since the last response from the MTConnect Agent + /// + public long LastResponse => _lastResponse; + + public MTConnectMqttConnectionStatus ConnectionStatus => _connectionStatus; + + public event EventHandler Connected; + + public event EventHandler Disconnected; + + public event EventHandler ConnectionStatusChanged; + + public event EventHandler ConnectionError; + + /// + /// Raised when an Internal Error occurs + /// + public event EventHandler InternalError; + + public event EventHandler DeviceReceived; + + public event EventHandler ObservationReceived; + + public event EventHandler AssetReceived; + + /// + /// Raised when an MTConnectDevices Document is received + /// + public event EventHandler ProbeReceived; + + /// + /// Raised when an MTConnectSreams Document is received from a Current Request + /// + public event EventHandler CurrentReceived; + + /// + /// Raised when an MTConnectSreams Document is received from the Samples Stream + /// + public event EventHandler SampleReceived; + + /// + /// Raised when an MTConnectAssets Document is received + /// + public event EventHandler AssetsReceived; + + /// + /// Raised when any Response from the Client is received + /// + public event EventHandler ResponseReceived; + + /// + /// Raised when the Client is Starting + /// + public event EventHandler ClientStarting; + + /// + /// Raised when the Client is Started + /// + public event EventHandler ClientStarted; + + /// + /// Raised when the Client is Stopping + /// + public event EventHandler ClientStopping; + + /// + /// Raised when the Client is Stopeed + /// + public event EventHandler ClientStopped; + + + public MTConnectMqtt2Client(string server, int port = 1883, string deviceUuid = null, string topicPrefix = _defaultTopicPrefix, string documentFormat = _defaultDocumentFormat, int qos = 1) + { + ReconnectionInterval = 10000; + + _server = server; + _port = port; + _deviceUuid = deviceUuid; + _topicPrefix = topicPrefix; + _documentFormat = documentFormat; + _qos = qos; + + _mqttFactory = new MqttFactory(); + _mqttClient = _mqttFactory.CreateMqttClient(); + _mqttClient.ApplicationMessageReceivedAsync += MessageReceived; + } + + + public void Start() + { + _stop = new CancellationTokenSource(); + + ClientStarting?.Invoke(this, new EventArgs()); + + _ = Task.Run(Worker, _stop.Token); + } + + public void Stop() + { + ClientStopping?.Invoke(this, new EventArgs()); + + if (_stop != null) _stop.Cancel(); + } + + public void Dispose() + { + if (_mqttClient != null) _mqttClient.Dispose(); + } + + + private async Task Worker() + { + do + { + try + { + try + { + // Declare new MQTT Client Options with Tcp Server + var clientOptionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_server, _port); + + // Set Client ID + if (!string.IsNullOrEmpty(_clientId)) + { + clientOptionsBuilder.WithClientId(_clientId); + } + + var certificates = new List(); + + // Add CA (Certificate Authority) + if (!string.IsNullOrEmpty(_caCertPath)) + { + certificates.Add(new X509Certificate2(GetFilePath(_caCertPath))); + } + + // Add Client Certificate & Private Key + if (!string.IsNullOrEmpty(_pemClientCertPath) && !string.IsNullOrEmpty(_pemPrivateKeyPath)) + { + +#if NET5_0_OR_GREATER + certificates.Add(new X509Certificate2(X509Certificate2.CreateFromPemFile(GetFilePath(_pemClientCertPath), GetFilePath(_pemPrivateKeyPath)).Export(X509ContentType.Pfx))); +#else + throw new Exception("PEM Certificates Not Supported in .NET Framework 4.8 or older"); +#endif + + clientOptionsBuilder.WithTls(new MqttClientOptionsBuilderTlsParameters() + { + UseTls = true, + SslProtocol = System.Security.Authentication.SslProtocols.Tls12, + IgnoreCertificateRevocationErrors = _allowUntrustedCertificates, + IgnoreCertificateChainErrors = _allowUntrustedCertificates, + AllowUntrustedCertificates = _allowUntrustedCertificates, + Certificates = certificates + }); + } + + // Add Credentials + if (!string.IsNullOrEmpty(_username) && !string.IsNullOrEmpty(_password)) + { + if (_useTls) + { + clientOptionsBuilder.WithCredentials(_username, _password).WithTls(); + } + else + { + clientOptionsBuilder.WithCredentials(_username, _password); + } + } + + // Build MQTT Client Options + var clientOptions = clientOptionsBuilder.Build(); + + // Connect to the MQTT Client + _mqttClient.ConnectAsync(clientOptions).Wait(); + + if (!string.IsNullOrEmpty(_deviceUuid)) + { + // Start protocol for a single Device + StartDeviceProtocol(_deviceUuid).Wait(); + } + else + { + // Start protocol for all devices + StartAllDevicesProtocol().Wait(); + } + + ClientStarted?.Invoke(this, new EventArgs()); + + while (_mqttClient.IsConnected && !_stop.IsCancellationRequested) + { + await Task.Delay(100); + } + } + catch (Exception ex) + { + if (ConnectionError != null) ConnectionError.Invoke(this, ex); + } + + await Task.Delay(ReconnectionInterval, _stop.Token); + } + catch (TaskCanceledException) { } + catch (Exception ex) + { + InternalError?.Invoke(this, ex); + } + + } while (!_stop.Token.IsCancellationRequested); + + + try + { + // Disconnect from the MQTT Client + if (_mqttClient != null) _mqttClient.DisconnectAsync(MqttClientDisconnectReason.NormalDisconnection).Wait(); + } + catch { } + + + ClientStopped?.Invoke(this, new EventArgs()); + } + + + private async Task StartAllDevicesProtocol() + { + await _mqttClient.SubscribeAsync("MTConnect/Probe/#"); + await _mqttClient.SubscribeAsync("MTConnect/Current/#"); + await _mqttClient.SubscribeAsync("MTConnect/Sample/#"); + await _mqttClient.SubscribeAsync("MTConnect/Asset/#"); + } + + private async Task StartDeviceProtocol(string deviceUuid) + { + await _mqttClient.SubscribeAsync($"MTConnect/Probe/{deviceUuid}"); + await _mqttClient.SubscribeAsync($"MTConnect/Current/{deviceUuid}"); + await _mqttClient.SubscribeAsync($"MTConnect/Sample/{deviceUuid}"); + await _mqttClient.SubscribeAsync($"MTConnect/Asset/{deviceUuid}"); + } + + + private async Task MessageReceived(MqttApplicationMessageReceivedEventArgs args) + { + if (args.ApplicationMessage.Payload != null && args.ApplicationMessage.Payload.Length > 0) + { + var topic = args.ApplicationMessage.Topic; + + if (IsSampleTopic(topic)) + { + ProcessSampleMessage(args.ApplicationMessage); + } + else if (IsCurrentTopic(topic)) + { + ProcessCurrentMessage(args.ApplicationMessage); + } + else if (IsProbeTopic(topic)) + { + ProcessProbeMessage(args.ApplicationMessage); + } + } + } + + + private bool IsProbeTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_topicPrefix}/{_defaultProbeTopicPrefix}/"; + return topic.StartsWith(prefix); + } + + return false; + } + + private bool IsCurrentTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_topicPrefix}/{_defaultCurrentTopicPrefix}/"; + return topic.StartsWith(prefix); + } + + return false; + } + + private bool IsSampleTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_topicPrefix}/{_defaultSampleTopicPrefix}/"; + return topic.StartsWith(prefix); + } + + return false; + } + + private bool IsAssetTopic(string topic) + { + if (topic != null) + { + var prefix = $"{_topicPrefix}/{_defaultAssetTopicPrefix}/"; + return topic.StartsWith(prefix); + } + + return false; + } + + + private void ProcessProbeMessage(MqttApplicationMessage message) + { + var result = EntityFormatter.CreateDevice(_documentFormat, message.Payload); + if (result.Success) + { + var device = result.Entity; + if (device != null && device.Uuid != null) + { + // Add to cached list + lock (_lock) + { + _devices.Remove(device.Uuid); + _devices.Add(device.Uuid, device); + } + + DeviceReceived?.Invoke(this, device); + } + } + } + + private void ProcessCurrentMessage(MqttApplicationMessage message) + { + var result = ResponseDocumentFormatter.CreateStreamsResponseDocument(_documentFormat, message.Payload); + if (result.Success) + { + ProcessCurrentDocument(result.Document); + } + } + + private void ProcessSampleMessage(MqttApplicationMessage message) + { + var result = ResponseDocumentFormatter.CreateStreamsResponseDocument(_documentFormat, message.Payload); + if (result.Success) + { + ProcessSampleDocument(result.Document); + } + } + + private void ProcessCurrentDocument(IStreamsResponseDocument document) + { + _lastResponse = UnixDateTime.Now; + ResponseReceived?.Invoke(this, new EventArgs()); + + if (document != null) + { + if (!document.Streams.IsNullOrEmpty()) + { + IDeviceStream deviceStream = null; + + // Get the DeviceStream for the Device or default to the first + if (!string.IsNullOrEmpty(_deviceUuid)) deviceStream = document.Streams.FirstOrDefault(o => o.Uuid == _deviceUuid); + else deviceStream = document.Streams.FirstOrDefault(); + + var observations = deviceStream.Observations; + if (deviceStream != null && deviceStream.Uuid != null && !observations.IsNullOrEmpty()) + { + long lastSequence; + lock (_lock) lastSequence = _deviceLastSequence.GetValueOrDefault(deviceStream.Uuid); + + // Recreate Response Document (to set DataItem property for Observations) + var response = new StreamsResponseDocument(); + response.Header = document.Header; + + var deviceStreams = new List(); + foreach (var stream in document.Streams) + { + deviceStreams.Add(ProcessDeviceStream(stream)); + } + response.Streams = deviceStreams; + + //CheckAssetChanged(deviceStream.Observations, cancel); + + CurrentReceived?.Invoke(this, response); + + observations = response.GetObservations(); + if (!observations.IsNullOrEmpty()) + { + foreach (var observation in observations) + { + if (observation.Sequence > lastSequence) + { + ObservationReceived?.Invoke(this, observation); + } + } + + var maxSequence = observations.Max(o => o.Sequence); + + // Save the most recent Sequence that was read + lock (_lock) + { + _deviceLastCurrentSequence.Remove(deviceStream.Uuid); + _deviceLastCurrentSequence.Add(deviceStream.Uuid, maxSequence); + + _deviceLastSequence.Remove(deviceStream.Uuid); + _deviceLastSequence.Add(deviceStream.Uuid, maxSequence); + } + } + } + } + } + } + + private void ProcessSampleDocument(IStreamsResponseDocument document) + { + _lastResponse = UnixDateTime.Now; + ResponseReceived?.Invoke(this, new EventArgs()); + + if (document != null) + { + // Set Agent Instance ID + //if (document.Header != null) _lastInstanceId = document.Header.InstanceId; + + if (!document.Streams.IsNullOrEmpty()) + { + IDeviceStream deviceStream = null; + + // Get the DeviceStream for the Device or default to the first + if (!string.IsNullOrEmpty(_deviceUuid)) deviceStream = document.Streams.FirstOrDefault(o => o.Uuid == _deviceUuid); + else deviceStream = document.Streams.FirstOrDefault(); + + if (deviceStream != null && deviceStream.Observations != null && deviceStream.Observations.Count() > 0) + { + long lastCurrentSequence; + long lastSequence; + lock (_lock) + { + lastCurrentSequence = _deviceLastCurrentSequence.GetValueOrDefault(deviceStream.Uuid); + lastSequence = _deviceLastSequence.GetValueOrDefault(deviceStream.Uuid); + } + + // Recreate Response Document (to set DataItem property for Observations) + var response = new StreamsResponseDocument(); + response.Header = document.Header; + + var deviceStreams = new List(); + foreach (var stream in document.Streams) + { + deviceStreams.Add(ProcessDeviceStream(stream)); + } + response.Streams = deviceStreams; + + //CheckAssetChanged(deviceStream.Observations, cancel); + + SampleReceived?.Invoke(this, response); + + var observations = response.GetObservations(); + if (!observations.IsNullOrEmpty()) + { + foreach (var observation in observations) + { + if (observation.Sequence > lastSequence && observation.Sequence > lastCurrentSequence) + { + ObservationReceived?.Invoke(this, observation); + } + } + + //// Save the most recent Sequence that was read + //_lastSequence = observations.Max(o => o.Sequence); + + var maxSequence = observations.Max(o => o.Sequence); + + // Save the most recent Sequence that was read + lock (_lock) + { + _deviceLastSequence.Remove(deviceStream.Uuid); + _deviceLastSequence.Add(deviceStream.Uuid, maxSequence); + } + } + } + } + } + } + + private IDeviceStream ProcessDeviceStream(IDeviceStream inputDeviceStream) + { + var outputDeviceStream = new DeviceStream(); + outputDeviceStream.Name = inputDeviceStream.Name; + outputDeviceStream.Uuid = inputDeviceStream.Uuid; + + var componentStreams = new List(); + if (!inputDeviceStream.ComponentStreams.IsNullOrEmpty()) + { + foreach (var componentStream in inputDeviceStream.ComponentStreams) + { + componentStreams.Add(ProcessComponentStream(outputDeviceStream.Uuid, componentStream)); + } + } + outputDeviceStream.ComponentStreams = componentStreams; + + return outputDeviceStream; + } + + private IComponentStream ProcessComponentStream(string deviceUuid, IComponentStream inputComponentStream) + { + var outputComponentStream = new ComponentStream(); + outputComponentStream.Name = inputComponentStream.Name; + outputComponentStream.NativeName = inputComponentStream.NativeName; + outputComponentStream.Uuid = inputComponentStream.Uuid; + outputComponentStream.Component = GetCachedComponent(deviceUuid, inputComponentStream.ComponentId); + + var observations = new List(); + if (!inputComponentStream.Observations.IsNullOrEmpty()) + { + foreach (var inputObservation in inputComponentStream.Observations) + { + var dataItem = GetCachedDataItem(deviceUuid, inputObservation.DataItemId); + if (dataItem != null) + { + var outputObservation = Observation.Create(dataItem); + outputObservation.DeviceUuid = deviceUuid; + outputObservation.DataItemId = inputObservation.DataItemId; + outputObservation.DataItem = GetCachedDataItem(deviceUuid, inputObservation.DataItemId); + outputObservation.CompositionId = inputObservation.CompositionId; + outputObservation.Category = inputObservation.Category; + outputObservation.Representation = inputObservation.Representation; + outputObservation.Type = inputObservation.Type; + outputObservation.SubType = inputObservation.SubType; + outputObservation.Name = inputObservation.Name; + outputObservation.Sequence = inputObservation.Sequence; + outputObservation.Timestamp = inputObservation.Timestamp; + outputObservation.AddValues(inputObservation.Values); + observations.Add(outputObservation); + } + } + } + outputComponentStream.Observations = observations; + + return outputComponentStream; + } + + + private static string GetFilePath(string path) + { + var x = path; + if (!Path.IsPathRooted(x)) + { + x = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, x); + } + + return x; + } + + #region "Cache" + + private IDevice GetCachedDevice(string deviceUuid) + { + if (!string.IsNullOrEmpty(deviceUuid)) + { + lock (_lock) + { + _devices.TryGetValue(deviceUuid, out var device); + return device; + } + } + + return null; + } + + private IComponent GetCachedComponent(string deviceUuid, string componentId) + { + if (!string.IsNullOrEmpty(deviceUuid) && !string.IsNullOrEmpty(componentId)) + { + lock (_lock) + { + _devices.TryGetValue(deviceUuid, out var device); + if (device != null && !device.Components.IsNullOrEmpty()) + { + return device.Components.FirstOrDefault(o => o.Id == componentId); + } + } + } + + return null; + } + + private IDataItem GetCachedDataItem(string deviceUuid, string dataItemId) + { + if (!string.IsNullOrEmpty(deviceUuid) && !string.IsNullOrEmpty(dataItemId)) + { + lock (_lock) + { + _devices.TryGetValue(deviceUuid, out var device); + if (device != null) + { + var dataItems = device.GetDataItems(); + if (!dataItems.IsNullOrEmpty()) + { + return dataItems.FirstOrDefault(o => o.Id == dataItemId); + } + } + } + } + + return null; + } + + #endregion + + } +} \ No newline at end of file diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs b/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs index 6f2ca76c..0b2fc6a9 100644 --- a/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs +++ b/src/MTConnect.NET-MQTT/MTConnectMqttBroker.cs @@ -271,7 +271,7 @@ private async Task PublishObservation(IObservation observation) private async Task PublishObservation(IObservation observation, int interval) { - if (observation.Category != Devices.DataItems.DataItemCategory.CONDITION) + if (observation.Category != Devices.DataItemCategory.CONDITION) { var message = MTConnectMqttMessage.Create(observation, Format, _documentFormat, RetainMessages, interval); if (message != null && message.Payload != null) await Publish(message); diff --git a/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs b/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs index 63676eb5..2e8edeb9 100644 --- a/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs +++ b/src/MTConnect.NET-MQTT/MTConnectMqttRelay.cs @@ -357,7 +357,7 @@ private async Task PublishObservation(IObservation observation) private async Task PublishObservation(IObservation observation, int interval) { - if (observation.Category != Devices.DataItems.DataItemCategory.CONDITION) + if (observation.Category != Devices.DataItemCategory.CONDITION) { var message = MTConnectMqttMessage.Create(observation, Format, _documentFormat, RetainMessages, interval); if (message != null && message.Payload != null) await Publish(message);